Merge pull request 'Add Crossbeam helpers' (#52) from add-crossbeam-tmtc-helpers into main

Reviewed-on: #52
This commit is contained in:
Robin Müller 2023-07-11 22:53:33 +02:00
commit aa99be8fce
2 changed files with 124 additions and 26 deletions

View File

@ -61,7 +61,7 @@ pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum GenericSendError { pub enum GenericSendError {
RxDisconnected, RxDisconnected,
QueueFull(u32), QueueFull(Option<u32>),
} }
impl Display for GenericSendError { impl Display for GenericSendError {
@ -71,7 +71,7 @@ impl Display for GenericSendError {
write!(f, "rx side has disconnected") write!(f, "rx side has disconnected")
} }
GenericSendError::QueueFull(max_cap) => { GenericSendError::QueueFull(max_cap) => {
write!(f, "queue with max capacity of {max_cap} is full") write!(f, "queue with max capacity of {max_cap:?} is full")
} }
} }
} }
@ -319,6 +319,7 @@ pub mod std_mod {
use crate::ChannelId; use crate::ChannelId;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::vec::Vec; use alloc::vec::Vec;
use crossbeam_channel as cb;
use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::tm::PusTmCreator;
use spacepackets::ecss::PusError; use spacepackets::ecss::PusError;
use spacepackets::time::cds::TimeProvider; use spacepackets::time::cds::TimeProvider;
@ -327,16 +328,36 @@ pub mod std_mod {
use std::cell::RefCell; use std::cell::RefCell;
use std::string::String; use std::string::String;
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::mpsc::{SendError, TryRecvError}; use std::sync::mpsc::TryRecvError;
use thiserror::Error; use thiserror::Error;
impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: mpsc::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl From<cb::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: cb::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError {
fn from(value: cb::TrySendError<StoreAddr>) -> Self {
match value {
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)),
cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected),
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct MpscTmInStoreSender { pub struct MpscTmInStoreSender {
id: ChannelId, id: ChannelId,
name: &'static str, name: &'static str,
shared_tm_store: SharedTmStore, shared_tm_store: SharedTmStore,
sender: mpsc::Sender<StoreAddr>, sender: mpsc::Sender<StoreAddr>,
pub ignore_poison_errors: bool,
} }
impl EcssChannel for MpscTmInStoreSender { impl EcssChannel for MpscTmInStoreSender {
@ -349,11 +370,6 @@ pub mod std_mod {
} }
} }
impl From<SendError<StoreAddr>> for EcssTmtcError {
fn from(_: SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl MpscTmInStoreSender { impl MpscTmInStoreSender {
pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> { pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> {
let addr = self.shared_tm_store.add_pus_tm(&tm)?; let addr = self.shared_tm_store.add_pus_tm(&tm)?;
@ -366,7 +382,10 @@ pub mod std_mod {
impl EcssTmSenderCore for MpscTmInStoreSender { impl EcssTmSenderCore for MpscTmInStoreSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { match tm {
PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()), PusTmWrapper::InStore(addr) => {
self.sender.send(addr)?;
Ok(())
}
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm), PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
} }
} }
@ -384,7 +403,6 @@ pub mod std_mod {
name, name,
shared_tm_store, shared_tm_store,
sender, sender,
ignore_poison_errors: false,
} }
} }
} }
@ -393,7 +411,6 @@ pub mod std_mod {
id: ChannelId, id: ChannelId,
name: &'static str, name: &'static str,
receiver: mpsc::Receiver<TcAddrWithToken>, receiver: mpsc::Receiver<TcAddrWithToken>,
pub ignore_poison_errors: bool,
} }
impl EcssChannel for MpscTcInStoreReceiver { impl EcssChannel for MpscTcInStoreReceiver {
@ -427,12 +444,7 @@ pub mod std_mod {
name: &'static str, name: &'static str,
receiver: mpsc::Receiver<TcAddrWithToken>, receiver: mpsc::Receiver<TcAddrWithToken>,
) -> Self { ) -> Self {
Self { Self { id, name, receiver }
id,
name,
receiver,
ignore_poison_errors: false,
}
} }
} }
@ -447,8 +459,8 @@ pub mod std_mod {
name: &'static str, name: &'static str,
} }
impl From<SendError<Vec<u8>>> for EcssTmtcError { impl From<mpsc::SendError<Vec<u8>>> for EcssTmtcError {
fn from(_: SendError<Vec<u8>>) -> Self { fn from(_: mpsc::SendError<Vec<u8>>) -> Self {
Self::Send(GenericSendError::RxDisconnected) Self::Send(GenericSendError::RxDisconnected)
} }
} }
@ -482,6 +494,94 @@ pub mod std_mod {
} }
} }
#[derive(Clone)]
pub struct CrossbeamTmInStoreSender {
id: ChannelId,
name: &'static str,
shared_tm_store: SharedTmStore,
sender: crossbeam_channel::Sender<StoreAddr>,
}
impl CrossbeamTmInStoreSender {
pub fn new(
id: ChannelId,
name: &'static str,
shared_tm_store: SharedTmStore,
sender: crossbeam_channel::Sender<StoreAddr>,
) -> Self {
Self {
id,
name,
shared_tm_store,
sender,
}
}
}
impl EcssChannel for CrossbeamTmInStoreSender {
fn id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTmSenderCore for CrossbeamTmInStoreSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm {
PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?,
PusTmWrapper::Direct(tm) => {
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
self.sender.try_send(addr)?;
}
}
Ok(())
}
}
pub struct CrossbeamTcInStoreReceiver {
id: ChannelId,
name: &'static str,
receiver: cb::Receiver<TcAddrWithToken>,
}
impl CrossbeamTcInStoreReceiver {
pub fn new(
id: ChannelId,
name: &'static str,
receiver: cb::Receiver<TcAddrWithToken>,
) -> Self {
Self { id, name, receiver }
}
}
impl EcssChannel for CrossbeamTcInStoreReceiver {
fn id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver {
fn recv_tc(&self) -> Result<ReceivedTcWrapper, TryRecvTmtcError> {
let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e {
cb::TryRecvError::Empty => TryRecvTmtcError::Empty,
cb::TryRecvError::Disconnected => {
TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected))
}
})?;
Ok(ReceivedTcWrapper {
store_addr,
token: Some(token),
})
}
}
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
pub enum PusPacketHandlingError { pub enum PusPacketHandlingError {
#[error("generic PUS error: {0}")] #[error("generic PUS error: {0}")]

View File

@ -1,5 +1,3 @@
// TODO: Refactor this to also test the STD impl using mpsc
// TODO: Change back to cross-beam as soon as STD impl was added back for TM.
#[cfg(feature = "crossbeam")] #[cfg(feature = "crossbeam")]
pub mod crossbeam_test { pub mod crossbeam_test {
use hashbrown::HashMap; use hashbrown::HashMap;
@ -7,13 +5,13 @@ pub mod crossbeam_test {
use satrs_core::pus::verification::{ use satrs_core::pus::verification::{
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
}; };
use satrs_core::pus::MpscTmInStoreSender; use satrs_core::pus::CrossbeamTmInStoreSender;
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::tm::PusTmReader;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket};
use spacepackets::SpHeader; use spacepackets::SpHeader;
use std::sync::{mpsc, Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -39,9 +37,9 @@ pub mod crossbeam_test {
let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone()))); let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone())));
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone(); let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = mpsc::channel(); let (tx, rx) = crossbeam_channel::bounded(10);
let sender = let sender =
MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone()); CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone());
let mut reporter_with_sender_0 = let mut reporter_with_sender_0 =
VerificationReporterWithSender::new(&cfg, Box::new(sender)); VerificationReporterWithSender::new(&cfg, Box::new(sender));
let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); let mut reporter_with_sender_1 = reporter_with_sender_0.clone();