use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::tmtc::PacketAsVec; use satrs::{ seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, spacepackets::{ ecss::{tm::PusTmZeroCopyWriter, PusPacket}, time::cds::MIN_CDS_FIELD_LEN, CcsdsPacket, }, }; use crate::interface::tcp_server::SyncTcpTmSource; #[derive(Default)] pub struct CcsdsSeqCounterMap { apid_seq_counter_map: HashMap, } impl CcsdsSeqCounterMap { pub fn get_and_increment(&mut self, apid: u16) -> u16 { self.apid_seq_counter_map .entry(apid) .or_default() .get_and_increment() } } pub struct TmFunnelCommon { seq_counter_map: CcsdsSeqCounterMap, msg_counter_map: HashMap, sync_tm_tcp_source: SyncTcpTmSource, } impl TmFunnelCommon { pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self { Self { seq_counter_map: Default::default(), msg_counter_map: Default::default(), sync_tm_tcp_source, } } // Applies common packet processing operations for PUS TM packets. This includes setting // a sequence counter fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) { // zero_copy_writer.set_apid(PUS_APID); zero_copy_writer.set_seq_count( self.seq_counter_map .get_and_increment(zero_copy_writer.apid()), ); let entry = self .msg_counter_map .entry(zero_copy_writer.service()) .or_insert(0); zero_copy_writer.set_msg_count(*entry); if *entry == u16::MAX { *entry = 0; } else { *entry += 1; } Self::packet_printout(&zero_copy_writer); // This operation has to come last! zero_copy_writer.finish(); } fn packet_printout(tm: &PusTmZeroCopyWriter) { info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); } } pub struct TmFunnelDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, stop_signal: Arc, } impl TmFunnelDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, stop_signal: Arc, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, tm_server_tx, stop_signal, } } pub fn operation(&mut self) { loop { match self .tm_funnel_rx .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) { Ok(mut tm) => { // Read the TM, set sequence counter and message counter, and finally update // the CRC. let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) .expect("Creating TM zero copy writer failed"); self.common.apply_packet_processing(zero_copy_writer); self.common.sync_tm_tcp_source.add_tm(&tm.packet); self.tm_server_tx .send(tm) .expect("Sending TM to server failed"); if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } Err(e) => match e { mpsc::RecvTimeoutError::Timeout => { if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } mpsc::RecvTimeoutError::Disconnected => { log::warn!("All TM funnel senders have disconnected"); break; } }, } } } }