added tmtc and interface directories
This commit is contained in:
53
src/tmtc/ccsds.rs
Normal file
53
src/tmtc/ccsds.rs
Normal file
@ -0,0 +1,53 @@
|
||||
use satrs::pus::ReceivesEcssPusTc;
|
||||
use satrs::spacepackets::{CcsdsPacket, SpHeader};
|
||||
use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
|
||||
use satrs::ValidatorU16Id;
|
||||
use ops_sat_rs::config::components::Apid;
|
||||
use ops_sat_rs::config::APID_VALIDATOR;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CcsdsReceiver<
|
||||
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone,
|
||||
E,
|
||||
> {
|
||||
pub tc_source: TcSource,
|
||||
}
|
||||
|
||||
impl<
|
||||
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
|
||||
E: 'static,
|
||||
> ValidatorU16Id for CcsdsReceiver<TcSource, E>
|
||||
{
|
||||
fn validate(&self, apid: u16) -> bool {
|
||||
APID_VALIDATOR.contains(&apid)
|
||||
}
|
||||
}
|
||||
|
||||
impl<
|
||||
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
|
||||
E: 'static,
|
||||
> CcsdsPacketHandler for CcsdsReceiver<TcSource, E>
|
||||
{
|
||||
type Error = E;
|
||||
|
||||
fn handle_packet_with_valid_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
if sp_header.apid() == Apid::Cfdp as u16 {
|
||||
} else {
|
||||
return self.tc_source.pass_ccsds(sp_header, tc_raw);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_packet_with_unknown_apid(
|
||||
&mut self,
|
||||
sp_header: &SpHeader,
|
||||
_tc_raw: &[u8],
|
||||
) -> Result<(), Self::Error> {
|
||||
log::warn!("unknown APID 0x{:x?} detected", sp_header.apid());
|
||||
Ok(())
|
||||
}
|
||||
}
|
97
src/tmtc/mod.rs
Normal file
97
src/tmtc/mod.rs
Normal file
@ -0,0 +1,97 @@
|
||||
use satrs::{
|
||||
pus::ReceivesEcssPusTc,
|
||||
spacepackets::{ecss::tc::PusTcReader, SpHeader},
|
||||
tmtc::ReceivesCcsdsTc,
|
||||
};
|
||||
use std::sync::mpsc::{self, SendError, Sender, TryRecvError};
|
||||
use satrs::pool::{StoreAddr, StoreError};
|
||||
use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender};
|
||||
use satrs::spacepackets::ecss::PusPacket;
|
||||
use crate::pus::PusReceiver;
|
||||
use thiserror::Error;
|
||||
|
||||
pub mod tm_funnel;
|
||||
pub mod ccsds;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
pub enum MpscStoreAndSendError {
|
||||
#[error("Store error: {0}")]
|
||||
Store(#[from] StoreError),
|
||||
#[error("TC send error: {0}")]
|
||||
TcSend(#[from] SendError<EcssTcAndToken>),
|
||||
#[error("TMTC send error: {0}")]
|
||||
TmTcSend(#[from] SendError<StoreAddr>),
|
||||
}
|
||||
|
||||
// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules.
|
||||
#[derive(Clone)]
|
||||
pub struct PusTcSourceProviderDynamic(pub Sender<Vec<u8>>);
|
||||
|
||||
impl ReceivesEcssPusTc for PusTcSourceProviderDynamic {
|
||||
type Error = SendError<Vec<u8>>;
|
||||
|
||||
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||
self.0.send(pus_tc.raw_data().to_vec())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
|
||||
type Error = mpsc::SendError<Vec<u8>>;
|
||||
|
||||
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||
self.0.send(tc_raw.to_vec())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// TC source components where the heap is the backing memory of the received telecommands.
|
||||
pub struct TcSourceTaskDynamic {
|
||||
pub tc_receiver: mpsc::Receiver<Vec<u8>>,
|
||||
pus_receiver: PusReceiver<MpscTmAsVecSender>,
|
||||
}
|
||||
|
||||
impl TcSourceTaskDynamic {
|
||||
pub fn new(
|
||||
tc_receiver: mpsc::Receiver<Vec<u8>>,
|
||||
pus_receiver: PusReceiver<MpscTmAsVecSender>,
|
||||
) -> Self {
|
||||
Self {
|
||||
tc_receiver,
|
||||
pus_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.poll_tc();
|
||||
}
|
||||
|
||||
pub fn poll_tc(&mut self) -> bool {
|
||||
match self.tc_receiver.try_recv() {
|
||||
Ok(tc) => match PusTcReader::new(&tc) {
|
||||
Ok((pus_tc, _)) => {
|
||||
self.pus_receiver
|
||||
.handle_tc_packet(
|
||||
satrs::pus::TcInMemory::Vec(tc.clone()),
|
||||
pus_tc.service(),
|
||||
&pus_tc,
|
||||
)
|
||||
.ok();
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("error creating PUS TC from raw data: {e}");
|
||||
log::warn!("raw data: {:x?}", tc);
|
||||
true
|
||||
}
|
||||
},
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => false,
|
||||
TryRecvError::Disconnected => {
|
||||
log::warn!("tmtc thread: sender disconnected");
|
||||
false
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
157
src/tmtc/tm_funnel.rs
Normal file
157
src/tmtc/tm_funnel.rs
Normal file
@ -0,0 +1,157 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::mpsc::{self},
|
||||
};
|
||||
|
||||
use log::info;
|
||||
use satrs::pus::{PusTmAsVec, PusTmInPool};
|
||||
use satrs::{
|
||||
pool::PoolProvider,
|
||||
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
|
||||
spacepackets::{
|
||||
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
|
||||
time::cds::MIN_CDS_FIELD_LEN,
|
||||
CcsdsPacket,
|
||||
},
|
||||
tmtc::tm_helper::SharedTmPool,
|
||||
};
|
||||
|
||||
use crate::interface::tcp::SyncTcpTmSource;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct CcsdsSeqCounterMap {
|
||||
apid_seq_counter_map: HashMap<u16, CcsdsSimpleSeqCountProvider>,
|
||||
}
|
||||
impl CcsdsSeqCounterMap {
|
||||
pub fn get_and_increment(&mut self, apid: u16) -> u16 {
|
||||
self.apid_seq_counter_map
|
||||
.entry(apid)
|
||||
.or_default()
|
||||
.get_and_increment()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TmFunnelCommon {
|
||||
seq_counter_map: CcsdsSeqCounterMap,
|
||||
msg_counter_map: HashMap<u8, u16>,
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
}
|
||||
|
||||
impl TmFunnelCommon {
|
||||
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
|
||||
Self {
|
||||
seq_counter_map: Default::default(),
|
||||
msg_counter_map: Default::default(),
|
||||
sync_tm_tcp_source,
|
||||
}
|
||||
}
|
||||
|
||||
// Applies common packet processing operations for PUS TM packets. This includes setting
|
||||
// a sequence counter
|
||||
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
|
||||
// zero_copy_writer.set_apid(PUS_APID);
|
||||
zero_copy_writer.set_seq_count(
|
||||
self.seq_counter_map
|
||||
.get_and_increment(zero_copy_writer.apid()),
|
||||
);
|
||||
let entry = self
|
||||
.msg_counter_map
|
||||
.entry(zero_copy_writer.service())
|
||||
.or_insert(0);
|
||||
zero_copy_writer.set_msg_count(*entry);
|
||||
if *entry == u16::MAX {
|
||||
*entry = 0;
|
||||
} else {
|
||||
*entry += 1;
|
||||
}
|
||||
|
||||
Self::packet_printout(&zero_copy_writer);
|
||||
// This operation has to come last!
|
||||
zero_copy_writer.finish();
|
||||
}
|
||||
|
||||
fn packet_printout(tm: &PusTmZeroCopyWriter) {
|
||||
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TmFunnelStatic {
|
||||
common: TmFunnelCommon,
|
||||
shared_tm_store: SharedTmPool,
|
||||
tm_funnel_rx: mpsc::Receiver<PusTmInPool>,
|
||||
tm_server_tx: mpsc::SyncSender<PusTmInPool>,
|
||||
}
|
||||
|
||||
impl TmFunnelStatic {
|
||||
pub fn new(
|
||||
shared_tm_store: SharedTmPool,
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
tm_funnel_rx: mpsc::Receiver<PusTmInPool>,
|
||||
tm_server_tx: mpsc::SyncSender<PusTmInPool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
||||
shared_tm_store,
|
||||
tm_funnel_rx,
|
||||
tm_server_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn operation(&mut self) {
|
||||
if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() {
|
||||
// Read the TM, set sequence counter and message counter, and finally update
|
||||
// the CRC.
|
||||
let shared_pool = self.shared_tm_store.clone_backing_pool();
|
||||
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
|
||||
let mut tm_copy = Vec::new();
|
||||
pool_guard
|
||||
.modify(&pus_tm_in_pool.store_addr, |buf| {
|
||||
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
self.common.apply_packet_processing(zero_copy_writer);
|
||||
tm_copy = buf.to_vec()
|
||||
})
|
||||
.expect("Reading TM from pool failed");
|
||||
self.tm_server_tx
|
||||
.send(pus_tm_in_pool)
|
||||
.expect("Sending TM to server failed");
|
||||
// We could also do this step in the update closure, but I'd rather avoid this, could
|
||||
// lead to nested locking.
|
||||
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TmFunnelDynamic {
|
||||
common: TmFunnelCommon,
|
||||
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
|
||||
tm_server_tx: mpsc::Sender<PusTmAsVec>,
|
||||
}
|
||||
|
||||
impl TmFunnelDynamic {
|
||||
pub fn new(
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
|
||||
tm_server_tx: mpsc::Sender<PusTmAsVec>,
|
||||
) -> Self {
|
||||
Self {
|
||||
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
||||
tm_funnel_rx,
|
||||
tm_server_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn operation(&mut self) {
|
||||
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
|
||||
// Read the TM, set sequence counter and message counter, and finally update
|
||||
// the CRC.
|
||||
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
self.common.apply_packet_processing(zero_copy_writer);
|
||||
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
|
||||
self.tm_server_tx
|
||||
.send(tm)
|
||||
.expect("Sending TM to server failed");
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user