From 1f2bd0fd54195a469841abab5e7b631e65e59447 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 8 Jul 2023 14:57:11 +0200 Subject: [PATCH] improved and unified TM sending API --- satrs-core/src/pool.rs | 10 + satrs-core/src/pus/event.rs | 50 ++- satrs-core/src/pus/mod.rs | 141 ++++--- satrs-core/src/pus/test.rs | 6 +- satrs-core/src/pus/verification.rs | 531 ++++++++++++++------------- satrs-core/tests/pus_verification.rs | 11 +- satrs-example/src/main.rs | 10 +- 7 files changed, 419 insertions(+), 340 deletions(-) diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index afc4f73..d7253d5 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -138,6 +138,16 @@ pub struct StoreAddr { pub(crate) packet_idx: NumBlocks, } +impl Display for StoreAddr { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + write!( + f, + "StoreAddr(pool index: {}, packet index: {})", + self.pool_idx, self.packet_idx + ) + } +} + impl StoreAddr { pub const INVALID_ADDR: u32 = 0xFFFFFFFF; diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index 9bea69c..57f378a 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -110,7 +110,7 @@ impl EventReporterBase { ) -> Result<(), EcssTmtcErrorWithSend> { let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?; sender - .send_tm(tm) + .send_tm(tm.into()) .map_err(|e| EcssTmtcErrorWithSend::SendError(e))?; self.msg_count += 1; Ok(()) @@ -243,9 +243,10 @@ mod tests { use super::*; use crate::events::{EventU32, Severity}; use crate::pus::tests::CommonTmInfo; - use crate::pus::EcssSender; + use crate::pus::{EcssSender, PusTmWrapper}; use crate::SenderId; use spacepackets::ByteConversionError; + use std::cell::RefCell; use std::collections::VecDeque; use std::vec::Vec; @@ -264,7 +265,7 @@ mod tests { #[derive(Default, Clone)] struct TestSender { - pub service_queue: VecDeque, + pub service_queue: RefCell>, } impl EcssSender for TestSender { @@ -276,21 +277,29 @@ mod tests { impl EcssTmSenderCore for TestSender { type Error = (); - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { - assert!(tm.source_data().is_some()); - let src_data = tm.source_data().unwrap(); - assert!(src_data.len() >= 4); - let event = EventU32::from(u32::from_be_bytes(src_data[0..4].try_into().unwrap())); - let mut aux_data = Vec::new(); - if src_data.len() > 4 { - aux_data.extend_from_slice(&src_data[4..]); + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + match tm { + PusTmWrapper::InStore(_) => { + panic!("TestSender: unexpected call with address"); + } + PusTmWrapper::Direct(tm) => { + assert!(tm.source_data().is_some()); + let src_data = tm.source_data().unwrap(); + assert!(src_data.len() >= 4); + let event = + EventU32::from(u32::from_be_bytes(src_data[0..4].try_into().unwrap())); + let mut aux_data = Vec::new(); + if src_data.len() > 4 { + aux_data.extend_from_slice(&src_data[4..]); + } + self.service_queue.borrow_mut().push_back(TmInfo { + common: CommonTmInfo::new_from_tm(&tm), + event, + aux_data, + }); + Ok(()) + } } - self.service_queue.push_back(TmInfo { - common: CommonTmInfo::new_from_tm(&tm), - event, - aux_data, - }); - Ok(()) } } @@ -359,8 +368,9 @@ mod tests { severity, error_data, ); - assert_eq!(sender.service_queue.len(), 1); - let tm_info = sender.service_queue.pop_front().unwrap(); + let mut service_queue = sender.service_queue.borrow_mut(); + assert_eq!(service_queue.len(), 1); + let tm_info = service_queue.pop_front().unwrap(); assert_eq!( tm_info.common.subservice, severity_to_subservice(severity) as u8 @@ -417,7 +427,7 @@ mod tests { let err = reporter.event_info(sender, &time_stamp_empty, event, None); assert!(err.is_err()); let err = err.unwrap_err(); - if let EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversionError( + if let EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversion( ByteConversionError::ToSliceTooSmall(missmatch), )) = err { diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 0ec74be..cc5f9ce 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -2,6 +2,8 @@ //! //! This module contains structures to make working with the PUS C standard easier. //! The satrs-example application contains various usage examples of these components. +use crate::pus::verification::TcStateToken; +use crate::SenderId; #[cfg(feature = "alloc")] use downcast_rs::{impl_downcast, Downcast}; #[cfg(feature = "alloc")] @@ -26,10 +28,28 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; -use crate::SenderId; +use crate::pool::StoreAddr; #[cfg(feature = "std")] pub use std_mod::*; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum PusTmWrapper<'tm> { + InStore(StoreAddr), + Direct(PusTm<'tm>), +} + +impl From for PusTmWrapper<'_> { + fn from(value: StoreAddr) -> Self { + Self::InStore(value) + } +} + +impl<'tm> From> for PusTmWrapper<'tm> { + fn from(value: PusTm<'tm>) -> Self { + Self::Direct(value) + } +} + #[derive(Debug, Clone)] pub enum EcssTmtcErrorWithSend { /// Errors related to sending the telemetry to a TMTC recipient @@ -47,22 +67,22 @@ impl From for EcssTmtcErrorWithSend { #[derive(Debug, Clone)] pub enum EcssTmtcError { /// Errors related to the time stamp format of the telemetry - TimestampError(TimestampError), + Timestamp(TimestampError), /// Errors related to byte conversion, for example insufficient buffer size for given data - ByteConversionError(ByteConversionError), + ByteConversion(ByteConversionError), /// Errors related to PUS packet format - PusError(PusError), + Pus(PusError), } impl From for EcssTmtcError { fn from(e: PusError) -> Self { - EcssTmtcError::PusError(e) + EcssTmtcError::Pus(e) } } impl From for EcssTmtcError { fn from(e: ByteConversionError) -> Self { - EcssTmtcError::ByteConversionError(e) + EcssTmtcError::ByteConversion(e) } } @@ -79,16 +99,17 @@ pub trait EcssSender: Send { pub trait EcssTmSenderCore: EcssSender { type Error; - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error>; } /// Generic trait for a user supplied sender object. /// -/// This sender object is responsible for sending PUS telecommands to a TC recipient. +/// This sender object is responsible for sending PUS telecommands to a TC recipient. Each +/// telecommand can optionally have a token which contains its verification state. pub trait EcssTcSenderCore: EcssSender { type Error; - fn send_tc(&mut self, tc: PusTc) -> Result<(), Self::Error>; + fn send_tc(&self, tc: PusTc, token: Option) -> Result<(), Self::Error>; } #[cfg(feature = "alloc")] @@ -142,15 +163,13 @@ pub mod std_mod { use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; - use crate::pus::{EcssSender, EcssTcSenderCore, EcssTmSenderCore}; + use crate::pus::{EcssSender, EcssTmSenderCore, PusTmWrapper}; use crate::tmtc::tm_helper::SharedTmStore; use crate::SenderId; use alloc::vec::Vec; use spacepackets::ecss::{PusError, SerializablePusPacket}; - use spacepackets::tc::PusTc; use spacepackets::time::cds::TimeProvider; use spacepackets::time::{StdTimestampError, TimeWriter}; - use spacepackets::tm::PusTm; use std::cell::RefCell; use std::format; use std::string::String; @@ -158,21 +177,21 @@ pub mod std_mod { use thiserror::Error; #[derive(Debug, Clone, Error)] - pub enum MpscPusInStoreSendError { + pub enum MpscTmInStoreSenderError { #[error("RwGuard lock error")] - LockError, + StoreLock, #[error("Generic PUS error: {0}")] - PusError(#[from] PusError), + Pus(#[from] PusError), #[error("Generic store error: {0}")] - StoreError(#[from] StoreError), - #[error("Generic send error: {0}")] - SendError(#[from] mpsc::SendError), + Store(#[from] StoreError), + #[error("MPSC channel send error: {0}")] + Send(#[from] mpsc::SendError), #[error("RX handle has disconnected")] - RxDisconnected(StoreAddr), + RxDisconnected, } #[derive(Clone)] - pub struct MpscTmtcInStoreSender { + pub struct MpscTmInStoreSender { id: SenderId, name: &'static str, store_helper: SharedPool, @@ -180,7 +199,7 @@ pub mod std_mod { pub ignore_poison_errors: bool, } - impl EcssSender for MpscTmtcInStoreSender { + impl EcssSender for MpscTmInStoreSender { fn id(&self) -> SenderId { self.id } @@ -190,11 +209,11 @@ pub mod std_mod { } } - impl MpscTmtcInStoreSender { - pub fn send_tmtc( - &mut self, + impl MpscTmInStoreSender { + pub fn send_direct_tm( + &self, tmtc: impl SerializablePusPacket, - ) -> Result<(), MpscPusInStoreSendError> { + ) -> Result<(), MpscTmInStoreSenderError> { let operation = |mut store: RwLockWriteGuard| { let (addr, slice) = store.free_element(tmtc.len_packed())?; tmtc.write_to_bytes(slice)?; @@ -207,30 +226,28 @@ pub mod std_mod { if self.ignore_poison_errors { operation(e.into_inner()) } else { - Err(MpscPusInStoreSendError::LockError) + Err(MpscTmInStoreSenderError::StoreLock) } } } } } - impl EcssTmSenderCore for MpscTmtcInStoreSender { - type Error = MpscPusInStoreSendError; + impl EcssTmSenderCore for MpscTmInStoreSender { + type Error = MpscTmInStoreSenderError; - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { - self.send_tmtc(tm) + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + match tm { + PusTmWrapper::InStore(addr) => self + .sender + .send(addr) + .map_err(MpscTmInStoreSenderError::Send), + PusTmWrapper::Direct(tm) => self.send_direct_tm(tm), + } } } - impl EcssTcSenderCore for MpscTmtcInStoreSender { - type Error = MpscPusInStoreSendError; - - fn send_tc(&mut self, tc: PusTc) -> Result<(), Self::Error> { - self.send_tmtc(tc) - } - } - - impl MpscTmtcInStoreSender { + impl MpscTmInStoreSender { pub fn new( id: SenderId, name: &'static str, @@ -247,13 +264,23 @@ pub mod std_mod { } } - #[derive(Debug, Clone)] - pub enum MpscAsVecSenderError { - PusError(PusError), - SendError(mpsc::SendError>), + #[derive(Debug, Clone, Error)] + pub enum MpscTmAsVecSenderError { + #[error("Generic PUS error: {0}")] + Pus(#[from] PusError), + #[error("MPSC channel send error: {0}")] + Send(#[from] mpsc::SendError>), + #[error("can not handle store addresses")] + CantSendAddr(StoreAddr), + #[error("RX handle has disconnected")] + RxDisconnected, } - #[derive(Debug, Clone)] + /// This class can be used if frequent heap allocations during run-time are not an issue. + /// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation + /// of this class can not deal with store addresses, so it is assumed that is is always + /// going to be called with direct packets. + #[derive(Clone)] pub struct MpscTmAsVecSender { id: SenderId, sender: mpsc::Sender>, @@ -276,16 +303,21 @@ pub mod std_mod { } impl EcssTmSenderCore for MpscTmAsVecSender { - type Error = MpscAsVecSenderError; + type Error = MpscTmAsVecSenderError; - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { - let mut vec = Vec::new(); - tm.append_to_vec(&mut vec) - .map_err(MpscAsVecSenderError::PusError)?; - self.sender - .send(vec) - .map_err(MpscAsVecSenderError::SendError)?; - Ok(()) + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + match tm { + PusTmWrapper::InStore(addr) => Err(MpscTmAsVecSenderError::CantSendAddr(addr)), + PusTmWrapper::Direct(tm) => { + let mut vec = Vec::new(); + tm.append_to_vec(&mut vec) + .map_err(MpscTmAsVecSenderError::Pus)?; + self.sender + .send(vec) + .map_err(MpscTmAsVecSenderError::Send)?; + Ok(()) + } + } } } @@ -388,6 +420,7 @@ pub mod std_mod { let time_provider = TimeProvider::from_now_with_u16_days().map_err(PartialPusHandlingError::Time); if let Ok(time_provider) = time_provider { + // Can't fail, we have a buffer with the exact required size. time_provider.write_to_bytes(&mut time_stamp).unwrap(); } else { *partial_error = Some(time_provider.unwrap_err()); @@ -439,7 +472,7 @@ pub mod std_mod { pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmtcError> { if len > cap { - return Err(EcssTmtcError::ByteConversionError( + return Err(EcssTmtcError::ByteConversion( ByteConversionError::ToSliceTooSmall(SizeMissmatch { found: cap, expected: len, diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs index ef12ab4..0e596fa 100644 --- a/satrs-core/src/pus/test.rs +++ b/satrs-core/src/pus/test.rs @@ -116,9 +116,9 @@ mod tests { use crate::pool::{LocalPool, PoolCfg, SharedPool}; use crate::pus::test::PusService17TestHandler; use crate::pus::verification::{ - MpscVerifSender, RequestId, StdVerifReporterWithSender, VerificationReporterCfg, + RequestId, StdVerifReporterWithSender, VerificationReporterCfg, }; - use crate::pus::PusServiceHandler; + use crate::pus::{MpscTmInStoreSender, PusServiceHandler}; use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::{PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -141,7 +141,7 @@ mod tests { let shared_tm_store = SharedTmStore::new(tm_pool_shared.clone()); let (test_srv_tx, test_srv_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel(); - let verif_sender = MpscVerifSender::new( + let verif_sender = MpscTmInStoreSender::new( 0, "verif_sender", shared_tm_store.backing_pool(), diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 7d6b136..445f10b 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -16,8 +16,9 @@ //! use std::sync::{Arc, mpsc, RwLock}; //! use std::time::Duration; //! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; -//! use satrs_core::pus::verification::{MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; +//! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; //! use satrs_core::seq_count::SeqCountProviderSimple; +//! use satrs_core::pus::MpscTmInStoreSender; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; //! use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -29,7 +30,7 @@ //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); //! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let (verif_tx, verif_rx) = mpsc::channel(); -//! let sender = MpscVerifSender::new(0, "Test Sender", shared_tm_pool.clone(), verif_tx); +//! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_pool.clone(), verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! @@ -96,11 +97,8 @@ pub use spacepackets::ecss::verification::*; pub use alloc_mod::{ VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, }; - -#[cfg(all(feature = "crossbeam", feature = "std"))] -pub use stdmod::CrossbeamVerifSender; #[cfg(feature = "std")] -pub use stdmod::{MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender}; +pub use std_mod::*; /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard. /// @@ -523,7 +521,7 @@ impl VerificationReporterCore { ) -> Result, VerificationOrSendErrorWithToken> { sender - .send_tm(sendable.pus_tm.take().unwrap()) + .send_tm(sendable.pus_tm.take().unwrap().into()) .map_err(|e| { VerificationOrSendErrorWithToken( EcssTmtcErrorWithSend::SendError(e), @@ -539,7 +537,7 @@ impl VerificationReporterCore { sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender - .send_tm(sendable.pus_tm.take().unwrap()) + .send_tm(sendable.pus_tm.take().unwrap().into()) .map_err(|e| { VerificationOrSendErrorWithToken( EcssTmtcErrorWithSend::SendError(e), @@ -606,7 +604,7 @@ impl VerificationReporterCore { VerificationOrSendErrorWithToken, > { sender - .send_tm(sendable.pus_tm.take().unwrap()) + .send_tm(sendable.pus_tm.take().unwrap().into()) .map_err(|e| { VerificationOrSendErrorWithToken( EcssTmtcErrorWithSend::SendError(e), @@ -648,7 +646,7 @@ impl VerificationReporterCore { sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender - .send_tm(sendable.pus_tm.take().unwrap()) + .send_tm(sendable.pus_tm.take().unwrap().into()) .map_err(|e| { VerificationOrSendErrorWithToken( EcssTmtcErrorWithSend::SendError(e), @@ -771,7 +769,7 @@ impl VerificationReporterCore { sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender - .send_tm(sendable.pus_tm.take().unwrap()) + .send_tm(sendable.pus_tm.take().unwrap().into()) .map_err(|e| { VerificationOrSendErrorWithToken( EcssTmtcErrorWithSend::SendError(e), @@ -788,7 +786,7 @@ impl VerificationReporterCore { sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender - .send_tm(sendable.pus_tm.take().unwrap()) + .send_tm(sendable.pus_tm.take().unwrap().into()) .map_err(|e| { VerificationOrSendErrorWithToken( EcssTmtcErrorWithSend::SendError(e), @@ -1329,180 +1327,197 @@ mod alloc_mod { } #[cfg(feature = "std")] -mod stdmod { - use super::alloc_mod::VerificationReporterWithSender; - use super::*; - use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; - use crate::pus::{EcssSender, MpscPusInStoreSendError}; - use crate::SenderId; - use delegate::delegate; - use spacepackets::ecss::SerializablePusPacket; - use spacepackets::tm::PusTm; - use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; +mod std_mod { + use crate::pus::verification::VerificationReporterWithSender; + use crate::pus::MpscTmInStoreSenderError; + use std::sync::{Arc, Mutex}; - pub type StdVerifReporterWithSender = VerificationReporterWithSender; + // use super::alloc_mod::VerificationReporterWithSender; + // use super::*; + // use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; + // use crate::pus::{EcssSender, MpscPusInStoreSendError, PusTmWrapper}; + // use crate::SenderId; + // use delegate::delegate; + // use spacepackets::ecss::SerializablePusPacket; + // use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; + // + pub type StdVerifReporterWithSender = VerificationReporterWithSender; pub type SharedStdVerifReporterWithSender = Arc>; - - trait SendBackend: Send { - fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; - } - - #[derive(Clone)] - struct StdSenderBase { - id: SenderId, - name: &'static str, - tm_store: SharedPool, - tx: S, - pub ignore_poison_error: bool, - } - - impl StdSenderBase { - pub fn new(id: SenderId, name: &'static str, tm_store: SharedPool, tx: S) -> Self { - Self { - id, - name, - tm_store, - tx, - ignore_poison_error: false, - } - } - } - - unsafe impl Sync for StdSenderBase {} - unsafe impl Send for StdSenderBase {} - - impl SendBackend for mpsc::Sender { - fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { - self.send(addr).map_err(|_| addr) - } - } - - #[derive(Clone)] - pub struct MpscVerifSender { - base: StdSenderBase>, - } - - /// Verification sender with a [mpsc::Sender] backend. - /// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender. - impl MpscVerifSender { - pub fn new( - id: SenderId, - name: &'static str, - tm_store: SharedPool, - tx: mpsc::Sender, - ) -> Self { - Self { - base: StdSenderBase::new(id, name, tm_store, tx), - } - } - } - - //noinspection RsTraitImplementation - impl EcssSender for MpscVerifSender { - delegate!( - to self.base { - fn id(&self) -> SenderId; - fn name(&self) -> &'static str; - } - ); - } - - //noinspection RsTraitImplementation - impl EcssTmSenderCore for MpscVerifSender { - type Error = MpscPusInStoreSendError; - - delegate!( - to self.base { - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; - } - ); - } - - impl SendBackend for crossbeam_channel::Sender { - fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { - self.send(addr).map_err(|_| addr) - } - } - - /// Verification sender with a [crossbeam_channel::Sender] backend. - /// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender - #[cfg(feature = "crossbeam")] - #[derive(Clone)] - pub struct CrossbeamVerifSender { - base: StdSenderBase>, - } - - #[cfg(feature = "crossbeam")] - impl CrossbeamVerifSender { - pub fn new( - id: SenderId, - name: &'static str, - tm_store: SharedPool, - tx: crossbeam_channel::Sender, - ) -> Self { - Self { - base: StdSenderBase::new(id, name, tm_store, tx), - } - } - } - - //noinspection RsTraitImplementation - #[cfg(feature = "crossbeam")] - impl EcssSender for CrossbeamVerifSender { - delegate!( - to self.base { - fn id(&self) -> SenderId; - fn name(&self) -> &'static str; - } - ); - } - - //noinspection RsTraitImplementation - #[cfg(feature = "crossbeam")] - impl EcssTmSenderCore for CrossbeamVerifSender { - type Error = MpscPusInStoreSendError; - - delegate!( - to self.base { - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; - } - ); - } - - impl EcssSender for StdSenderBase { - fn id(&self) -> SenderId { - self.id - } - fn name(&self) -> &'static str { - self.name - } - } - impl EcssTmSenderCore for StdSenderBase { - type Error = MpscPusInStoreSendError; - - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { - let operation = |mut mg: RwLockWriteGuard| { - let (addr, buf) = mg.free_element(tm.len_packed())?; - tm.write_to_bytes(buf) - .map_err(MpscPusInStoreSendError::PusError)?; - drop(mg); - self.tx - .send(addr) - .map_err(|_| MpscPusInStoreSendError::RxDisconnected(addr))?; - Ok(()) - }; - match self.tm_store.write() { - Ok(lock) => operation(lock), - Err(poison_error) => { - if self.ignore_poison_error { - operation(poison_error.into_inner()) - } else { - Err(MpscPusInStoreSendError::LockError) - } - } - } - } - } + // + // trait SendBackend: Send { + // type SendError: Debug; + // + // fn send(&self, addr: StoreAddr) -> Result<(), Self::SendError>; + // } + // + // #[derive(Clone)] + // struct StdSenderBase { + // id: SenderId, + // name: &'static str, + // tm_store: SharedPool, + // tx: S, + // pub ignore_poison_error: bool, + // } + // + // impl StdSenderBase { + // pub fn new(id: SenderId, name: &'static str, tm_store: SharedPool, tx: S) -> Self { + // Self { + // id, + // name, + // tm_store, + // tx, + // ignore_poison_error: false, + // } + // } + // } + // + // unsafe impl Sync for StdSenderBase {} + // unsafe impl Send for StdSenderBase {} + // + // impl SendBackend for mpsc::Sender { + // type SendError = mpsc::SendError; + // + // fn send(&self, addr: StoreAddr) -> Result<(), Self::SendError> { + // self.send(addr) + // } + // } + // + // #[derive(Clone)] + // pub struct MpscVerifSender { + // base: StdSenderBase>, + // } + // + // /// Verification sender with a [mpsc::Sender] backend. + // /// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender. + // impl MpscVerifSender { + // pub fn new( + // id: SenderId, + // name: &'static str, + // tm_store: SharedPool, + // tx: mpsc::Sender, + // ) -> Self { + // Self { + // base: StdSenderBase::new(id, name, tm_store, tx), + // } + // } + // } + // + // //noinspection RsTraitImplementation + // impl EcssSender for MpscVerifSender { + // delegate!( + // to self.base { + // fn id(&self) -> SenderId; + // fn name(&self) -> &'static str; + // } + // ); + // } + // + // //noinspection RsTraitImplementation + // impl EcssTmSenderCore for MpscVerifSender { + // type Error = MpscPusInStoreSendError; + // + // delegate!( + // to self.base { + // fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error>; + // } + // ); + // } + // + // impl SendBackend for crossbeam_channel::Sender { + // type SendError = crossbeam_channel::SendError; + // + // fn send(&self, addr: StoreAddr) -> Result<(), Self::SendError> { + // self.send(addr) + // } + // } + // + // /// Verification sender with a [crossbeam_channel::Sender] backend. + // /// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender + // #[cfg(feature = "crossbeam")] + // #[derive(Clone)] + // pub struct CrossbeamVerifSender { + // base: StdSenderBase>, + // } + // + // #[cfg(feature = "crossbeam")] + // impl CrossbeamVerifSender { + // pub fn new( + // id: SenderId, + // name: &'static str, + // tm_store: SharedPool, + // tx: crossbeam_channel::Sender, + // ) -> Self { + // Self { + // base: StdSenderBase::new(id, name, tm_store, tx), + // } + // } + // } + // + // //noinspection RsTraitImplementation + // #[cfg(feature = "crossbeam")] + // impl EcssSender for CrossbeamVerifSender { + // delegate!( + // to self.base { + // fn id(&self) -> SenderId; + // fn name(&self) -> &'static str; + // } + // ); + // } + // + // //noinspection RsTraitImplementation + // #[cfg(feature = "crossbeam")] + // impl EcssTmSenderCore for CrossbeamVerifSender { + // type Error = MpscPusInStoreSendError; + // + // delegate!( + // to self.base { + // fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; + // } + // ); + // } + // + // impl EcssSender for StdSenderBase { + // fn id(&self) -> SenderId { + // self.id + // } + // fn name(&self) -> &'static str { + // self.name + // } + // } + // impl EcssTmSenderCore for StdSenderBase { + // type Error = MpscPusInStoreSendError; + // + // fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + // match tm { + // PusTmWrapper::InStore(addr) => { + // self.tx.send(addr).unwrap(); + // Ok(()) + // } + // PusTmWrapper::Direct(tm) => { + // let operation = |mut mg: RwLockWriteGuard| { + // let (addr, buf) = mg.free_element(tm.len_packed())?; + // tm.write_to_bytes(buf) + // .map_err(MpscPusInStoreSendError::Pus)?; + // drop(mg); + // self.tx + // .send(addr) + // .map_err(|_| MpscPusInStoreSendError::RxDisconnected(addr))?; + // Ok(()) + // }; + // match self.tm_store.write() { + // Ok(lock) => operation(lock), + // Err(poison_error) => { + // if self.ignore_poison_error { + // operation(poison_error.into_inner()) + // } else { + // Err(MpscPusInStoreSendError::StoreLock) + // } + // } + // } + // } + // } + // } + // } } #[cfg(test)] @@ -1510,11 +1525,11 @@ mod tests { use crate::pool::{LocalPool, PoolCfg, SharedPool}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ - EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, MpscVerifSender, - RequestId, TcStateNone, VerificationReporter, VerificationReporterCfg, - VerificationReporterWithSender, VerificationToken, + EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, + VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, + VerificationToken, }; - use crate::pus::{EcssSender, EcssTmtcErrorWithSend}; + use crate::pus::{EcssSender, EcssTmtcErrorWithSend, MpscTmInStoreSender, PusTmWrapper}; use crate::SenderId; use alloc::boxed::Box; use alloc::format; @@ -1523,6 +1538,7 @@ mod tests { use spacepackets::tm::PusTm; use spacepackets::util::UnsignedEnum; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; + use std::cell::RefCell; use std::collections::VecDeque; use std::sync::{mpsc, Arc, RwLock}; use std::time::Duration; @@ -1545,7 +1561,7 @@ mod tests { #[derive(Default, Clone)] struct TestSender { - pub service_queue: VecDeque, + pub service_queue: RefCell>, } impl EcssSender for TestSender { @@ -1560,26 +1576,34 @@ mod tests { impl EcssTmSenderCore for TestSender { type Error = (); - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { - assert_eq!(PusPacket::service(&tm), 1); - assert!(tm.source_data().is_some()); - let mut time_stamp = [0; 7]; - time_stamp.clone_from_slice(&tm.timestamp().unwrap()[0..7]); - let src_data = tm.source_data().unwrap(); - assert!(src_data.len() >= 4); - let req_id = RequestId::from_bytes(&src_data[0..RequestId::SIZE_AS_BYTES]).unwrap(); - let mut vec = None; - if src_data.len() > 4 { - let mut new_vec = Vec::new(); - new_vec.extend_from_slice(&src_data[RequestId::SIZE_AS_BYTES..]); - vec = Some(new_vec); + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + match tm { + PusTmWrapper::InStore(_) => { + panic!("TestSender: Can not deal with addresses"); + } + PusTmWrapper::Direct(tm) => { + assert_eq!(PusPacket::service(&tm), 1); + assert!(tm.source_data().is_some()); + let mut time_stamp = [0; 7]; + time_stamp.clone_from_slice(&tm.timestamp().unwrap()[0..7]); + let src_data = tm.source_data().unwrap(); + assert!(src_data.len() >= 4); + let req_id = + RequestId::from_bytes(&src_data[0..RequestId::SIZE_AS_BYTES]).unwrap(); + let mut vec = None; + if src_data.len() > 4 { + let mut new_vec = Vec::new(); + new_vec.extend_from_slice(&src_data[RequestId::SIZE_AS_BYTES..]); + vec = Some(new_vec); + } + self.service_queue.borrow_mut().push_back(TmInfo { + common: CommonTmInfo::new_from_tm(&tm), + req_id, + additional_data: vec, + }); + Ok(()) + } } - self.service_queue.push_back(TmInfo { - common: CommonTmInfo::new_from_tm(&tm), - req_id, - additional_data: vec, - }); - Ok(()) } } @@ -1595,7 +1619,7 @@ mod tests { } impl EcssTmSenderCore for FallibleSender { type Error = DummyError; - fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> { + fn send_tm(&self, _: PusTmWrapper) -> Result<(), Self::Error> { Err(DummyError {}) } } @@ -1672,8 +1696,9 @@ mod tests { additional_data: None, req_id: req_id.clone(), }; - assert_eq!(sender.service_queue.len(), 1); - let info = sender.service_queue.pop_front().unwrap(); + let mut service_queue = sender.service_queue.borrow_mut(); + assert_eq!(service_queue.len(), 1); + let info = service_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -1682,7 +1707,7 @@ mod tests { let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool))); let (tx, _) = mpsc::channel(); - let mpsc_verif_sender = MpscVerifSender::new(0, "verif_sender", shared_pool, tx); + let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_pool, tx); is_send(&mpsc_verif_sender); } @@ -1742,8 +1767,9 @@ mod tests { additional_data: Some([0, 2].to_vec()), req_id, }; - assert_eq!(sender.service_queue.len(), 1); - let info = sender.service_queue.pop_front().unwrap(); + let mut service_queue = sender.service_queue.borrow_mut(); + assert_eq!(service_queue.len(), 1); + let info = service_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -1793,20 +1819,18 @@ mod tests { let err_with_token = res.unwrap_err(); assert_eq!(err_with_token.1, tok); match err_with_token.0 { - EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversionError(e)) => { - match e { - ByteConversionError::ToSliceTooSmall(missmatch) => { - assert_eq!( - missmatch.expected, - fail_data.len() + RequestId::SIZE_AS_BYTES + fail_code.size() - ); - assert_eq!(missmatch.found, b.rep().allowed_source_data_len()); - } - _ => { - panic!("{}", format!("Unexpected error {:?}", e)) - } + EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversion(e)) => match e { + ByteConversionError::ToSliceTooSmall(missmatch) => { + assert_eq!( + missmatch.expected, + fail_data.len() + RequestId::SIZE_AS_BYTES + fail_code.size() + ); + assert_eq!(missmatch.found, b.rep().allowed_source_data_len()); } - } + _ => { + panic!("{}", format!("Unexpected error {:?}", e)) + } + }, _ => { panic!("{}", format!("Unexpected error {:?}", err_with_token.0)) } @@ -1839,13 +1863,15 @@ mod tests { additional_data: Some([10, 0, 0, 0, 12].to_vec()), req_id: tok.req_id, }; - assert_eq!(sender.service_queue.len(), 1); - let info = sender.service_queue.pop_front().unwrap(); + let mut service_queue = sender.service_queue.borrow_mut(); + assert_eq!(service_queue.len(), 1); + let info = service_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); } fn start_fail_check(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) { - assert_eq!(sender.service_queue.len(), 2); + let mut srv_queue = sender.service_queue.borrow_mut(); + assert_eq!(srv_queue.len(), 2); let mut cmp_info = TmInfo { common: CommonTmInfo { subservice: 1, @@ -1857,7 +1883,7 @@ mod tests { additional_data: None, req_id, }; - let mut info = sender.service_queue.pop_front().unwrap(); + let mut info = srv_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { @@ -1871,7 +1897,7 @@ mod tests { additional_data: Some([&[22], fail_data_raw.as_slice()].concat().to_vec()), req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = srv_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -1937,7 +1963,8 @@ mod tests { additional_data: None, req_id, }; - let mut info = sender.service_queue.pop_front().unwrap(); + let mut srv_queue = sender.service_queue.borrow_mut(); + let mut info = srv_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { common: CommonTmInfo { @@ -1950,7 +1977,7 @@ mod tests { additional_data: None, req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = srv_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { common: CommonTmInfo { @@ -1963,7 +1990,7 @@ mod tests { additional_data: Some([0].to_vec()), req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = srv_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { common: CommonTmInfo { @@ -1976,7 +2003,7 @@ mod tests { additional_data: Some([1].to_vec()), req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = srv_queue.pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -2011,7 +2038,7 @@ mod tests { ) .expect("Sending step 1 success failed"); assert_eq!(empty, ()); - assert_eq!(sender.service_queue.len(), 4); + assert_eq!(sender.service_queue.borrow().len(), 4); step_success_check(&mut sender, tok.req_id); } @@ -2037,12 +2064,12 @@ mod tests { .expect("Sending step 1 success failed"); assert_eq!(empty, ()); let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - assert_eq!(sender.service_queue.len(), 4); + assert_eq!(sender.service_queue.borrow().len(), 4); step_success_check(sender, tok.req_id); } fn check_step_failure(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) { - assert_eq!(sender.service_queue.len(), 4); + assert_eq!(sender.service_queue.borrow().len(), 4); let mut cmp_info = TmInfo { common: CommonTmInfo { subservice: 1, @@ -2054,7 +2081,7 @@ mod tests { additional_data: None, req_id, }; - let mut info = sender.service_queue.pop_front().unwrap(); + let mut info = sender.service_queue.borrow_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { @@ -2068,7 +2095,7 @@ mod tests { additional_data: None, req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.borrow_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { @@ -2082,7 +2109,7 @@ mod tests { additional_data: Some([0].to_vec()), req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.get_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { @@ -2104,7 +2131,7 @@ mod tests { ), req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.get_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -2186,7 +2213,7 @@ mod tests { } fn completion_fail_check(sender: &mut TestSender, req_id: RequestId) { - assert_eq!(sender.service_queue.len(), 3); + assert_eq!(sender.service_queue.borrow().len(), 3); let mut cmp_info = TmInfo { common: CommonTmInfo { @@ -2199,7 +2226,7 @@ mod tests { additional_data: None, req_id, }; - let mut info = sender.service_queue.pop_front().unwrap(); + let mut info = sender.service_queue.get_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { @@ -2213,7 +2240,7 @@ mod tests { additional_data: None, req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.get_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); cmp_info = TmInfo { @@ -2227,7 +2254,7 @@ mod tests { additional_data: Some([0, 0, 0x10, 0x20].to_vec()), req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.get_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -2277,7 +2304,7 @@ mod tests { } fn completion_success_check(sender: &mut TestSender, req_id: RequestId) { - assert_eq!(sender.service_queue.len(), 3); + assert_eq!(sender.service_queue.borrow().len(), 3); let cmp_info = TmInfo { common: CommonTmInfo { subservice: 1, @@ -2289,7 +2316,7 @@ mod tests { additional_data: None, req_id, }; - let mut info = sender.service_queue.pop_front().unwrap(); + let mut info = sender.service_queue.borrow_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); let cmp_info = TmInfo { @@ -2303,7 +2330,7 @@ mod tests { additional_data: None, req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.borrow_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); let cmp_info = TmInfo { common: CommonTmInfo { @@ -2316,7 +2343,7 @@ mod tests { additional_data: None, req_id, }; - info = sender.service_queue.pop_front().unwrap(); + info = sender.service_queue.borrow_mut().pop_front().unwrap(); assert_eq!(info, cmp_info); } @@ -2365,7 +2392,7 @@ mod tests { Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); let (verif_tx, verif_rx) = mpsc::channel(); let sender = - MpscVerifSender::new(0, "Verification Sender", shared_tm_pool.clone(), verif_tx); + MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_pool.clone(), verif_tx); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index cd68e98..313bc84 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -1,17 +1,18 @@ // TODO: Refactor this to also test the STD impl using mpsc +// TODO: Change back to cross-beam as soon as STD impl was added back for TM. #[cfg(feature = "crossbeam")] pub mod crossbeam_test { use hashbrown::HashMap; use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; use satrs_core::pus::verification::{ - CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, - VerificationReporterWithSender, + FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, }; + use satrs_core::pus::MpscTmInStoreSender; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::SpHeader; - use std::sync::{Arc, RwLock}; + use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::Duration; @@ -38,9 +39,9 @@ pub mod crossbeam_test { Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); - let (tx, rx) = crossbeam_channel::bounded(5); + let (tx, rx) = mpsc::channel(); let sender = - CrossbeamVerifSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); + MpscTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 743a148..85346c3 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -32,10 +32,8 @@ use satrs_core::pus::hk::Subservice as HkSubservice; use satrs_core::pus::scheduler::PusScheduler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::test::PusService17TestHandler; -use satrs_core::pus::verification::{ - MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, -}; -use satrs_core::pus::MpscTmtcInStoreSender; +use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; +use satrs_core::pus::MpscTmInStoreSender; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::spacepackets::tm::PusTmZeroCopyWriter; use satrs_core::spacepackets::{ @@ -85,7 +83,7 @@ fn main() { let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let verif_sender = MpscVerifSender::new( + let verif_sender = MpscTmInStoreSender::new( 0, "verif_sender", tm_store.backing_pool(), @@ -271,7 +269,7 @@ fn main() { .name("Event".to_string()) .spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = MpscTmtcInStoreSender::new( + let mut sender = MpscTmInStoreSender::new( 1, "event_sender", tm_store_event.backing_pool(),