diff --git a/src/config.rs b/src/config.rs index cccd73c..719df9b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,6 +6,8 @@ use satrs_mib::resultcode; use std::{collections::HashSet, net::Ipv4Addr}; use strum::IntoEnumIterator; +pub const STOP_FILE_NAME: &str = "stop-experiment"; + pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; @@ -124,4 +126,5 @@ pub mod tasks { pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; + pub const FREQ_MS_CTRL: u64 = 400; } diff --git a/src/interface/udp.rs b/src/interface/udp.rs index 5c45e9e..65193a9 100644 --- a/src/interface/udp.rs +++ b/src/interface/udp.rs @@ -82,6 +82,7 @@ mod tests { sync::{Arc, Mutex}, }; + use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use satrs::{ spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -89,7 +90,6 @@ mod tests { }, tmtc::ReceivesTcCore, }; - use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use super::*; diff --git a/src/main.rs b/src/main.rs index 2db5ca5..607e1e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,18 @@ use std::{ net::{IpAddr, SocketAddr}, - sync::mpsc, + sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, }; use log::info; -use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; use ops_sat_rs::config::{ tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, }; +use ops_sat_rs::config::{ + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + STOP_FILE_NAME, +}; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, tmtc::CcsdsDistributor, @@ -36,10 +39,11 @@ mod requests; mod tm_funnel; mod tmtc; -#[allow(dead_code)] fn main() { setup_logger().expect("setting up logging with fern failed"); - println!("OPS-SAT Rust experiment OBSW"); + println!("OPS-SAT Rust Experiment OBSW"); + + let stop_signal = Arc::new(AtomicBool::new(false)); let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); @@ -152,6 +156,23 @@ fn main() { }) .unwrap(); + let ctrl_stop_signal = stop_signal.clone(); + let jh_ctrl_thread = thread::Builder::new() + .name("CTRL".to_string()) + .spawn(move || loop { + // TODO: Check stop file status regularly. If it exists, set the stop signal. + if std::path::Path::new(STOP_FILE_NAME).exists() { + log::warn!( + "Detected stop file name at {}. Initiating experiment shutdown", + STOP_FILE_NAME + ); + ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + } + thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); + }) + .unwrap(); + + let tcp_stop_signal = stop_signal.clone(); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() .name("TCP".to_string()) @@ -159,27 +180,42 @@ fn main() { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); + if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + log::warn!("breaking0"); + break; + } } }) .unwrap(); info!("Starting TM funnel task"); + let funnel_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() .name("TM Funnel".to_string()) .spawn(move || loop { tm_funnel.operation(); + if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } }) .unwrap(); info!("Starting PUS handler thread"); + let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() .name("PUS".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); + if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); + jh_ctrl_thread + .join() + .expect("Joining Controller thread failed"); jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs index 20c9d91..e64b354 100644 --- a/src/tm_funnel.rs +++ b/src/tm_funnel.rs @@ -1,7 +1,5 @@ -use std::{ - collections::HashMap, - sync::mpsc::{self}, -}; +use std::time::Instant; +use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; use satrs::pus::PusTmAsVec; @@ -76,6 +74,7 @@ impl TmFunnelCommon { pub struct TmFunnelDynamic { common: TmFunnelCommon, + last_ctrl_check: Instant, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, } @@ -90,20 +89,47 @@ impl TmFunnelDynamic { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, tm_server_tx, + last_ctrl_check: Instant::now(), } } + pub fn check_for_ctrl_check(&mut self) -> bool { + if Instant::now() - self.last_ctrl_check > Duration::from_millis(400) { + self.last_ctrl_check = Instant::now(); + return true; + } + false + } + pub fn operation(&mut self) { - if let Ok(mut tm) = self.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) - .expect("Creating TM zero copy writer failed"); - self.common.apply_packet_processing(zero_copy_writer); - self.common.sync_tm_tcp_source.add_tm(&tm.packet); - self.tm_server_tx - .send(tm) - .expect("Sending TM to server failed"); + loop { + match self.tm_funnel_rx.recv_timeout(Duration::from_millis(100)) { + Ok(mut tm) => { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = + PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.common.sync_tm_tcp_source.add_tm(&tm.packet); + self.tm_server_tx + .send(tm) + .expect("Sending TM to server failed"); + if self.check_for_ctrl_check() { + break; + } + } + Err(e) => match e { + mpsc::RecvTimeoutError::Timeout => { + if self.check_for_ctrl_check() { + break; + } + } + mpsc::RecvTimeoutError::Disconnected => { + log::warn!("All TM funnel senders have disconnected"); + } + }, + } } } }