ops-sat-rs/src/tmtc/tm_sink.rs

135 lines
4.2 KiB
Rust
Raw Normal View History

2024-04-10 12:47:26 +02:00
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
2024-04-09 17:07:39 +02:00
use std::{collections::HashMap, sync::mpsc, time::Duration};
2024-04-09 11:18:54 +02:00
use log::info;
2024-04-10 12:47:26 +02:00
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
2024-04-09 13:52:02 +02:00
use satrs::pus::PusTmAsVec;
2024-04-09 11:18:54 +02:00
use satrs::{
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
};
2024-04-13 15:16:53 +02:00
use crate::interface::tcp_server::SyncTcpTmSource;
2024-04-09 11:18:54 +02:00
/// Collection of CCSDS sequence counters, one per APID.
///
/// Counters are created lazily: the first request for an APID inserts a
/// default-constructed [`CcsdsSimpleSeqCountProvider`] for it.
#[derive(Default)]
pub struct CcsdsSeqCounterMap {
// Maps an APID to the sequence count provider dedicated to that APID.
apid_seq_counter_map: HashMap<u16, CcsdsSimpleSeqCountProvider>,
}
2024-04-09 13:52:02 +02:00
2024-04-09 11:18:54 +02:00
impl CcsdsSeqCounterMap {
    /// Fetches the sequence count for `apid` and advances the counter of that APID.
    ///
    /// If no counter exists for `apid` yet, a default-constructed one is inserted first.
    /// The returned value is whatever the provider's own `get_and_increment` yields
    /// (judging by its name, the count before the increment; confirm against the
    /// `satrs` documentation).
    pub fn get_and_increment(&mut self, apid: u16) -> u16 {
        let counter = self.apid_seq_counter_map.entry(apid).or_default();
        counter.get_and_increment()
    }
}
/// State shared by TM funnel implementations: the counters stamped into outgoing PUS TM
/// packets and the TCP TM source that is fed with every processed packet.
pub struct TmFunnelCommon {
// CCSDS sequence counters, tracked per APID.
seq_counter_map: CcsdsSeqCounterMap,
// Message counters, keyed by the PUS service number of the packet.
msg_counter_map: HashMap<u8, u16>,
// TM source of the TCP server; processed packets are added to it.
sync_tm_tcp_source: SyncTcpTmSource,
}
impl TmFunnelCommon {
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
Self {
seq_counter_map: Default::default(),
msg_counter_map: Default::default(),
sync_tm_tcp_source,
}
}
// Applies common packet processing operations for PUS TM packets. This includes setting
// a sequence counter
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
// zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(
self.seq_counter_map
.get_and_increment(zero_copy_writer.apid()),
);
let entry = self
.msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
Self::packet_printout(&zero_copy_writer);
// This operation has to come last!
zero_copy_writer.finish();
}
fn packet_printout(tm: &PusTmZeroCopyWriter) {
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
}
}
pub struct TmFunnelDynamic {
common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
2024-04-10 12:47:26 +02:00
stop_signal: Arc<AtomicBool>,
2024-04-09 11:18:54 +02:00
}
impl TmFunnelDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
2024-04-10 12:47:26 +02:00
stop_signal: Arc<AtomicBool>,
2024-04-09 11:18:54 +02:00
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx,
tm_server_tx,
2024-04-10 12:47:26 +02:00
stop_signal,
2024-04-09 17:07:39 +02:00
}
}
2024-04-09 11:18:54 +02:00
pub fn operation(&mut self) {
2024-04-09 17:07:39 +02:00
loop {
2024-04-10 12:47:26 +02:00
match self
.tm_funnel_rx
.recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY))
{
2024-04-09 17:07:39 +02:00
Ok(mut tm) => {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer =
PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
self.tm_server_tx
.send(tm)
.expect("Sending TM to server failed");
2024-04-10 12:47:26 +02:00
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
2024-04-09 17:07:39 +02:00
break;
}
}
Err(e) => match e {
mpsc::RecvTimeoutError::Timeout => {
2024-04-10 12:47:26 +02:00
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
2024-04-09 17:07:39 +02:00
break;
}
}
mpsc::RecvTimeoutError::Disconnected => {
log::warn!("All TM funnel senders have disconnected");
2024-04-10 12:47:26 +02:00
break;
2024-04-09 17:07:39 +02:00
}
},
}
2024-04-09 11:18:54 +02:00
}
}
}