diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 61d569d..4fda814 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -29,5 +29,5 @@ pub mod params; #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub mod pool; pub mod pus; -pub mod tmtc; pub mod seq_count; +pub mod tmtc; diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index 10e0618..7e80482 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -298,14 +298,14 @@ mod tests { #[allow(dead_code)] const EXAMPLE_EVENT_ID_1: u16 = 2; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] struct TmInfo { pub common: CommonTmInfo, pub event: EventU32, pub aux_data: Vec, } - #[derive(Default)] + #[derive(Default, Clone)] struct TestSender { pub service_queue: VecDeque, } diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index 4232258..d747222 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -233,6 +233,7 @@ mod tests { const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; + #[derive(Clone)] struct EventTmSender { sender: std::sync::mpsc::Sender>, } diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 0703a8e..fa0e396 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -4,6 +4,7 @@ //! //! 1. PUS Verification Service 1 module inside [verification]. Requires [alloc] support. use downcast_rs::{impl_downcast, Downcast}; +use dyn_clone::DynClone; use spacepackets::ecss::PusError; use spacepackets::time::TimestampError; use spacepackets::tm::PusTm; @@ -43,13 +44,14 @@ impl From for EcssTmError { /// This sender object is responsible for sending telemetry to a TM sink. The [Downcast] trait /// is implemented to allow passing the sender as a boxed trait object and still retrieve the /// concrete type at a later point. -pub trait EcssTmSender: Downcast + Send { +pub trait EcssTmSender: Downcast + Send + DynClone { type Error; fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmError>; } impl_downcast!(EcssTmSender assoc Error); +dyn_clone::clone_trait_object!( EcssTmSender); pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmError> { if len > cap { @@ -68,7 +70,7 @@ pub(crate) mod tests { use spacepackets::tm::{PusTm, PusTmSecondaryHeaderT}; use spacepackets::CcsdsPacket; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) struct CommonTmInfo { pub subservice: u8, pub apid: u16, diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 56611ff..750cabd 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -31,7 +31,7 @@ //! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10); //! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap(); -//! let mut reporter = VerificationReporterWithSender::new(cfg , Box::new(sender)); +//! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! //! let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap(); //! let tc_header = PusTcSecondaryHeader::new_simple(17, 1); @@ -87,14 +87,16 @@ use spacepackets::{SpHeader, MAX_APID}; pub use crate::seq_count::SimpleSeqCountProvider; #[cfg(feature = "alloc")] -pub use allocmod::{VerificationReporterWithBuf, VerificationReporterCfg, VerificationReporterWithSender}; +pub use allocmod::{ + VerificationReporterCfg, VerificationReporterWithBuf, VerificationReporterWithSender, +}; +use crate::seq_count::SequenceCountProvider; #[cfg(feature = "std")] pub use stdmod::{ CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender, StdVerifSenderError, }; -use crate::seq_count::SequenceCountProvider; #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum Subservices { @@ -291,10 +293,7 @@ impl VerificationReporterBasic { if apid > MAX_APID { return None; } - Some(Self { - apid, - dest_id: 0, - }) + Some(Self { apid, dest_id: 0 }) } pub fn set_apid(&mut self, apid: u16) -> bool { @@ -696,7 +695,7 @@ mod allocmod { /// Primary verification handler. It provides an API to send PUS 1 verification telemetry packets /// and verify the various steps of telecommand handling as specified in the PUS standard. - #[derive (Clone)] + #[derive(Clone)] pub struct VerificationReporterWithBuf { source_data_buf: Vec, seq_counter: Box + Send + 'static>, @@ -796,8 +795,13 @@ mod allocmod { sender: &mut (impl EcssTmSender + ?Sized), params: FailParams, ) -> Result<(), VerificationErrorWithToken> { - self.reporter - .start_failure(self.source_data_buf.as_mut_slice(), token, sender, self.seq_counter.as_mut(), params) + self.reporter.start_failure( + self.source_data_buf.as_mut_slice(), + token, + sender, + self.seq_counter.as_mut(), + params, + ) } /// Package and send a PUS TM\[1, 5\] packet, see 8.1.2.5 of the PUS standard. @@ -830,8 +834,13 @@ mod allocmod { sender: &mut (impl EcssTmSender + ?Sized), params: FailParamsWithStep, ) -> Result<(), VerificationErrorWithToken> { - self.reporter - .step_failure(self.source_data_buf.as_mut_slice(), token, sender, self.seq_counter.as_mut(), params) + self.reporter.step_failure( + self.source_data_buf.as_mut_slice(), + token, + sender, + self.seq_counter.as_mut(), + params, + ) } /// Package and send a PUS TM\[1, 7\] packet, see 8.1.2.7 of the PUS standard. @@ -875,13 +884,17 @@ mod allocmod { /// Helper object which caches the sender passed as a trait object. Provides the same /// API as [VerificationReporter] but without the explicit sender arguments. + #[derive(Clone)] pub struct VerificationReporterWithSender { pub reporter: VerificationReporterWithBuf, pub sender: Box>, } impl VerificationReporterWithSender { - pub fn new(cfg: &VerificationReporterCfg, sender: Box>) -> Self { + pub fn new( + cfg: &VerificationReporterCfg, + sender: Box>, + ) -> Self { let reporter = VerificationReporterWithBuf::new(cfg); Self::new_from_reporter(reporter, sender) } @@ -993,7 +1006,7 @@ mod stdmod { pub type StdVerifReporterWithSender = VerificationReporterWithSender; pub type SharedStdVerifReporterWithSender = Arc>; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] pub enum StdVerifSenderError { PoisonError, StoreError(StoreError), @@ -1016,6 +1029,7 @@ mod stdmod { fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; } + #[derive(Clone)] struct StdSenderBase { pub ignore_poison_error: bool, tm_store: SharedPool, @@ -1041,6 +1055,7 @@ mod stdmod { } } + #[derive(Clone)] pub struct MpscVerifSender { base: StdSenderBase>, } @@ -1076,6 +1091,7 @@ mod stdmod { /// Verification sender with a [crossbeam_channel::Sender] backend. /// It implements the [EcssTmSender] trait to be used as PUS Verification TM sender + #[derive(Clone)] pub struct CrossbeamVerifSender { base: StdSenderBase>, } @@ -1102,7 +1118,7 @@ mod stdmod { unsafe impl Sync for CrossbeamVerifSender {} unsafe impl Send for CrossbeamVerifSender {} - impl EcssTmSender for StdSenderBase { + impl EcssTmSender for StdSenderBase { type Error = StdVerifSenderError; fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmError> { let operation = |mut mg: RwLockWriteGuard| { @@ -1133,9 +1149,10 @@ mod tests { use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmError, EcssTmSender, FailParams, FailParamsWithStep, RequestId, TcStateNone, - VerificationReporterWithBuf, VerificationReporterCfg, VerificationReporterWithSender, - VerificationToken + VerificationReporterCfg, VerificationReporterWithBuf, VerificationReporterWithSender, + VerificationToken, }; + use crate::seq_count::SimpleSeqCountProvider; use alloc::boxed::Box; use alloc::format; use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; @@ -1144,19 +1161,18 @@ mod tests { use spacepackets::{ByteConversionError, SpHeader}; use std::collections::VecDeque; use std::vec::Vec; - use crate::seq_count::SimpleSeqCountProvider; const TEST_APID: u16 = 0x02; const EMPTY_STAMP: [u8; 7] = [0; 7]; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] struct TmInfo { pub common: CommonTmInfo, pub req_id: RequestId, pub additional_data: Option>, } - #[derive(Default)] + #[derive(Default, Clone)] struct TestSender { pub service_queue: VecDeque, } @@ -1188,7 +1204,7 @@ mod tests { #[derive(Debug, Copy, Clone, Eq, PartialEq)] struct DummyError {} - #[derive(Default)] + #[derive(Default, Clone)] struct FallibleSender {} impl EcssTmSender for FallibleSender { @@ -1222,7 +1238,14 @@ mod tests { } fn base_reporter() -> VerificationReporterWithBuf { - let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap(); + let cfg = VerificationReporterCfg::new( + TEST_APID, + Box::new(SimpleSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); VerificationReporterWithBuf::new(&cfg) } diff --git a/satrs-core/src/seq_count.rs b/satrs-core/src/seq_count.rs index cd57f48..1e2ff57 100644 --- a/satrs-core/src/seq_count.rs +++ b/satrs-core/src/seq_count.rs @@ -14,7 +14,7 @@ pub trait SequenceCountProvider: DynClone { #[derive(Default, Clone)] pub struct SimpleSeqCountProvider { - seq_count: u16 + seq_count: u16, } dyn_clone::clone_trait_object!(SequenceCountProvider); @@ -32,12 +32,12 @@ impl SequenceCountProvider for SimpleSeqCountProvider { #[cfg(feature = "std")] pub mod stdmod { use super::*; - use std::sync::Arc; use std::sync::atomic::{AtomicU16, Ordering}; + use std::sync::Arc; #[derive(Clone)] pub struct SyncSeqCountProvider { - seq_count: Arc + seq_count: Arc, } impl SequenceCountProvider for SyncSeqCountProvider { diff --git a/satrs-core/tests/pus_events.rs b/satrs-core/tests/pus_events.rs index 77dc8f2..82fabd7 100644 --- a/satrs-core/tests/pus_events.rs +++ b/satrs-core/tests/pus_events.rs @@ -4,7 +4,9 @@ use satrs_core::event_man::{ use satrs_core::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs_core::params::U32Pair; use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes}; -use satrs_core::pus::event_man::{DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher}; +use satrs_core::pus::event_man::{ + DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher, +}; use satrs_core::pus::{EcssTmError, EcssTmSender}; use spacepackets::ecss::PusPacket; use spacepackets::tm::PusTm; @@ -16,6 +18,7 @@ const INFO_EVENT: EventU32TypedSev = const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; +#[derive(Clone)] struct EventTmSender { sender: std::sync::mpsc::Sender>, } diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 6210902..fa9cdba 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -1,12 +1,15 @@ -use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; -use satrs_core::pus::verification::{CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender}; use hashbrown::HashMap; +use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; +use satrs_core::pus::verification::{ + CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, + VerificationReporterWithSender, +}; use satrs_core::seq_count::SimpleSeqCountProvider; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::SpHeader; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -23,7 +26,14 @@ const PACKETS_SENT: u8 = 8; /// threads have sent the correct expected verification reports #[test] fn test_shared_reporter() { - let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap(); + let cfg = VerificationReporterCfg::new( + TEST_APID, + Box::new(SimpleSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); // Shared pool object to store the verification PUS telemetry let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); let shared_tm_pool: SharedPool = @@ -32,11 +42,8 @@ fn test_shared_reporter() { let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = crossbeam_channel::bounded(5); let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); - let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( - &cfg, - Box::new(sender), - ))); - let reporter_with_sender_1 = reporter_with_sender_0.clone(); + let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); + let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver // tread. let req_id_0; @@ -76,29 +83,28 @@ fn test_shared_reporter() { } let (_tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); let accepted_token; - { - let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); - let token = mg.add_tc_with_req_id(req_id_0); - accepted_token = mg - .acceptance_success(token, &FIXED_STAMP) - .expect("Acceptance success failed"); - } + + let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0); + accepted_token = reporter_with_sender_0 + .acceptance_success(token, &FIXED_STAMP) + .expect("Acceptance success failed"); + // Do some start handling here let started_token; - { - let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); - started_token = mg - .start_success(accepted_token, &FIXED_STAMP) - .expect("Start success failed"); - // Do some step handling here - mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) - .expect("Start success failed"); - } - // Finish up - let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); - mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + started_token = reporter_with_sender_0 + .start_success(accepted_token, &FIXED_STAMP) .expect("Start success failed"); - mg.completion_success(started_token, &FIXED_STAMP) + // Do some step handling here + reporter_with_sender_0 + .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) + .expect("Start success failed"); + + // Finish up + reporter_with_sender_0 + .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + .expect("Start success failed"); + reporter_with_sender_0 + .completion_success(started_token, &FIXED_STAMP) .expect("Completion success failed"); }); @@ -116,19 +122,17 @@ fn test_shared_reporter() { tc_buf[0..tc_len].copy_from_slice(buf); } let (tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); - let mut mg = reporter_with_sender_1 - .lock() - .expect("Locking reporter failed"); - let token = mg.add_tc(&tc); - let accepted_token = mg + let token = reporter_with_sender_1.add_tc(&tc); + let accepted_token = reporter_with_sender_1 .acceptance_success(token, &FIXED_STAMP) .expect("Acceptance success failed"); - let started_token = mg + let started_token = reporter_with_sender_1 .start_success(accepted_token, &FIXED_STAMP) .expect("Start success failed"); let fail_code = EcssEnumU16::new(2); let params = FailParams::new(&FIXED_STAMP, &fail_code, None); - mg.completion_failure(started_token, params) + reporter_with_sender_1 + .completion_failure(started_token, params) .expect("Completion success failed"); }); diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 4c613c1..836d9ab 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -13,8 +13,11 @@ use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, }; -use satrs_core::pus::verification::{MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; +use satrs_core::pus::verification::{ + MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, +}; use satrs_core::pus::{EcssTmError, EcssTmSender}; +use satrs_core::seq_count::SimpleSeqCountProvider; use satrs_core::tmtc::CcsdsError; use satrs_example::{OBSW_SERVER_ADDR, SERVER_PORT}; use spacepackets::time::{CdsShortTimeProvider, TimeWriter}; @@ -23,7 +26,6 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::channel; use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::thread; -use satrs_core::seq_count::SimpleSeqCountProvider; struct TmFunnel { tm_funnel_rx: mpsc::Receiver, @@ -38,6 +40,7 @@ struct UdpTmtcServer { unsafe impl Send for UdpTmtcServer {} +#[derive(Clone)] struct EventTmSender { store_helper: TmStore, sender: mpsc::Sender, @@ -72,7 +75,14 @@ fn main() { let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone()); - let verif_cfg = VerificationReporterCfg::new(PUS_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap(); + let verif_cfg = VerificationReporterCfg::new( + PUS_APID, + Box::new(SimpleSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( &verif_cfg, Box::new(sender),