diff --git a/Cargo.lock b/Cargo.lock index 05ceabe..f8d0d52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,6 +317,18 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -424,9 +436,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -455,7 +467,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "bus", "cobs", @@ -465,6 +477,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "hashbrown", + "mio", "num-traits", "num_enum", "paste", @@ -479,7 +492,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "csv", "satrs-mib-codegen", @@ -491,7 +504,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "proc-macro2", "quote", @@ -501,7 +514,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "serde", "spacepackets", @@ -560,7 +573,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -672,6 +685,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -735,6 +754,15 @@ dependencies = [ "windows-targets 0.52.4", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/src/config.rs b/src/config.rs index 719df9b..623f8f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -127,4 +127,6 @@ pub mod tasks { pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_CTRL: u64 = 400; + + pub const STOP_CHECK_FREQUENCY: u64 = 400; } diff --git a/src/interface/tcp.rs b/src/interface/tcp.rs index 04bb136..a5d0569 100644 --- a/src/interface/tcp.rs +++ b/src/interface/tcp.rs @@ -1,11 +1,13 @@ use std::{ collections::{HashSet, VecDeque}, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, }; use log::{info, warn}; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::{ - hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, pus::ReceivesEcssPusTc, spacepackets::PacketId, tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, @@ -69,12 +71,25 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } +#[derive(Default)] +pub struct ConnectionFinishedHandler {} + +impl HandledConnectionHandler for ConnectionFinishedHandler { + fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) { + info!( + "Served {} TMs and {} TCs for client {:?}", + info.num_sent_tms, info.num_received_tcs, info.addr + ); + } +} + pub type TcpServerType = TcpSpacepacketsServer< - (), - CcsdsError, SyncTcpTmSource, CcsdsDistributor, MpscErrorType>, HashSet, + ConnectionFinishedHandler, + (), + CcsdsError, >; pub struct TcpTask< @@ -102,25 +117,28 @@ impl< tm_source: SyncTcpTmSource, tc_receiver: CcsdsDistributor, MpscErrorType>, packet_id_lookup: HashSet, + stop_signal: Arc, ) -> Result { Ok(Self { - server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, + server: TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_receiver, + packet_id_lookup, + ConnectionFinishedHandler::default(), + Some(stop_signal), + )?, }) } pub fn periodic_operation(&mut self) { - loop { - let result = self.server.handle_next_connection(); - match result { - Ok(conn_result) => { - info!( - "Served {} TMs and {} TCs for client {:?}", - conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr - ); - } - Err(e) => { - warn!("TCP server error: {e:?}"); - } + let result = self + .server + .handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + match result { + Ok(_conn_result) => (), + Err(e) => { + warn!("TCP server error: {e:?}"); } } } diff --git a/src/main.rs b/src/main.rs index 607e1e8..fb6c215 100644 --- a/src/main.rs +++ b/src/main.rs @@ -138,50 +138,60 @@ fn main() { sync_tm_tcp_source.clone(), tcp_ccsds_distributor, PACKET_ID_VALIDATOR.clone(), + stop_signal.clone(), ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); - - info!("Starting TMTC and UDP task"); - let jh_udp_tmtc = thread::Builder::new() - .name("TMTC and UDP".to_string()) - .spawn(move || { - info!("Running UDP server on port {SERVER_PORT}"); - loop { - udp_tmtc_server.periodic_operation(); - tmtc_task.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); - } - }) - .unwrap(); + let mut tm_funnel = TmFunnelDynamic::new( + sync_tm_tcp_source, + tm_funnel_rx, + tm_server_tx, + stop_signal.clone(), + ); + info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); let jh_ctrl_thread = thread::Builder::new() - .name("CTRL".to_string()) + .name("ops-sat ctrl".to_string()) .spawn(move || loop { - // TODO: Check stop file status regularly. If it exists, set the stop signal. if std::path::Path::new(STOP_FILE_NAME).exists() { log::warn!( "Detected stop file name at {}. Initiating experiment shutdown", STOP_FILE_NAME ); ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + break; } thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); }) .unwrap(); + info!("Starting TMTC and UDP task"); + let tmtc_stop_signal = stop_signal.clone(); + let jh_udp_tmtc = thread::Builder::new() + .name("ops-sat tmtc-udp".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + let tcp_stop_signal = stop_signal.clone(); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() - .name("TCP".to_string()) + .name("ops-sat tcp".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { - log::warn!("breaking0"); break; } } @@ -191,7 +201,7 @@ fn main() { info!("Starting TM funnel task"); let funnel_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() - .name("TM Funnel".to_string()) + .name("ops-sat tm-funnel".to_string()) .spawn(move || loop { tm_funnel.operation(); if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { @@ -203,7 +213,7 @@ fn main() { info!("Starting PUS handler thread"); let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() - .name("PUS".to_string()) + .name("ops-sat pus".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs index e64b354..a0ef7b1 100644 --- a/src/tm_funnel.rs +++ b/src/tm_funnel.rs @@ -1,7 +1,9 @@ -use std::time::Instant; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::pus::PusTmAsVec; use satrs::{ seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, @@ -74,9 +76,9 @@ impl TmFunnelCommon { pub struct TmFunnelDynamic { common: TmFunnelCommon, - last_ctrl_check: Instant, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, + stop_signal: Arc, } impl TmFunnelDynamic { @@ -84,26 +86,22 @@ impl TmFunnelDynamic { sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, + stop_signal: Arc, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, tm_server_tx, - last_ctrl_check: Instant::now(), + stop_signal, } } - pub fn check_for_ctrl_check(&mut self) -> bool { - if Instant::now() - self.last_ctrl_check > Duration::from_millis(400) { - self.last_ctrl_check = Instant::now(); - return true; - } - false - } - pub fn operation(&mut self) { loop { - match self.tm_funnel_rx.recv_timeout(Duration::from_millis(100)) { + match self + .tm_funnel_rx + .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) + { Ok(mut tm) => { // Read the TM, set sequence counter and message counter, and finally update // the CRC. @@ -115,18 +113,19 @@ impl TmFunnelDynamic { self.tm_server_tx .send(tm) .expect("Sending TM to server failed"); - if self.check_for_ctrl_check() { + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } Err(e) => match e { mpsc::RecvTimeoutError::Timeout => { - if self.check_for_ctrl_check() { + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } mpsc::RecvTimeoutError::Disconnected => { log::warn!("All TM funnel senders have disconnected"); + break; } }, }