diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index 5e4c58b..5bbc63b 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -243,6 +243,7 @@ mod tests { use super::*; use crate::events::{EventU32, Severity}; use crate::pus::tests::CommonTmInfo; + use crate::SenderId; use spacepackets::ByteConversionError; use std::collections::VecDeque; use std::vec::Vec; @@ -268,6 +269,9 @@ mod tests { impl EcssTmSenderCore for TestSender { type Error = (); + fn id(&self) -> SenderId { + 0 + } fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { assert!(tm.source_data().is_some()); let src_data = tm.source_data().unwrap(); diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index a463813..ecb4b59 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -256,7 +256,7 @@ mod tests { fn test_basic() { let mut event_man = create_basic_man(); let (event_tx, event_rx) = channel(); - let mut sender = MpscTmAsVecSender::new(event_tx); + let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx); let event_sent = event_man .generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None) .expect("Sending info event failed"); @@ -270,7 +270,7 @@ mod tests { fn test_disable_event() { let mut event_man = create_basic_man(); let (event_tx, event_rx) = channel(); - let mut sender = MpscTmAsVecSender::new(event_tx); + let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); @@ -293,7 +293,7 @@ mod tests { fn test_reenable_event() { let mut event_man = create_basic_man(); let (event_tx, event_rx) = channel(); - let mut sender = MpscTmAsVecSender::new(event_tx); + let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index a558b41..4543fb4 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -19,6 +19,7 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; +use crate::SenderId; #[cfg(feature = "std")] pub use std_mod::*; @@ -64,7 +65,12 @@ impl From for EcssTmError { pub trait EcssTmSenderCore: Send { type Error; + /// Each sender can have an ID associated with it + fn id(&self) -> SenderId; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; + fn name(&self) -> &'static str { + "unset" + } } #[cfg(feature = "alloc")] @@ -96,6 +102,7 @@ mod alloc_mod { pub mod std_mod { use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; use crate::pus::EcssTmSenderCore; + use crate::SenderId; use alloc::vec::Vec; use spacepackets::ecss::PusError; use spacepackets::tm::PusTm; @@ -129,6 +136,8 @@ pub mod std_mod { #[derive(Clone)] pub struct MpscTmInStoreSender { + id: SenderId, + name: &'static str, store_helper: SharedPool, sender: mpsc::Sender, pub ignore_poison_errors: bool, @@ -137,6 +146,10 @@ pub mod std_mod { impl EcssTmSenderCore for MpscTmInStoreSender { type Error = MpscPusInStoreSendError; + fn id(&self) -> SenderId { + self.id + } + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { let operation = |mut store: RwLockWriteGuard| { let (addr, slice) = store.free_element(tm.len_packed())?; @@ -155,11 +168,22 @@ pub mod std_mod { } } } + + fn name(&self) -> &'static str { + self.name + } } impl MpscTmInStoreSender { - pub fn new(store_helper: SharedPool, sender: mpsc::Sender) -> Self { + pub fn new( + id: SenderId, + name: &'static str, + store_helper: SharedPool, + sender: mpsc::Sender, + ) -> Self { Self { + id, + name, store_helper, sender, ignore_poison_errors: false, @@ -175,16 +199,22 @@ pub mod std_mod { #[derive(Debug, Clone)] pub struct MpscTmAsVecSender { + id: SenderId, sender: mpsc::Sender>, + name: &'static str, } impl MpscTmAsVecSender { - pub fn new(sender: mpsc::Sender>) -> Self { - Self { sender } + pub fn new(id: u32, name: &'static str, sender: mpsc::Sender>) -> Self { + Self { id, sender, name } } } impl EcssTmSenderCore for MpscTmAsVecSender { type Error = MpscAsVecSenderError; + fn id(&self) -> SenderId { + self.id + } + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { let mut vec = Vec::new(); tm.append_to_vec(&mut vec) @@ -194,6 +224,10 @@ pub mod std_mod { .map_err(MpscAsVecSenderError::SendError)?; Ok(()) } + + fn name(&self) -> &'static str { + self.name + } } } diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 75ca496..613b3ab 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -29,7 +29,7 @@ //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); //! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let (verif_tx, verif_rx) = mpsc::channel(); -//! let sender = MpscVerifSender::new(shared_tm_pool.clone(), verif_tx); +//! let sender = MpscVerifSender::new(0, "verif_tm", shared_tm_pool.clone(), verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SeqCountProviderSimple::default()), 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! @@ -1252,6 +1252,7 @@ mod stdmod { use super::*; use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; use crate::pus::MpscPusInStoreSendError; + use crate::SenderId; use delegate::delegate; use spacepackets::tm::PusTm; use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; @@ -1265,17 +1266,21 @@ mod stdmod { #[derive(Clone)] struct StdSenderBase { - pub ignore_poison_error: bool, + id: SenderId, + name: &'static str, tm_store: SharedPool, tx: S, + pub ignore_poison_error: bool, } impl StdSenderBase { - pub fn new(tm_store: SharedPool, tx: S) -> Self { + pub fn new(id: SenderId, name: &'static str, tm_store: SharedPool, tx: S) -> Self { Self { - ignore_poison_error: false, + id, + name, tm_store, tx, + ignore_poison_error: false, } } } @@ -1297,9 +1302,14 @@ mod stdmod { /// Verification sender with a [mpsc::Sender] backend. /// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender. impl MpscVerifSender { - pub fn new(tm_store: SharedPool, tx: mpsc::Sender) -> Self { + pub fn new( + id: SenderId, + name: &'static str, + tm_store: SharedPool, + tx: mpsc::Sender, + ) -> Self { Self { - base: StdSenderBase::new(tm_store, tx), + base: StdSenderBase::new(id, name, tm_store, tx), } } } @@ -1310,6 +1320,8 @@ mod stdmod { delegate!( to self.base { + fn id(&self) -> SenderId; + fn name(&self) -> &'static str; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; } ); @@ -1331,9 +1343,14 @@ mod stdmod { #[cfg(feature = "crossbeam")] impl CrossbeamVerifSender { - pub fn new(tm_store: SharedPool, tx: crossbeam_channel::Sender) -> Self { + pub fn new( + id: SenderId, + name: &'static str, + tm_store: SharedPool, + tx: crossbeam_channel::Sender, + ) -> Self { Self { - base: StdSenderBase::new(tm_store, tx), + base: StdSenderBase::new(id, name, tm_store, tx), } } } @@ -1345,6 +1362,8 @@ mod stdmod { delegate!( to self.base { + fn id(&self) -> SenderId; + fn name(&self) -> &'static str; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; } ); @@ -1352,6 +1371,11 @@ mod stdmod { impl EcssTmSenderCore for StdSenderBase { type Error = MpscPusInStoreSendError; + + fn id(&self) -> SenderId { + self.id + } + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { let operation = |mut mg: RwLockWriteGuard| { let (addr, buf) = mg.free_element(tm.len_packed())?; @@ -1374,6 +1398,10 @@ mod stdmod { } } } + + fn name(&self) -> &'static str { + self.name + } } } @@ -1403,6 +1431,7 @@ mod tests { }; use crate::pus::EcssTmErrorWithSend; use crate::seq_count::SeqCountProviderSimple; + use crate::SenderId; use alloc::boxed::Box; use alloc::format; use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; @@ -1435,6 +1464,11 @@ mod tests { impl EcssTmSenderCore for TestSender { type Error = (); + + fn id(&self) -> SenderId { + 0 + } + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { assert_eq!(PusPacket::service(&tm), 1); assert!(tm.source_data().is_some()); @@ -1456,6 +1490,10 @@ mod tests { }); Ok(()) } + + fn name(&self) -> &'static str { + "test_sender" + } } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -1465,6 +1503,9 @@ mod tests { impl EcssTmSenderCore for FallibleSender { type Error = DummyError; + fn id(&self) -> SenderId { + 0 + } fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> { Err(DummyError {}) } @@ -1559,7 +1600,7 @@ mod tests { let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool))); let (tx, _) = mpsc::channel(); - let mpsc_verif_sender = MpscVerifSender::new(shared_pool, tx); + let mpsc_verif_sender = MpscVerifSender::new(0, "verif_sender", shared_pool, tx); is_send(&mpsc_verif_sender); } diff --git a/satrs-core/tests/pus_events.rs b/satrs-core/tests/pus_events.rs index 64eb48b..695e1a8 100644 --- a/satrs-core/tests/pus_events.rs +++ b/satrs-core/tests/pus_events.rs @@ -7,7 +7,7 @@ use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher, }; -use satrs_core::pus::EcssTmSenderCore; +use satrs_core::pus::MpscTmAsVecSender; use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tm::PusTm; use std::sync::mpsc::{channel, SendError, TryRecvError}; @@ -24,23 +24,23 @@ pub enum CustomTmSenderError { PusError(PusError), } -#[derive(Clone)] -struct EventTmSender { - sender: std::sync::mpsc::Sender>, -} - -impl EcssTmSenderCore for EventTmSender { - type Error = CustomTmSenderError; - fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { - let mut vec = Vec::new(); - tm.append_to_vec(&mut vec) - .map_err(|e| CustomTmSenderError::PusError(e))?; - self.sender - .send(vec) - .map_err(CustomTmSenderError::SendError)?; - Ok(()) - } -} +// #[derive(Clone)] +// struct EventTmSender { +// sender: std::sync::mpsc::Sender>, +// } +// +// impl EcssTmSenderCore for EventTmSender { +// type Error = CustomTmSenderError; +// fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { +// let mut vec = Vec::new(); +// tm.append_to_vec(&mut vec) +// .map_err(|e| CustomTmSenderError::PusError(e))?; +// self.sender +// .send(vec) +// .map_err(CustomTmSenderError::SendError)?; +// Ok(()) +// } +// } #[test] fn test_threaded_usage() { @@ -58,7 +58,7 @@ fn test_threaded_usage() { let mut pus_event_man = PusEventDispatcher::new(reporter, Box::new(backend)); // PUS + Generic event manager thread let jh0 = thread::spawn(move || { - let mut sender = EventTmSender { sender: event_tx }; + let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx); let mut event_cnt = 0; let mut params_array: [u8; 128] = [0; 128]; loop { diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index e5ffad0..dee2d7b 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -47,7 +47,8 @@ pub mod crossbeam_test { let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = crossbeam_channel::bounded(5); - let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); + let sender = + CrossbeamVerifSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 244b085..cdc455c 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -73,7 +73,12 @@ fn main() { let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let verif_sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); + let verif_sender = MpscVerifSender::new( + 0, + "verif_sender", + tm_store.pool.clone(), + tm_funnel_tx.clone(), + ); let verif_cfg = VerificationReporterCfg::new( PUS_APID, #[allow(clippy::box_default)] @@ -173,7 +178,8 @@ fn main() { .name("Event".to_string()) .spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = MpscTmInStoreSender::new(tm_store.pool, tm_funnel_tx); + let mut sender = + MpscTmInStoreSender::new(1, "event_sender", tm_store.pool, tm_funnel_tx); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { reporter_event_handler