2024-02-07 18:10:47 +01:00
|
|
|
use std::{
|
|
|
|
collections::HashMap,
|
2024-02-26 11:41:42 +01:00
|
|
|
sync::mpsc::{self},
|
2024-02-07 18:10:47 +01:00
|
|
|
};
|
|
|
|
|
2024-02-10 11:59:26 +01:00
|
|
|
use log::info;
|
2024-04-16 11:04:22 +02:00
|
|
|
use satrs::tmtc::{PacketAsVec, PacketInPool, SharedPacketPool};
|
2024-02-12 15:51:37 +01:00
|
|
|
use satrs::{
|
2024-04-04 15:18:53 +02:00
|
|
|
pool::PoolProvider,
|
2024-02-07 18:10:47 +01:00
|
|
|
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
|
|
|
|
spacepackets::{
|
|
|
|
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
|
|
|
|
time::cds::MIN_CDS_FIELD_LEN,
|
|
|
|
CcsdsPacket,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
2024-04-10 12:57:32 +02:00
|
|
|
use crate::interface::tcp::SyncTcpTmSource;
|
2024-02-07 18:10:47 +01:00
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct CcsdsSeqCounterMap {
|
|
|
|
apid_seq_counter_map: HashMap<u16, CcsdsSimpleSeqCountProvider>,
|
|
|
|
}
|
|
|
|
impl CcsdsSeqCounterMap {
|
|
|
|
pub fn get_and_increment(&mut self, apid: u16) -> u16 {
|
|
|
|
self.apid_seq_counter_map
|
|
|
|
.entry(apid)
|
|
|
|
.or_default()
|
|
|
|
.get_and_increment()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct TmFunnelCommon {
|
|
|
|
seq_counter_map: CcsdsSeqCounterMap,
|
|
|
|
msg_counter_map: HashMap<u8, u16>,
|
|
|
|
sync_tm_tcp_source: SyncTcpTmSource,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TmFunnelCommon {
|
|
|
|
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
|
|
|
|
Self {
|
|
|
|
seq_counter_map: Default::default(),
|
|
|
|
msg_counter_map: Default::default(),
|
|
|
|
sync_tm_tcp_source,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Applies common packet processing operations for PUS TM packets. This includes setting
|
|
|
|
// a sequence counter
|
|
|
|
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
|
|
|
|
// zero_copy_writer.set_apid(PUS_APID);
|
|
|
|
zero_copy_writer.set_seq_count(
|
|
|
|
self.seq_counter_map
|
|
|
|
.get_and_increment(zero_copy_writer.apid()),
|
|
|
|
);
|
|
|
|
let entry = self
|
|
|
|
.msg_counter_map
|
|
|
|
.entry(zero_copy_writer.service())
|
|
|
|
.or_insert(0);
|
|
|
|
zero_copy_writer.set_msg_count(*entry);
|
|
|
|
if *entry == u16::MAX {
|
|
|
|
*entry = 0;
|
|
|
|
} else {
|
|
|
|
*entry += 1;
|
|
|
|
}
|
|
|
|
|
2024-02-10 11:59:26 +01:00
|
|
|
Self::packet_printout(&zero_copy_writer);
|
2024-02-07 18:10:47 +01:00
|
|
|
// This operation has to come last!
|
|
|
|
zero_copy_writer.finish();
|
|
|
|
}
|
2024-02-10 11:59:26 +01:00
|
|
|
|
|
|
|
fn packet_printout(tm: &PusTmZeroCopyWriter) {
|
2024-04-24 14:30:45 +02:00
|
|
|
info!(
|
|
|
|
"Sending PUS TM[{},{}] with APID {}",
|
|
|
|
tm.service(),
|
|
|
|
tm.subservice(),
|
|
|
|
tm.apid()
|
|
|
|
);
|
2024-02-10 11:59:26 +01:00
|
|
|
}
|
2024-02-07 18:10:47 +01:00
|
|
|
}
|
|
|
|
|
2024-04-24 14:30:45 +02:00
|
|
|
pub struct TmSinkStatic {
|
2024-02-07 18:10:47 +01:00
|
|
|
common: TmFunnelCommon,
|
2024-04-16 11:04:22 +02:00
|
|
|
shared_tm_store: SharedPacketPool,
|
|
|
|
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
|
|
|
|
tm_server_tx: mpsc::SyncSender<PacketInPool>,
|
2024-02-07 18:10:47 +01:00
|
|
|
}
|
|
|
|
|
2024-04-24 14:30:45 +02:00
|
|
|
impl TmSinkStatic {
|
2024-02-07 18:10:47 +01:00
|
|
|
pub fn new(
|
2024-04-16 11:04:22 +02:00
|
|
|
shared_tm_store: SharedPacketPool,
|
2024-02-07 18:10:47 +01:00
|
|
|
sync_tm_tcp_source: SyncTcpTmSource,
|
2024-04-16 11:04:22 +02:00
|
|
|
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
|
|
|
|
tm_server_tx: mpsc::SyncSender<PacketInPool>,
|
2024-02-07 18:10:47 +01:00
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
|
|
|
shared_tm_store,
|
|
|
|
tm_funnel_rx,
|
|
|
|
tm_server_tx,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn operation(&mut self) {
|
2024-04-04 15:18:53 +02:00
|
|
|
if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() {
|
2024-02-07 18:10:47 +01:00
|
|
|
// Read the TM, set sequence counter and message counter, and finally update
|
|
|
|
// the CRC.
|
2024-04-16 11:04:22 +02:00
|
|
|
let shared_pool = self.shared_tm_store.0.clone();
|
2024-02-07 18:10:47 +01:00
|
|
|
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
|
2024-02-10 11:59:26 +01:00
|
|
|
let mut tm_copy = Vec::new();
|
|
|
|
pool_guard
|
2024-04-04 15:18:53 +02:00
|
|
|
.modify(&pus_tm_in_pool.store_addr, |buf| {
|
2024-02-10 11:59:26 +01:00
|
|
|
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
|
|
|
|
.expect("Creating TM zero copy writer failed");
|
|
|
|
self.common.apply_packet_processing(zero_copy_writer);
|
|
|
|
tm_copy = buf.to_vec()
|
|
|
|
})
|
2024-02-07 18:10:47 +01:00
|
|
|
.expect("Reading TM from pool failed");
|
|
|
|
self.tm_server_tx
|
2024-04-04 15:18:53 +02:00
|
|
|
.send(pus_tm_in_pool)
|
2024-02-07 18:10:47 +01:00
|
|
|
.expect("Sending TM to server failed");
|
2024-02-10 11:59:26 +01:00
|
|
|
// We could also do this step in the update closure, but I'd rather avoid this, could
|
|
|
|
// lead to nested locking.
|
|
|
|
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
|
2024-02-07 18:10:47 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-24 14:30:45 +02:00
|
|
|
pub struct TmSinkDynamic {
|
2024-02-07 18:10:47 +01:00
|
|
|
common: TmFunnelCommon,
|
2024-04-16 11:04:22 +02:00
|
|
|
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
|
|
|
|
tm_server_tx: mpsc::Sender<PacketAsVec>,
|
2024-02-07 18:10:47 +01:00
|
|
|
}
|
|
|
|
|
2024-04-24 14:30:45 +02:00
|
|
|
impl TmSinkDynamic {
|
2024-02-07 18:10:47 +01:00
|
|
|
pub fn new(
|
|
|
|
sync_tm_tcp_source: SyncTcpTmSource,
|
2024-04-16 11:04:22 +02:00
|
|
|
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
|
|
|
|
tm_server_tx: mpsc::Sender<PacketAsVec>,
|
2024-02-07 18:10:47 +01:00
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
|
|
|
tm_funnel_rx,
|
|
|
|
tm_server_tx,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn operation(&mut self) {
|
|
|
|
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
|
|
|
|
// Read the TM, set sequence counter and message counter, and finally update
|
|
|
|
// the CRC.
|
2024-04-04 15:18:53 +02:00
|
|
|
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
|
2024-02-07 18:10:47 +01:00
|
|
|
.expect("Creating TM zero copy writer failed");
|
|
|
|
self.common.apply_packet_processing(zero_copy_writer);
|
2024-04-04 15:18:53 +02:00
|
|
|
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
|
2024-02-07 18:10:47 +01:00
|
|
|
self.tm_server_tx
|
2024-04-04 15:18:53 +02:00
|
|
|
.send(tm)
|
2024-02-07 18:10:47 +01:00
|
|
|
.expect("Sending TM to server failed");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|