Merge pull request 'Sender abstraction has ID and name' (#39) from feature_sender_abstraction_has_id_and_name into main

Reviewed-on: #39
Reviewed-by: lkoester <st167799@stud.uni-stuttgart.de>
This commit is contained in:
Robin Müller 2023-02-28 19:12:45 +01:00
commit c00fa8df1b
7 changed files with 106 additions and 38 deletions

View File

@ -243,6 +243,7 @@ mod tests {
use super::*; use super::*;
use crate::events::{EventU32, Severity}; use crate::events::{EventU32, Severity};
use crate::pus::tests::CommonTmInfo; use crate::pus::tests::CommonTmInfo;
use crate::SenderId;
use spacepackets::ByteConversionError; use spacepackets::ByteConversionError;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::vec::Vec; use std::vec::Vec;
@ -268,6 +269,9 @@ mod tests {
impl EcssTmSenderCore for TestSender { impl EcssTmSenderCore for TestSender {
type Error = (); type Error = ();
fn id(&self) -> SenderId {
0
}
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
assert!(tm.source_data().is_some()); assert!(tm.source_data().is_some());
let src_data = tm.source_data().unwrap(); let src_data = tm.source_data().unwrap();

View File

@ -256,7 +256,7 @@ mod tests {
fn test_basic() { fn test_basic() {
let mut event_man = create_basic_man(); let mut event_man = create_basic_man();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sender = MpscTmAsVecSender::new(event_tx); let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx);
let event_sent = event_man let event_sent = event_man
.generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None) .generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None)
.expect("Sending info event failed"); .expect("Sending info event failed");
@ -270,7 +270,7 @@ mod tests {
fn test_disable_event() { fn test_disable_event() {
let mut event_man = create_basic_man(); let mut event_man = create_basic_man();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sender = MpscTmAsVecSender::new(event_tx); let mut sender = MpscTmAsVecSender::new(0, "test", event_tx);
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
assert!(res.is_ok()); assert!(res.is_ok());
assert!(res.unwrap()); assert!(res.unwrap());
@ -293,7 +293,7 @@ mod tests {
fn test_reenable_event() { fn test_reenable_event() {
let mut event_man = create_basic_man(); let mut event_man = create_basic_man();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sender = MpscTmAsVecSender::new(event_tx); let mut sender = MpscTmAsVecSender::new(0, "test", event_tx);
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
assert!(res.is_ok()); assert!(res.is_ok());
assert!(res.unwrap()); assert!(res.unwrap());

View File

@ -19,6 +19,7 @@ pub mod verification;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
use crate::SenderId;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use std_mod::*; pub use std_mod::*;
@ -64,7 +65,12 @@ impl From<ByteConversionError> for EcssTmError {
pub trait EcssTmSenderCore: Send { pub trait EcssTmSenderCore: Send {
type Error; type Error;
/// Each sender can have an ID associated with it
fn id(&self) -> SenderId;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
fn name(&self) -> &'static str {
"unset"
}
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
@ -96,6 +102,7 @@ mod alloc_mod {
pub mod std_mod { pub mod std_mod {
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError};
use crate::pus::EcssTmSenderCore; use crate::pus::EcssTmSenderCore;
use crate::SenderId;
use alloc::vec::Vec; use alloc::vec::Vec;
use spacepackets::ecss::PusError; use spacepackets::ecss::PusError;
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
@ -129,6 +136,8 @@ pub mod std_mod {
#[derive(Clone)] #[derive(Clone)]
pub struct MpscTmInStoreSender { pub struct MpscTmInStoreSender {
id: SenderId,
name: &'static str,
store_helper: SharedPool, store_helper: SharedPool,
sender: mpsc::Sender<StoreAddr>, sender: mpsc::Sender<StoreAddr>,
pub ignore_poison_errors: bool, pub ignore_poison_errors: bool,
@ -137,6 +146,10 @@ pub mod std_mod {
impl EcssTmSenderCore for MpscTmInStoreSender { impl EcssTmSenderCore for MpscTmInStoreSender {
type Error = MpscPusInStoreSendError; type Error = MpscPusInStoreSendError;
fn id(&self) -> SenderId {
self.id
}
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| { let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, slice) = store.free_element(tm.len_packed())?; let (addr, slice) = store.free_element(tm.len_packed())?;
@ -155,11 +168,22 @@ pub mod std_mod {
} }
} }
} }
fn name(&self) -> &'static str {
self.name
}
} }
impl MpscTmInStoreSender { impl MpscTmInStoreSender {
pub fn new(store_helper: SharedPool, sender: mpsc::Sender<StoreAddr>) -> Self { pub fn new(
id: SenderId,
name: &'static str,
store_helper: SharedPool,
sender: mpsc::Sender<StoreAddr>,
) -> Self {
Self { Self {
id,
name,
store_helper, store_helper,
sender, sender,
ignore_poison_errors: false, ignore_poison_errors: false,
@ -175,16 +199,22 @@ pub mod std_mod {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct MpscTmAsVecSender { pub struct MpscTmAsVecSender {
id: SenderId,
sender: mpsc::Sender<Vec<u8>>, sender: mpsc::Sender<Vec<u8>>,
name: &'static str,
} }
impl MpscTmAsVecSender { impl MpscTmAsVecSender {
pub fn new(sender: mpsc::Sender<Vec<u8>>) -> Self { pub fn new(id: u32, name: &'static str, sender: mpsc::Sender<Vec<u8>>) -> Self {
Self { sender } Self { id, sender, name }
} }
} }
impl EcssTmSenderCore for MpscTmAsVecSender { impl EcssTmSenderCore for MpscTmAsVecSender {
type Error = MpscAsVecSenderError; type Error = MpscAsVecSenderError;
fn id(&self) -> SenderId {
self.id
}
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let mut vec = Vec::new(); let mut vec = Vec::new();
tm.append_to_vec(&mut vec) tm.append_to_vec(&mut vec)
@ -194,6 +224,10 @@ pub mod std_mod {
.map_err(MpscAsVecSenderError::SendError)?; .map_err(MpscAsVecSenderError::SendError)?;
Ok(()) Ok(())
} }
fn name(&self) -> &'static str {
self.name
}
} }
} }

View File

@ -29,7 +29,7 @@
//! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
//! let (verif_tx, verif_rx) = mpsc::channel(); //! let (verif_tx, verif_rx) = mpsc::channel();
//! let sender = MpscVerifSender::new(shared_tm_pool.clone(), verif_tx); //! let sender = MpscVerifSender::new(0, "verif_tm", shared_tm_pool.clone(), verif_tx);
//! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SeqCountProviderSimple::default()), 1, 2, 8).unwrap(); //! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SeqCountProviderSimple::default()), 1, 2, 8).unwrap();
//! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender));
//! //!
@ -1259,6 +1259,7 @@ mod stdmod {
use super::*; use super::*;
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr};
use crate::pus::MpscPusInStoreSendError; use crate::pus::MpscPusInStoreSendError;
use crate::SenderId;
use delegate::delegate; use delegate::delegate;
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard};
@ -1272,17 +1273,21 @@ mod stdmod {
#[derive(Clone)] #[derive(Clone)]
struct StdSenderBase<S> { struct StdSenderBase<S> {
pub ignore_poison_error: bool, id: SenderId,
name: &'static str,
tm_store: SharedPool, tm_store: SharedPool,
tx: S, tx: S,
pub ignore_poison_error: bool,
} }
impl<S: SendBackend> StdSenderBase<S> { impl<S: SendBackend> StdSenderBase<S> {
pub fn new(tm_store: SharedPool, tx: S) -> Self { pub fn new(id: SenderId, name: &'static str, tm_store: SharedPool, tx: S) -> Self {
Self { Self {
ignore_poison_error: false, id,
name,
tm_store, tm_store,
tx, tx,
ignore_poison_error: false,
} }
} }
} }
@ -1304,9 +1309,14 @@ mod stdmod {
/// Verification sender with a [mpsc::Sender] backend. /// Verification sender with a [mpsc::Sender] backend.
/// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender. /// It implements the [EcssTmSenderCore] trait to be used as PUS Verification TM sender.
impl MpscVerifSender { impl MpscVerifSender {
pub fn new(tm_store: SharedPool, tx: mpsc::Sender<StoreAddr>) -> Self { pub fn new(
id: SenderId,
name: &'static str,
tm_store: SharedPool,
tx: mpsc::Sender<StoreAddr>,
) -> Self {
Self { Self {
base: StdSenderBase::new(tm_store, tx), base: StdSenderBase::new(id, name, tm_store, tx),
} }
} }
} }
@ -1317,6 +1327,8 @@ mod stdmod {
delegate!( delegate!(
to self.base { to self.base {
fn id(&self) -> SenderId;
fn name(&self) -> &'static str;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
} }
); );
@ -1338,9 +1350,14 @@ mod stdmod {
#[cfg(feature = "crossbeam")] #[cfg(feature = "crossbeam")]
impl CrossbeamVerifSender { impl CrossbeamVerifSender {
pub fn new(tm_store: SharedPool, tx: crossbeam_channel::Sender<StoreAddr>) -> Self { pub fn new(
id: SenderId,
name: &'static str,
tm_store: SharedPool,
tx: crossbeam_channel::Sender<StoreAddr>,
) -> Self {
Self { Self {
base: StdSenderBase::new(tm_store, tx), base: StdSenderBase::new(id, name, tm_store, tx),
} }
} }
} }
@ -1352,6 +1369,8 @@ mod stdmod {
delegate!( delegate!(
to self.base { to self.base {
fn id(&self) -> SenderId;
fn name(&self) -> &'static str;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
} }
); );
@ -1359,6 +1378,11 @@ mod stdmod {
impl<S: SendBackend + Clone + 'static> EcssTmSenderCore for StdSenderBase<S> { impl<S: SendBackend + Clone + 'static> EcssTmSenderCore for StdSenderBase<S> {
type Error = MpscPusInStoreSendError; type Error = MpscPusInStoreSendError;
fn id(&self) -> SenderId {
self.id
}
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| { let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, buf) = mg.free_element(tm.len_packed())?; let (addr, buf) = mg.free_element(tm.len_packed())?;
@ -1381,6 +1405,10 @@ mod stdmod {
} }
} }
} }
fn name(&self) -> &'static str {
self.name
}
} }
} }
@ -1410,6 +1438,7 @@ mod tests {
}; };
use crate::pus::EcssTmErrorWithSend; use crate::pus::EcssTmErrorWithSend;
use crate::seq_count::SeqCountProviderSimple; use crate::seq_count::SeqCountProviderSimple;
use crate::SenderId;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::format; use alloc::format;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket};
@ -1442,6 +1471,11 @@ mod tests {
impl EcssTmSenderCore for TestSender { impl EcssTmSenderCore for TestSender {
type Error = (); type Error = ();
fn id(&self) -> SenderId {
0
}
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
assert_eq!(PusPacket::service(&tm), 1); assert_eq!(PusPacket::service(&tm), 1);
assert!(tm.source_data().is_some()); assert!(tm.source_data().is_some());
@ -1463,6 +1497,10 @@ mod tests {
}); });
Ok(()) Ok(())
} }
fn name(&self) -> &'static str {
"test_sender"
}
} }
#[derive(Debug, Copy, Clone, Eq, PartialEq)] #[derive(Debug, Copy, Clone, Eq, PartialEq)]
@ -1472,6 +1510,9 @@ mod tests {
impl EcssTmSenderCore for FallibleSender { impl EcssTmSenderCore for FallibleSender {
type Error = DummyError; type Error = DummyError;
fn id(&self) -> SenderId {
0
}
fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> { fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> {
Err(DummyError {}) Err(DummyError {})
} }
@ -1566,7 +1607,7 @@ mod tests {
let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)]));
let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool))); let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool)));
let (tx, _) = mpsc::channel(); let (tx, _) = mpsc::channel();
let mpsc_verif_sender = MpscVerifSender::new(shared_pool, tx); let mpsc_verif_sender = MpscVerifSender::new(0, "verif_sender", shared_pool, tx);
is_send(&mpsc_verif_sender); is_send(&mpsc_verif_sender);
} }

View File

@ -7,7 +7,7 @@ use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs_core::pus::event_man::{ use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher, DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher,
}; };
use satrs_core::pus::EcssTmSenderCore; use satrs_core::pus::MpscTmAsVecSender;
use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use std::sync::mpsc::{channel, SendError, TryRecvError}; use std::sync::mpsc::{channel, SendError, TryRecvError};
@ -24,24 +24,6 @@ pub enum CustomTmSenderError {
PusError(PusError), PusError(PusError),
} }
#[derive(Clone)]
struct EventTmSender {
sender: std::sync::mpsc::Sender<Vec<u8>>,
}
impl EcssTmSenderCore for EventTmSender {
type Error = CustomTmSenderError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let mut vec = Vec::new();
tm.append_to_vec(&mut vec)
.map_err(|e| CustomTmSenderError::PusError(e))?;
self.sender
.send(vec)
.map_err(CustomTmSenderError::SendError)?;
Ok(())
}
}
#[test] #[test]
fn test_threaded_usage() { fn test_threaded_usage() {
let (event_sender, event_man_receiver) = channel(); let (event_sender, event_man_receiver) = channel();
@ -58,7 +40,7 @@ fn test_threaded_usage() {
let mut pus_event_man = PusEventDispatcher::new(reporter, Box::new(backend)); let mut pus_event_man = PusEventDispatcher::new(reporter, Box::new(backend));
// PUS + Generic event manager thread // PUS + Generic event manager thread
let jh0 = thread::spawn(move || { let jh0 = thread::spawn(move || {
let mut sender = EventTmSender { sender: event_tx }; let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx);
let mut event_cnt = 0; let mut event_cnt = 0;
let mut params_array: [u8; 128] = [0; 128]; let mut params_array: [u8; 128] = [0; 128];
loop { loop {

View File

@ -47,7 +47,8 @@ pub mod crossbeam_test {
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone(); let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = crossbeam_channel::bounded(5); let (tx, rx) = crossbeam_channel::bounded(5);
let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); let sender =
CrossbeamVerifSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone());
let mut reporter_with_sender_0 = let mut reporter_with_sender_0 =
VerificationReporterWithSender::new(&cfg, Box::new(sender)); VerificationReporterWithSender::new(&cfg, Box::new(sender));
let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); let mut reporter_with_sender_1 = reporter_with_sender_0.clone();

View File

@ -74,7 +74,12 @@ fn main() {
let (tc_source_tx, tc_source_rx) = channel(); let (tc_source_tx, tc_source_rx) = channel();
let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel();
let verif_sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); let verif_sender = MpscVerifSender::new(
0,
"verif_sender",
tm_store.pool.clone(),
tm_funnel_tx.clone(),
);
let verif_cfg = VerificationReporterCfg::new( let verif_cfg = VerificationReporterCfg::new(
PUS_APID, PUS_APID,
#[allow(clippy::box_default)] #[allow(clippy::box_default)]
@ -174,7 +179,8 @@ fn main() {
.name("Event".to_string()) .name("Event".to_string())
.spawn(move || { .spawn(move || {
let mut timestamp: [u8; 7] = [0; 7]; let mut timestamp: [u8; 7] = [0; 7];
let mut sender = MpscTmInStoreSender::new(tm_store.pool, tm_funnel_tx); let mut sender =
MpscTmInStoreSender::new(1, "event_sender", tm_store.pool, tm_funnel_tx);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
reporter_event_handler reporter_event_handler