From 6072bc765743d05f5d61e0fe72702b36d4f1a4e5 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 27 Feb 2023 17:00:21 +0100 Subject: [PATCH] new generic Mpsc PUS senders --- satrs-core/src/event_man.rs | 3 +- satrs-core/src/lib.rs | 3 + satrs-core/src/power.rs | 11 ++- satrs-core/src/pus/event.rs | 6 +- satrs-core/src/pus/event_man.rs | 29 ++------ satrs-core/src/pus/mod.rs | 114 ++++++++++++++++++++++++++++- satrs-core/src/pus/verification.rs | 102 ++++++++++++++------------ satrs-core/tests/pus_events.rs | 18 +++-- satrs-example/src/main.rs | 34 +-------- 9 files changed, 200 insertions(+), 120 deletions(-) diff --git a/satrs-core/src/event_man.rs b/satrs-core/src/event_man.rs index 5d2db19..ca47c39 100644 --- a/satrs-core/src/event_man.rs +++ b/satrs-core/src/event_man.rs @@ -66,6 +66,7 @@ use core::slice::Iter; #[cfg(feature = "alloc")] use hashbrown::HashMap; +use crate::SenderId; #[cfg(feature = "std")] pub use stdmod::*; @@ -84,8 +85,6 @@ pub type EventWithAuxData = (Event, Option); pub type EventU32WithAuxData = EventWithAuxData; pub type EventU16WithAuxData = EventWithAuxData; -pub type SenderId = u32; - pub trait SendEventProvider { type Error; diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 3b87c69..4fed5c6 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -44,3 +44,6 @@ pub mod seq_count; pub mod tmtc; pub use spacepackets; + +// Generic sender ID type. +pub type SenderId = u32; diff --git a/satrs-core/src/power.rs b/satrs-core/src/power.rs index 810ed72..1675c01 100644 --- a/satrs-core/src/power.rs +++ b/satrs-core/src/power.rs @@ -51,19 +51,17 @@ pub trait PowerSwitchInfo { fn switch_delay_ms(&self) -> u32; } -pub trait PowerSwitchProvider: PowerSwitcherCommandSender + PowerSwitchInfo { - type Error; -} - #[cfg(test)] mod tests { + #![allow(dead_code)] use super::*; - use crate::power::PowerSwitcherCommandSender; use std::boxed::Box; struct Pcdu { switch_rx: std::sync::mpsc::Receiver<(SwitchId, u16)>, } + + #[derive(Eq, PartialEq)] enum DeviceState { OFF, SwitchingPower, @@ -73,6 +71,7 @@ mod tests { } struct MyComplexDevice { power_switcher: Box>, + power_info: Box>, switch_id: SwitchId, some_state: u16, dev_state: DeviceState, @@ -92,7 +91,7 @@ mod tests { self.dev_state = DeviceState::SwitchingPower; } if self.dev_state == DeviceState::SwitchingPower { - if self.power_switcher.get_is_switch_on() { + if self.power_info.get_is_switch_on(0).unwrap() { self.dev_state = DeviceState::ON; self.mode = 1; } diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index cc85df6..5e4c58b 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -109,7 +109,9 @@ impl EventReporterBase { aux_data: Option<&[u8]>, ) -> Result<(), EcssTmErrorWithSend> { let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?; - sender.send_tm(tm)?; + sender + .send_tm(tm) + .map_err(|e| EcssTmErrorWithSend::SendError(e))?; self.msg_count += 1; Ok(()) } @@ -266,7 +268,7 @@ mod tests { impl EcssTmSenderCore for TestSender { type Error = (); - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<()>> { + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { assert!(tm.source_data().is_some()); let src_data = tm.source_data().unwrap(); assert!(src_data.len() >= 4); diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index 9096792..a463813 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -238,33 +238,14 @@ pub mod alloc_mod { mod tests { use super::*; use crate::events::SeverityInfo; - use spacepackets::tm::PusTm; - use std::sync::mpsc::{channel, SendError, TryRecvError}; - use std::vec::Vec; + use crate::pus::MpscTmAsVecSender; + use std::sync::mpsc::{channel, TryRecvError}; const INFO_EVENT: EventU32TypedSev = EventU32TypedSev::::const_new(1, 0); const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; - #[derive(Clone)] - struct EventTmSender { - sender: std::sync::mpsc::Sender>, - } - - impl EcssTmSenderCore for EventTmSender { - type Error = SendError>; - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend> { - let mut vec = Vec::new(); - tm.append_to_vec(&mut vec) - .map_err(|e| EcssTmErrorWithSend::EcssTmError(e.into()))?; - self.sender - .send(vec) - .map_err(EcssTmErrorWithSend::SendError)?; - Ok(()) - } - } - fn create_basic_man() -> PusEventDispatcher<(), EventU32> { let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed"); let backend = DefaultPusMgmtBackendProvider::::default(); @@ -275,7 +256,7 @@ mod tests { fn test_basic() { let mut event_man = create_basic_man(); let (event_tx, event_rx) = channel(); - let mut sender = EventTmSender { sender: event_tx }; + let mut sender = MpscTmAsVecSender::new(event_tx); let event_sent = event_man .generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None) .expect("Sending info event failed"); @@ -289,7 +270,7 @@ mod tests { fn test_disable_event() { let mut event_man = create_basic_man(); let (event_tx, event_rx) = channel(); - let mut sender = EventTmSender { sender: event_tx }; + let mut sender = MpscTmAsVecSender::new(event_tx); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); @@ -312,7 +293,7 @@ mod tests { fn test_reenable_event() { let mut event_man = create_basic_man(); let (event_tx, event_rx) = channel(); - let mut sender = EventTmSender { sender: event_tx }; + let mut sender = MpscTmAsVecSender::new(event_tx); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 1abcab6..a558b41 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -19,9 +19,12 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; +#[cfg(feature = "std")] +pub use std_mod::*; + #[derive(Debug, Clone)] pub enum EcssTmErrorWithSend { - /// Errors related to sending the verification telemetry to a TM recipient + /// Errors related to sending the telemetry to a TM recipient SendError(E), EcssTmError(EcssTmError), } @@ -32,7 +35,7 @@ impl From for EcssTmErrorWithSend { } } -/// Generic error type which is also able to wrap a user send error with the user supplied type E. +/// Generic error type for PUS TM handling. #[derive(Debug, Clone)] pub enum EcssTmError { /// Errors related to the time stamp format of the telemetry @@ -61,7 +64,7 @@ impl From for EcssTmError { pub trait EcssTmSenderCore: Send { type Error; - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend>; + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; } #[cfg(feature = "alloc")] @@ -89,6 +92,111 @@ mod alloc_mod { impl_downcast!(EcssTmSender assoc Error); } +#[cfg(feature = "std")] +pub mod std_mod { + use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; + use crate::pus::EcssTmSenderCore; + use alloc::vec::Vec; + use spacepackets::ecss::PusError; + use spacepackets::tm::PusTm; + use std::sync::mpsc::SendError; + use std::sync::{mpsc, RwLockWriteGuard}; + + #[derive(Debug, Clone)] + pub enum MpscPusInStoreSendError { + LockError, + PusError(PusError), + StoreError(StoreError), + SendError(SendError), + RxDisconnected(StoreAddr), + } + + impl From for MpscPusInStoreSendError { + fn from(value: PusError) -> Self { + MpscPusInStoreSendError::PusError(value) + } + } + impl From> for MpscPusInStoreSendError { + fn from(value: SendError) -> Self { + MpscPusInStoreSendError::SendError(value) + } + } + impl From for MpscPusInStoreSendError { + fn from(value: StoreError) -> Self { + MpscPusInStoreSendError::StoreError(value) + } + } + + #[derive(Clone)] + pub struct MpscTmInStoreSender { + store_helper: SharedPool, + sender: mpsc::Sender, + pub ignore_poison_errors: bool, + } + + impl EcssTmSenderCore for MpscTmInStoreSender { + type Error = MpscPusInStoreSendError; + + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { + let operation = |mut store: RwLockWriteGuard| { + let (addr, slice) = store.free_element(tm.len_packed())?; + tm.write_to_bytes(slice)?; + self.sender.send(addr)?; + Ok(()) + }; + match self.store_helper.write() { + Ok(pool) => operation(pool), + Err(e) => { + if self.ignore_poison_errors { + operation(e.into_inner()) + } else { + Err(MpscPusInStoreSendError::LockError) + } + } + } + } + } + + impl MpscTmInStoreSender { + pub fn new(store_helper: SharedPool, sender: mpsc::Sender) -> Self { + Self { + store_helper, + sender, + ignore_poison_errors: false, + } + } + } + + #[derive(Debug, Clone)] + pub enum MpscAsVecSenderError { + PusError(PusError), + SendError(SendError>), + } + + #[derive(Debug, Clone)] + pub struct MpscTmAsVecSender { + sender: mpsc::Sender>, + } + + impl MpscTmAsVecSender { + pub fn new(sender: mpsc::Sender>) -> Self { + Self { sender } + } + } + impl EcssTmSenderCore for MpscTmAsVecSender { + type Error = MpscAsVecSenderError; + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { + let mut vec = Vec::new(); + tm.append_to_vec(&mut vec) + .map_err(MpscAsVecSenderError::PusError)?; + self.sender + .send(vec) + .map_err(MpscAsVecSenderError::SendError)?; + Ok(()) + } + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum GenericTcCheckError { NotEnoughAppData, diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index dae6471..75ca496 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -102,10 +102,7 @@ use crate::seq_count::SequenceCountProviderCore; #[cfg(all(feature = "crossbeam", feature = "std"))] pub use stdmod::CrossbeamVerifSender; #[cfg(feature = "std")] -pub use stdmod::{ - MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender, - StdVerifSenderError, -}; +pub use stdmod::{MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender}; /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard. /// @@ -520,7 +517,12 @@ impl VerificationReporterCore { { sender .send_tm(sendable.pus_tm.take().unwrap()) - .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; + .map_err(|e| { + VerificationOrSendErrorWithToken( + EcssTmErrorWithSend::SendError(e), + sendable.token.unwrap(), + ) + })?; Ok(sendable.send_success_acceptance_success(Some(seq_counter))) } @@ -532,7 +534,12 @@ impl VerificationReporterCore { ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap()) - .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; + .map_err(|e| { + VerificationOrSendErrorWithToken( + EcssTmErrorWithSend::SendError(e), + sendable.token.unwrap(), + ) + })?; sendable.send_success_verif_failure(Some(seq_counter)); Ok(()) } @@ -591,7 +598,12 @@ impl VerificationReporterCore { > { sender .send_tm(sendable.pus_tm.take().unwrap()) - .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; + .map_err(|e| { + VerificationOrSendErrorWithToken( + EcssTmErrorWithSend::SendError(e), + sendable.token.unwrap(), + ) + })?; Ok(sendable.send_success_start_success(Some(seq_counter))) } @@ -627,7 +639,12 @@ impl VerificationReporterCore { ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap()) - .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; + .map_err(|e| { + VerificationOrSendErrorWithToken( + EcssTmErrorWithSend::SendError(e), + sendable.token.unwrap(), + ) + })?; sendable.send_success_verif_failure(Some(seq_counter)); Ok(()) } @@ -738,7 +755,12 @@ impl VerificationReporterCore { ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap()) - .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; + .map_err(|e| { + VerificationOrSendErrorWithToken( + EcssTmErrorWithSend::SendError(e), + sendable.token.unwrap(), + ) + })?; sendable.send_success_step_or_completion_success(Some(seq_counter)); Ok(()) } @@ -751,7 +773,12 @@ impl VerificationReporterCore { ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap()) - .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; + .map_err(|e| { + VerificationOrSendErrorWithToken( + EcssTmErrorWithSend::SendError(e), + sendable.token.unwrap(), + ) + })?; sendable.send_success_verif_failure(Some(seq_counter)); Ok(()) } @@ -1223,33 +1250,15 @@ mod alloc_mod { mod stdmod { use super::alloc_mod::VerificationReporterWithSender; use super::*; - use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; + use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; + use crate::pus::MpscPusInStoreSendError; use delegate::delegate; use spacepackets::tm::PusTm; use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; - pub type StdVerifReporterWithSender = VerificationReporterWithSender; + pub type StdVerifReporterWithSender = VerificationReporterWithSender; pub type SharedStdVerifReporterWithSender = Arc>; - #[derive(Debug, Eq, PartialEq, Clone)] - pub enum StdVerifSenderError { - PoisonError, - StoreError(StoreError), - RxDisconnected(StoreAddr), - } - - impl From for StdVerifSenderError { - fn from(e: StoreError) -> Self { - StdVerifSenderError::StoreError(e) - } - } - - impl From for EcssTmErrorWithSend { - fn from(e: StoreError) -> Self { - EcssTmErrorWithSend::SendError(e.into()) - } - } - trait SendBackend: Send { fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; } @@ -1297,11 +1306,11 @@ mod stdmod { //noinspection RsTraitImplementation impl EcssTmSenderCore for MpscVerifSender { - type Error = StdVerifSenderError; + type Error = MpscPusInStoreSendError; delegate!( to self.base { - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend>; + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; } ); } @@ -1332,25 +1341,26 @@ mod stdmod { //noinspection RsTraitImplementation #[cfg(feature = "crossbeam")] impl EcssTmSenderCore for CrossbeamVerifSender { - type Error = StdVerifSenderError; + type Error = MpscPusInStoreSendError; delegate!( to self.base { - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend>; + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>; } ); } impl EcssTmSenderCore for StdSenderBase { - type Error = StdVerifSenderError; - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend> { + type Error = MpscPusInStoreSendError; + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { let operation = |mut mg: RwLockWriteGuard| { let (addr, buf) = mg.free_element(tm.len_packed())?; - tm.write_to_bytes(buf).map_err(EcssTmError::PusError)?; + tm.write_to_bytes(buf) + .map_err(MpscPusInStoreSendError::PusError)?; drop(mg); - self.tx.send(addr).map_err(|_| { - EcssTmErrorWithSend::SendError(StdVerifSenderError::RxDisconnected(addr)) - })?; + self.tx + .send(addr) + .map_err(|_| MpscPusInStoreSendError::RxDisconnected(addr))?; Ok(()) }; match self.tm_store.write() { @@ -1359,9 +1369,7 @@ mod stdmod { if self.ignore_poison_error { operation(poison_error.into_inner()) } else { - Err(EcssTmErrorWithSend::SendError( - StdVerifSenderError::PoisonError, - )) + Err(MpscPusInStoreSendError::LockError) } } } @@ -1427,7 +1435,7 @@ mod tests { impl EcssTmSenderCore for TestSender { type Error = (); - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend> { + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { assert_eq!(PusPacket::service(&tm), 1); assert!(tm.source_data().is_some()); let mut time_stamp = [0; 7]; @@ -1457,8 +1465,8 @@ mod tests { impl EcssTmSenderCore for FallibleSender { type Error = DummyError; - fn send_tm(&mut self, _: PusTm) -> Result<(), EcssTmErrorWithSend> { - Err(EcssTmErrorWithSend::SendError(DummyError {})) + fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> { + Err(DummyError {}) } } diff --git a/satrs-core/tests/pus_events.rs b/satrs-core/tests/pus_events.rs index 8c47162..64eb48b 100644 --- a/satrs-core/tests/pus_events.rs +++ b/satrs-core/tests/pus_events.rs @@ -7,8 +7,8 @@ use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher, }; -use satrs_core::pus::{EcssTmErrorWithSend, EcssTmSenderCore}; -use spacepackets::ecss::PusPacket; +use satrs_core::pus::EcssTmSenderCore; +use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tm::PusTm; use std::sync::mpsc::{channel, SendError, TryRecvError}; use std::thread; @@ -18,20 +18,26 @@ const INFO_EVENT: EventU32TypedSev = const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; +#[derive(Debug, Clone)] +pub enum CustomTmSenderError { + SendError(SendError>), + PusError(PusError), +} + #[derive(Clone)] struct EventTmSender { sender: std::sync::mpsc::Sender>, } impl EcssTmSenderCore for EventTmSender { - type Error = SendError>; - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend> { + type Error = CustomTmSenderError; + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { let mut vec = Vec::new(); tm.append_to_vec(&mut vec) - .map_err(|e| EcssTmErrorWithSend::EcssTmError(e.into()))?; + .map_err(|e| CustomTmSenderError::PusError(e))?; self.sender .send(vec) - .map_err(EcssTmErrorWithSend::SendError)?; + .map_err(CustomTmSenderError::SendError)?; Ok(()) } } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index bcb08f8..244b085 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -18,7 +18,7 @@ use satrs_core::event_man::{ }; use satrs_core::events::EventU32; use satrs_core::hk::HkRequest; -use satrs_core::pool::{LocalPool, PoolCfg, StoreAddr}; +use satrs_core::pool::{LocalPool, PoolCfg}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, @@ -27,7 +27,7 @@ use satrs_core::pus::hk::Subservice as HkSubservice; use satrs_core::pus::verification::{ MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, }; -use satrs_core::pus::{EcssTmErrorWithSend, EcssTmSenderCore}; +use satrs_core::pus::MpscTmInStoreSender; use satrs_core::seq_count::SeqCountProviderSimple; use satrs_core::spacepackets::{ time::cds::TimeProvider, @@ -39,36 +39,10 @@ use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::{channel, TryRecvError}; -use std::sync::{mpsc, Arc, RwLock}; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -#[derive(Clone)] -struct EventTmSender { - store_helper: TmStore, - sender: mpsc::Sender, -} - -impl EventTmSender { - fn new(store_helper: TmStore, sender: mpsc::Sender) -> Self { - Self { - store_helper, - sender, - } - } -} - -impl EcssTmSenderCore for EventTmSender { - type Error = mpsc::SendError; - - fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend> { - let addr = self.store_helper.add_pus_tm(&tm); - self.sender - .send(addr) - .map_err(EcssTmErrorWithSend::SendError) - } -} - fn main() { setup_logger().expect("setting up logging with fern failed"); println!("Running OBSW example"); @@ -199,7 +173,7 @@ fn main() { .name("Event".to_string()) .spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = EventTmSender::new(tm_store, tm_funnel_tx); + let mut sender = MpscTmInStoreSender::new(tm_store.pool, tm_funnel_tx); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { reporter_event_handler