From 710fc94384d813a5c52f23d0e282921d0eef4f48 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 9 Apr 2024 17:07:39 +0200 Subject: [PATCH 1/3] start adding stop logic --- src/config.rs | 3 +++ src/interface/udp.rs | 2 +- src/main.rs | 44 ++++++++++++++++++++++++++++++++---- src/tm_funnel.rs | 54 ++++++++++++++++++++++++++++++++------------ 4 files changed, 84 insertions(+), 19 deletions(-) diff --git a/src/config.rs b/src/config.rs index cccd73c..719df9b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,6 +6,8 @@ use satrs_mib::resultcode; use std::{collections::HashSet, net::Ipv4Addr}; use strum::IntoEnumIterator; +pub const STOP_FILE_NAME: &str = "stop-experiment"; + pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; @@ -124,4 +126,5 @@ pub mod tasks { pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; + pub const FREQ_MS_CTRL: u64 = 400; } diff --git a/src/interface/udp.rs b/src/interface/udp.rs index 5c45e9e..65193a9 100644 --- a/src/interface/udp.rs +++ b/src/interface/udp.rs @@ -82,6 +82,7 @@ mod tests { sync::{Arc, Mutex}, }; + use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use satrs::{ spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -89,7 +90,6 @@ mod tests { }, tmtc::ReceivesTcCore, }; - use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use super::*; diff --git a/src/main.rs b/src/main.rs index 2db5ca5..607e1e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,18 @@ use std::{ net::{IpAddr, SocketAddr}, - sync::mpsc, + sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, }; use log::info; -use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; use ops_sat_rs::config::{ tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, }; +use ops_sat_rs::config::{ + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + STOP_FILE_NAME, +}; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, tmtc::CcsdsDistributor, @@ -36,10 +39,11 @@ mod requests; mod tm_funnel; mod tmtc; -#[allow(dead_code)] fn main() { setup_logger().expect("setting up logging with fern failed"); - println!("OPS-SAT Rust experiment OBSW"); + println!("OPS-SAT Rust Experiment OBSW"); + + let stop_signal = Arc::new(AtomicBool::new(false)); let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); @@ -152,6 +156,23 @@ fn main() { }) .unwrap(); + let ctrl_stop_signal = stop_signal.clone(); + let jh_ctrl_thread = thread::Builder::new() + .name("CTRL".to_string()) + .spawn(move || loop { + // TODO: Check stop file status regularly. If it exists, set the stop signal. + if std::path::Path::new(STOP_FILE_NAME).exists() { + log::warn!( + "Detected stop file name at {}. Initiating experiment shutdown", + STOP_FILE_NAME + ); + ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + } + thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); + }) + .unwrap(); + + let tcp_stop_signal = stop_signal.clone(); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() .name("TCP".to_string()) @@ -159,27 +180,42 @@ fn main() { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); + if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + log::warn!("breaking0"); + break; + } } }) .unwrap(); info!("Starting TM funnel task"); + let funnel_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() .name("TM Funnel".to_string()) .spawn(move || loop { tm_funnel.operation(); + if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } }) .unwrap(); info!("Starting PUS handler thread"); + let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() .name("PUS".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); + if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); + jh_ctrl_thread + .join() + .expect("Joining Controller thread failed"); jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs index 20c9d91..e64b354 100644 --- a/src/tm_funnel.rs +++ b/src/tm_funnel.rs @@ -1,7 +1,5 @@ -use std::{ - collections::HashMap, - sync::mpsc::{self}, -}; +use std::time::Instant; +use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; use satrs::pus::PusTmAsVec; @@ -76,6 +74,7 @@ impl TmFunnelCommon { pub struct TmFunnelDynamic { common: TmFunnelCommon, + last_ctrl_check: Instant, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, } @@ -90,20 +89,47 @@ impl TmFunnelDynamic { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, tm_server_tx, + last_ctrl_check: Instant::now(), } } + pub fn check_for_ctrl_check(&mut self) -> bool { + if Instant::now() - self.last_ctrl_check > Duration::from_millis(400) { + self.last_ctrl_check = Instant::now(); + return true; + } + false + } + pub fn operation(&mut self) { - if let Ok(mut tm) = self.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) - .expect("Creating TM zero copy writer failed"); - self.common.apply_packet_processing(zero_copy_writer); - self.common.sync_tm_tcp_source.add_tm(&tm.packet); - self.tm_server_tx - .send(tm) - .expect("Sending TM to server failed"); + loop { + match self.tm_funnel_rx.recv_timeout(Duration::from_millis(100)) { + Ok(mut tm) => { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = + PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.common.sync_tm_tcp_source.add_tm(&tm.packet); + self.tm_server_tx + .send(tm) + .expect("Sending TM to server failed"); + if self.check_for_ctrl_check() { + break; + } + } + Err(e) => match e { + mpsc::RecvTimeoutError::Timeout => { + if self.check_for_ctrl_check() { + break; + } + } + mpsc::RecvTimeoutError::Disconnected => { + log::warn!("All TM funnel senders have disconnected"); + } + }, + } } } } From dc66dcd469f365e4558f496e3fc81d97871993ce Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 10 Apr 2024 12:47:26 +0200 Subject: [PATCH 2/3] it works --- Cargo.lock | 42 +++++++++++++++++++++++++++++------ src/config.rs | 2 ++ src/interface/tcp.rs | 52 +++++++++++++++++++++++++++++--------------- src/main.rs | 50 +++++++++++++++++++++++++----------------- src/tm_funnel.rs | 27 +++++++++++------------ 5 files changed, 115 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05ceabe..f8d0d52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,6 +317,18 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -424,9 +436,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -455,7 +467,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "bus", "cobs", @@ -465,6 +477,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "hashbrown", + "mio", "num-traits", "num_enum", "paste", @@ -479,7 +492,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "csv", "satrs-mib-codegen", @@ -491,7 +504,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "proc-macro2", "quote", @@ -501,7 +514,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "serde", "spacepackets", @@ -560,7 +573,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -672,6 +685,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -735,6 +754,15 @@ dependencies = [ "windows-targets 0.52.4", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/src/config.rs b/src/config.rs index 719df9b..623f8f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -127,4 +127,6 @@ pub mod tasks { pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_CTRL: u64 = 400; + + pub const STOP_CHECK_FREQUENCY: u64 = 400; } diff --git a/src/interface/tcp.rs b/src/interface/tcp.rs index 04bb136..a5d0569 100644 --- a/src/interface/tcp.rs +++ b/src/interface/tcp.rs @@ -1,11 +1,13 @@ use std::{ collections::{HashSet, VecDeque}, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, }; use log::{info, warn}; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::{ - hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, pus::ReceivesEcssPusTc, spacepackets::PacketId, tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, @@ -69,12 +71,25 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } +#[derive(Default)] +pub struct ConnectionFinishedHandler {} + +impl HandledConnectionHandler for ConnectionFinishedHandler { + fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) { + info!( + "Served {} TMs and {} TCs for client {:?}", + info.num_sent_tms, info.num_received_tcs, info.addr + ); + } +} + pub type TcpServerType = TcpSpacepacketsServer< - (), - CcsdsError, SyncTcpTmSource, CcsdsDistributor, MpscErrorType>, HashSet, + ConnectionFinishedHandler, + (), + CcsdsError, >; pub struct TcpTask< @@ -102,25 +117,28 @@ impl< tm_source: SyncTcpTmSource, tc_receiver: CcsdsDistributor, MpscErrorType>, packet_id_lookup: HashSet, + stop_signal: Arc, ) -> Result { Ok(Self { - server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, + server: TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_receiver, + packet_id_lookup, + ConnectionFinishedHandler::default(), + Some(stop_signal), + )?, }) } pub fn periodic_operation(&mut self) { - loop { - let result = self.server.handle_next_connection(); - match result { - Ok(conn_result) => { - info!( - "Served {} TMs and {} TCs for client {:?}", - conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr - ); - } - Err(e) => { - warn!("TCP server error: {e:?}"); - } + let result = self + .server + .handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + match result { + Ok(_conn_result) => (), + Err(e) => { + warn!("TCP server error: {e:?}"); } } } diff --git a/src/main.rs b/src/main.rs index 607e1e8..fb6c215 100644 --- a/src/main.rs +++ b/src/main.rs @@ -138,50 +138,60 @@ fn main() { sync_tm_tcp_source.clone(), tcp_ccsds_distributor, PACKET_ID_VALIDATOR.clone(), + stop_signal.clone(), ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); - - info!("Starting TMTC and UDP task"); - let jh_udp_tmtc = thread::Builder::new() - .name("TMTC and UDP".to_string()) - .spawn(move || { - info!("Running UDP server on port {SERVER_PORT}"); - loop { - udp_tmtc_server.periodic_operation(); - tmtc_task.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); - } - }) - .unwrap(); + let mut tm_funnel = TmFunnelDynamic::new( + sync_tm_tcp_source, + tm_funnel_rx, + tm_server_tx, + stop_signal.clone(), + ); + info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); let jh_ctrl_thread = thread::Builder::new() - .name("CTRL".to_string()) + .name("ops-sat ctrl".to_string()) .spawn(move || loop { - // TODO: Check stop file status regularly. If it exists, set the stop signal. if std::path::Path::new(STOP_FILE_NAME).exists() { log::warn!( "Detected stop file name at {}. Initiating experiment shutdown", STOP_FILE_NAME ); ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + break; } thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); }) .unwrap(); + info!("Starting TMTC and UDP task"); + let tmtc_stop_signal = stop_signal.clone(); + let jh_udp_tmtc = thread::Builder::new() + .name("ops-sat tmtc-udp".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + let tcp_stop_signal = stop_signal.clone(); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() - .name("TCP".to_string()) + .name("ops-sat tcp".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { - log::warn!("breaking0"); break; } } @@ -191,7 +201,7 @@ fn main() { info!("Starting TM funnel task"); let funnel_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() - .name("TM Funnel".to_string()) + .name("ops-sat tm-funnel".to_string()) .spawn(move || loop { tm_funnel.operation(); if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { @@ -203,7 +213,7 @@ fn main() { info!("Starting PUS handler thread"); let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() - .name("PUS".to_string()) + .name("ops-sat pus".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs index e64b354..a0ef7b1 100644 --- a/src/tm_funnel.rs +++ b/src/tm_funnel.rs @@ -1,7 +1,9 @@ -use std::time::Instant; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::pus::PusTmAsVec; use satrs::{ seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, @@ -74,9 +76,9 @@ impl TmFunnelCommon { pub struct TmFunnelDynamic { common: TmFunnelCommon, - last_ctrl_check: Instant, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, + stop_signal: Arc, } impl TmFunnelDynamic { @@ -84,26 +86,22 @@ impl TmFunnelDynamic { sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, + stop_signal: Arc, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, tm_server_tx, - last_ctrl_check: Instant::now(), + stop_signal, } } - pub fn check_for_ctrl_check(&mut self) -> bool { - if Instant::now() - self.last_ctrl_check > Duration::from_millis(400) { - self.last_ctrl_check = Instant::now(); - return true; - } - false - } - pub fn operation(&mut self) { loop { - match self.tm_funnel_rx.recv_timeout(Duration::from_millis(100)) { + match self + .tm_funnel_rx + .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) + { Ok(mut tm) => { // Read the TM, set sequence counter and message counter, and finally update // the CRC. @@ -115,18 +113,19 @@ impl TmFunnelDynamic { self.tm_server_tx .send(tm) .expect("Sending TM to server failed"); - if self.check_for_ctrl_check() { + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } Err(e) => match e { mpsc::RecvTimeoutError::Timeout => { - if self.check_for_ctrl_check() { + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } mpsc::RecvTimeoutError::Disconnected => { log::warn!("All TM funnel senders have disconnected"); + break; } }, } From ebd3514dece27d4177fbe6c47a794c2191c7c7fb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 10 Apr 2024 14:31:25 +0200 Subject: [PATCH 3/3] delete cached json file --- pytmtc/tmtc_conf.json | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 pytmtc/tmtc_conf.json diff --git a/pytmtc/tmtc_conf.json b/pytmtc/tmtc_conf.json deleted file mode 100644 index f2c8afd..0000000 --- a/pytmtc/tmtc_conf.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "com_if": "tcp", - "tcpip_udp_ip_addr": "127.0.0.1", - "tcpip_udp_port": 7301, - "tcpip_udp_recv_max_size": 1500, - "tcpip_tcp_ip_addr": "127.0.0.1", - "tcpip_tcp_port": 7301 -}