diff --git a/Cargo.lock b/Cargo.lock index 461c5fd..82c08cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,7 +79,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -89,7 +89,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -409,6 +409,18 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -517,9 +529,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -577,7 +589,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "bus", "cobs", @@ -587,6 +599,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "hashbrown", + "mio", "num-traits", "num_enum", "paste", @@ -601,7 +614,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "csv", "satrs-mib-codegen", @@ -613,7 +626,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "proc-macro2", "quote", @@ -623,7 +636,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" dependencies = [ "serde", "spacepackets", @@ -682,7 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -800,6 +813,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -863,6 +882,15 @@ dependencies = [ "windows-targets 0.52.4", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/pytmtc/tmtc_conf.json b/pytmtc/tmtc_conf.json deleted file mode 100644 index f2c8afd..0000000 --- a/pytmtc/tmtc_conf.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "com_if": "tcp", - "tcpip_udp_ip_addr": "127.0.0.1", - "tcpip_udp_port": 7301, - "tcpip_udp_recv_max_size": 1500, - "tcpip_tcp_ip_addr": "127.0.0.1", - "tcpip_tcp_port": 7301 -} diff --git a/src/ccsds.rs b/src/ccsds.rs index 0ba4776..6828538 100644 --- a/src/ccsds.rs +++ b/src/ccsds.rs @@ -1,5 +1,4 @@ -use ops_sat_rs::config::components::Apid; -use ops_sat_rs::config::APID_VALIDATOR; +use ops_sat_rs::config::EXPERIMENT_APID; use satrs::pus::ReceivesEcssPusTc; use satrs::spacepackets::{CcsdsPacket, SpHeader}; use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; @@ -19,7 +18,7 @@ impl< > ValidatorU16Id for CcsdsReceiver { fn validate(&self, apid: u16) -> bool { - APID_VALIDATOR.contains(&apid) + apid == EXPERIMENT_APID } } @@ -35,11 +34,7 @@ impl< sp_header: &SpHeader, tc_raw: &[u8], ) -> Result<(), Self::Error> { - if sp_header.apid() == Apid::Cfdp as u16 { - } else { - return self.tc_source.pass_ccsds(sp_header, tc_raw); - } - Ok(()) + return self.tc_source.pass_ccsds(sp_header, tc_raw); } fn handle_packet_with_unknown_apid( @@ -48,6 +43,7 @@ impl< _tc_raw: &[u8], ) -> Result<(), Self::Error> { log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); + // TODO: Log event or something similar? Ok(()) } } diff --git a/src/config.rs b/src/config.rs index 8041828..f6fcc3c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,6 +3,8 @@ use satrs_mib::res_code::ResultU16Info; use satrs_mib::resultcode; use std::net::Ipv4Addr; +pub const STOP_FILE_NAME: &str = "stop-experiment"; + pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; pub const EXPERIMENT_ID: u32 = 278; @@ -96,4 +98,7 @@ pub mod tasks { pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; + pub const FREQ_MS_CTRL: u64 = 400; + + pub const STOP_CHECK_FREQUENCY: u64 = 400; } diff --git a/src/interface/tcp.rs b/src/interface/tcp.rs index 8055af8..99e3805 100644 --- a/src/interface/tcp.rs +++ b/src/interface/tcp.rs @@ -1,11 +1,13 @@ use std::{ collections::{HashSet, VecDeque}, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, }; use log::{info, warn}; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::{ - hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, pus::ReceivesEcssPusTc, spacepackets::PacketId, tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, @@ -69,12 +71,25 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } +#[derive(Default)] +pub struct ConnectionFinishedHandler {} + +impl HandledConnectionHandler for ConnectionFinishedHandler { + fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) { + info!( + "Served {} TMs and {} TCs for client {:?}", + info.num_sent_tms, info.num_received_tcs, info.addr + ); + } +} + pub type TcpServerType = TcpSpacepacketsServer< - (), - CcsdsError, SyncTcpTmSource, CcsdsDistributor, MpscErrorType>, HashSet, + ConnectionFinishedHandler, + (), + CcsdsError, >; pub struct TcpTask< @@ -102,25 +117,28 @@ impl< tm_source: SyncTcpTmSource, tc_receiver: CcsdsDistributor, MpscErrorType>, packet_id_lookup: Vec, + stop_signal: Arc, ) -> Result { Ok(Self { - server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, + server: TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_receiver, + packet_id_lookup, + ConnectionFinishedHandler::default(), + Some(stop_signal), + )?, }) } pub fn periodic_operation(&mut self) { - loop { - let result = self.server.handle_next_connection(); - match result { - Ok(conn_result) => { - info!( - "Served {} TMs and {} TCs for client {:?}", - conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr - ); - } - Err(e) => { - warn!("TCP server error: {e:?}"); - } + let result = self + .server + .handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + match result { + Ok(_conn_result) => (), + Err(e) => { + warn!("TCP server error: {e:?}"); } } } diff --git a/src/main.rs b/src/main.rs index b4ee981..236b245 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,16 @@ use std::{ net::{IpAddr, SocketAddr}, - sync::mpsc, + sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, }; use log::info; -use ops_sat_rs::config::{components::CONTROLLER_ID, tasks::FREQ_MS_PUS_STACK, EXPERIMENT_APID}; +use ops_sat_rs::config::{ + components::CONTROLLER_ID, + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + EXPERIMENT_APID, STOP_FILE_NAME, +}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, @@ -38,10 +42,11 @@ mod requests; mod tm_funnel; mod tmtc; -#[allow(dead_code)] fn main() { setup_logger().expect("setting up logging with fern failed"); - println!("OPS-SAT Rust experiment OBSW"); + println!("OPS-SAT Rust Experiment OBSW"); + + let stop_signal = Arc::new(AtomicBool::new(false)); let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); @@ -143,53 +148,94 @@ fn main() { sync_tm_tcp_source.clone(), tcp_ccsds_distributor, vec![PacketId::new_for_tc(true, EXPERIMENT_APID)], - // PACKET_ID_VALIDATOR.clone(), + stop_signal.clone(), ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + let mut tm_funnel = TmFunnelDynamic::new( + sync_tm_tcp_source, + tm_funnel_rx, + tm_server_tx, + stop_signal.clone(), + ); + + info!("Starting CTRL task"); + let ctrl_stop_signal = stop_signal.clone(); + let jh_ctrl_thread = thread::Builder::new() + .name("ops-sat ctrl".to_string()) + .spawn(move || loop { + if std::path::Path::new(STOP_FILE_NAME).exists() { + log::warn!( + "Detected stop file name at {}. Initiating experiment shutdown", + STOP_FILE_NAME + ); + ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + break; + } + thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); + }) + .unwrap(); info!("Starting TMTC and UDP task"); + let tmtc_stop_signal = stop_signal.clone(); let jh_udp_tmtc = thread::Builder::new() - .name("TMTC and UDP".to_string()) + .name("ops-sat tmtc-udp".to_string()) .spawn(move || { info!("Running UDP server on port {SERVER_PORT}"); loop { udp_tmtc_server.periodic_operation(); tmtc_task.periodic_operation(); + if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); } }) .unwrap(); + let tcp_stop_signal = stop_signal.clone(); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() - .name("TCP".to_string()) + .name("ops-sat tcp".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); + if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } } }) .unwrap(); info!("Starting TM funnel task"); + let funnel_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() - .name("TM Funnel".to_string()) + .name("ops-sat tm-funnel".to_string()) .spawn(move || loop { tm_funnel.operation(); + if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } }) .unwrap(); info!("Starting PUS handler thread"); + let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() - .name("PUS".to_string()) + .name("ops-sat pus".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); + if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); + jh_ctrl_thread + .join() + .expect("Joining Controller thread failed"); jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs index 20c9d91..a0ef7b1 100644 --- a/src/tm_funnel.rs +++ b/src/tm_funnel.rs @@ -1,9 +1,9 @@ -use std::{ - collections::HashMap, - sync::mpsc::{self}, -}; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::pus::PusTmAsVec; use satrs::{ seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, @@ -78,6 +78,7 @@ pub struct TmFunnelDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, + stop_signal: Arc, } impl TmFunnelDynamic { @@ -85,25 +86,49 @@ impl TmFunnelDynamic { sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, + stop_signal: Arc, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, tm_server_tx, + stop_signal, } } pub fn operation(&mut self) { - if let Ok(mut tm) = self.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) - .expect("Creating TM zero copy writer failed"); - self.common.apply_packet_processing(zero_copy_writer); - self.common.sync_tm_tcp_source.add_tm(&tm.packet); - self.tm_server_tx - .send(tm) - .expect("Sending TM to server failed"); + loop { + match self + .tm_funnel_rx + .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) + { + Ok(mut tm) => { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = + PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.common.sync_tm_tcp_source.add_tm(&tm.packet); + self.tm_server_tx + .send(tm) + .expect("Sending TM to server failed"); + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + Err(e) => match e { + mpsc::RecvTimeoutError::Timeout => { + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + mpsc::RecvTimeoutError::Disconnected => { + log::warn!("All TM funnel senders have disconnected"); + break; + } + }, + } } } }