From 8b077c47b27d6b18e24f20d3d0b99028481f8ee0 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 11 Jul 2023 22:48:43 +0200 Subject: [PATCH 1/2] added first crossbeam API to allow bounded API --- satrs-core/src/pus/mod.rs | 141 ++++++++++++++++++++++++++++++++++---- 1 file changed, 126 insertions(+), 15 deletions(-) diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index bb0cd17..e4b2db4 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -61,7 +61,7 @@ pub type AcceptedTc = (StoreAddr, VerificationToken); #[derive(Debug, Copy, Clone)] pub enum GenericSendError { RxDisconnected, - QueueFull(u32), + QueueFull(Option), } impl Display for GenericSendError { @@ -71,7 +71,7 @@ impl Display for GenericSendError { write!(f, "rx side has disconnected") } GenericSendError::QueueFull(max_cap) => { - write!(f, "queue with max capacity of {max_cap} is full") + write!(f, "queue with max capacity of {max_cap:?} is full") } } } @@ -327,8 +327,30 @@ pub mod std_mod { use std::cell::RefCell; use std::string::String; use std::sync::mpsc; - use std::sync::mpsc::{SendError, TryRecvError}; + use std::sync::mpsc::TryRecvError; use thiserror::Error; + use crossbeam_channel as cb; + + impl From> for EcssTmtcError { + fn from(_: mpsc::SendError) -> Self { + Self::Send(GenericSendError::RxDisconnected) + } + } + + impl From> for EcssTmtcError { + fn from(_: cb::SendError) -> Self { + Self::Send(GenericSendError::RxDisconnected) + } + } + + impl From> for EcssTmtcError { + fn from(value: cb::TrySendError) -> Self { + match value { + cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)), + cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected), + } + } + } #[derive(Clone)] pub struct MpscTmInStoreSender { @@ -336,7 +358,6 @@ pub mod std_mod { name: &'static str, shared_tm_store: SharedTmStore, sender: mpsc::Sender, - pub ignore_poison_errors: bool, } impl EcssChannel for MpscTmInStoreSender { @@ -349,11 +370,6 @@ pub mod std_mod { } } - impl From> for EcssTmtcError { - fn from(_: SendError) -> Self { - Self::Send(GenericSendError::RxDisconnected) - } - } impl MpscTmInStoreSender { pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> { let addr = self.shared_tm_store.add_pus_tm(&tm)?; @@ -366,7 +382,10 @@ pub mod std_mod { impl EcssTmSenderCore for MpscTmInStoreSender { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { match tm { - PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()), + PusTmWrapper::InStore(addr) => { + self.sender.send(addr)?; + Ok(()) + } PusTmWrapper::Direct(tm) => self.send_direct_tm(tm), } } @@ -384,7 +403,6 @@ pub mod std_mod { name, shared_tm_store, sender, - ignore_poison_errors: false, } } } @@ -393,7 +411,6 @@ pub mod std_mod { id: ChannelId, name: &'static str, receiver: mpsc::Receiver, - pub ignore_poison_errors: bool, } impl EcssChannel for MpscTcInStoreReceiver { @@ -431,7 +448,6 @@ pub mod std_mod { id, name, receiver, - ignore_poison_errors: false, } } } @@ -447,8 +463,8 @@ pub mod std_mod { name: &'static str, } - impl From>> for EcssTmtcError { - fn from(_: SendError>) -> Self { + impl From>> for EcssTmtcError { + fn from(_: mpsc::SendError>) -> Self { Self::Send(GenericSendError::RxDisconnected) } } @@ -482,6 +498,101 @@ pub mod std_mod { } } + #[derive(Clone)] + pub struct CrossbeamTmInStoreSender { + id: ChannelId, + name: &'static str, + shared_tm_store: SharedTmStore, + sender: crossbeam_channel::Sender, + pub ignore_poison_errors: bool, + } + + impl CrossbeamTmInStoreSender { + pub fn new( + id: ChannelId, + name: &'static str, + shared_tm_store: SharedTmStore, + sender: crossbeam_channel::Sender, + ignore_poison_errors: bool, + ) -> Self { + Self { + id, + name, + shared_tm_store, + sender, + ignore_poison_errors, + } + } + } + + impl EcssChannel for CrossbeamTmInStoreSender { + fn id(&self) -> ChannelId { + self.id + } + + fn name(&self) -> &'static str { + self.name + } + } + + impl EcssTmSenderCore for CrossbeamTmInStoreSender { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?, + PusTmWrapper::Direct(tm) => { + let addr = self.shared_tm_store.add_pus_tm(&tm)?; + self.sender.try_send(addr)?; + } + } + Ok(()) + } + } + + pub struct CrossbeamTcInStoreReceiver { + id: ChannelId, + name: &'static str, + receiver: cb::Receiver, + } + + impl CrossbeamTcInStoreReceiver { + pub fn new( + id: ChannelId, + name: &'static str, + receiver: cb::Receiver + ) -> Self { + Self { + id, + name, + receiver, + } + } + } + + impl EcssChannel for CrossbeamTcInStoreReceiver { + fn id(&self) -> ChannelId { + self.id + } + + fn name(&self) -> &'static str { + self.name + } + } + + impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver { + fn recv_tc(&self) -> Result { + let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e { + cb::TryRecvError::Empty => TryRecvTmtcError::Empty, + cb::TryRecvError::Disconnected => { + TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) + } + })?; + Ok(ReceivedTcWrapper { + store_addr, + token: Some(token), + }) + } + } + #[derive(Debug, Clone, Error)] pub enum PusPacketHandlingError { #[error("generic PUS error: {0}")] From df346467bc8c4d6086028ef5c637618df4571e71 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 11 Jul 2023 22:52:06 +0200 Subject: [PATCH 2/2] seems to work well --- satrs-core/src/pus/mod.rs | 19 ++++--------------- satrs-core/tests/pus_verification.rs | 10 ++++------ 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index e4b2db4..f03ab0f 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -319,6 +319,7 @@ pub mod std_mod { use crate::ChannelId; use alloc::boxed::Box; use alloc::vec::Vec; + use crossbeam_channel as cb; use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::PusError; use spacepackets::time::cds::TimeProvider; @@ -329,7 +330,6 @@ pub mod std_mod { use std::sync::mpsc; use std::sync::mpsc::TryRecvError; use thiserror::Error; - use crossbeam_channel as cb; impl From> for EcssTmtcError { fn from(_: mpsc::SendError) -> Self { @@ -444,11 +444,7 @@ pub mod std_mod { name: &'static str, receiver: mpsc::Receiver, ) -> Self { - Self { - id, - name, - receiver, - } + Self { id, name, receiver } } } @@ -504,7 +500,6 @@ pub mod std_mod { name: &'static str, shared_tm_store: SharedTmStore, sender: crossbeam_channel::Sender, - pub ignore_poison_errors: bool, } impl CrossbeamTmInStoreSender { @@ -513,14 +508,12 @@ pub mod std_mod { name: &'static str, shared_tm_store: SharedTmStore, sender: crossbeam_channel::Sender, - ignore_poison_errors: bool, ) -> Self { Self { id, name, shared_tm_store, sender, - ignore_poison_errors, } } } @@ -558,13 +551,9 @@ pub mod std_mod { pub fn new( id: ChannelId, name: &'static str, - receiver: cb::Receiver + receiver: cb::Receiver, ) -> Self { - Self { - id, - name, - receiver, - } + Self { id, name, receiver } } } diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 443ba89..097eba8 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -1,5 +1,3 @@ -// TODO: Refactor this to also test the STD impl using mpsc -// TODO: Change back to cross-beam as soon as STD impl was added back for TM. #[cfg(feature = "crossbeam")] pub mod crossbeam_test { use hashbrown::HashMap; @@ -7,13 +5,13 @@ pub mod crossbeam_test { use satrs_core::pus::verification::{ FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, }; - use satrs_core::pus::MpscTmInStoreSender; + use satrs_core::pus::CrossbeamTmInStoreSender; use satrs_core::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::SpHeader; - use std::sync::{mpsc, Arc, RwLock}; + use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -39,9 +37,9 @@ pub mod crossbeam_test { let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone()))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = crossbeam_channel::bounded(10); let sender = - MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone()); + CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone()); let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter_with_sender_1 = reporter_with_sender_0.clone();