diff --git a/Cargo.lock b/Cargo.lock index ea90067..bfb4844 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" +[[package]] +name = "dyn-clone" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2" + [[package]] name = "embed-doc-image" version = "0.1.4" @@ -597,6 +603,7 @@ dependencies = [ "crossbeam-channel", "delegate 0.8.0", "downcast-rs", + "dyn-clone", "embed-doc-image", "hashbrown", "heapless", diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 6574eee..e57e4ce 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -10,6 +10,7 @@ delegate = "0.8" hashbrown = "0.13" heapless = "0.7" paste = "1.0" +dyn-clone = "1.0.9" embed-doc-image = "0.1" [dependencies.num-traits] diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 8655fe6..4fda814 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -29,4 +29,5 @@ pub mod params; #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub mod pool; pub mod pus; +pub mod seq_count; pub mod tmtc; diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index 10e0618..7e80482 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -298,14 +298,14 @@ mod tests { #[allow(dead_code)] const EXAMPLE_EVENT_ID_1: u16 = 2; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] struct TmInfo { pub common: CommonTmInfo, pub event: EventU32, pub aux_data: Vec, } - #[derive(Default)] + #[derive(Default, Clone)] struct TestSender { pub service_queue: VecDeque, } diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index 4232258..d747222 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -233,6 +233,7 @@ mod tests { const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; + #[derive(Clone)] struct EventTmSender { sender: std::sync::mpsc::Sender>, } diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 0703a8e..fa0e396 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -4,6 +4,7 @@ //! //! 1. PUS Verification Service 1 module inside [verification]. Requires [alloc] support. use downcast_rs::{impl_downcast, Downcast}; +use dyn_clone::DynClone; use spacepackets::ecss::PusError; use spacepackets::time::TimestampError; use spacepackets::tm::PusTm; @@ -43,13 +44,14 @@ impl From for EcssTmError { /// This sender object is responsible for sending telemetry to a TM sink. The [Downcast] trait /// is implemented to allow passing the sender as a boxed trait object and still retrieve the /// concrete type at a later point. -pub trait EcssTmSender: Downcast + Send { +pub trait EcssTmSender: Downcast + Send + DynClone { type Error; fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmError>; } impl_downcast!(EcssTmSender assoc Error); +dyn_clone::clone_trait_object!( EcssTmSender); pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmError> { if len > cap { @@ -68,7 +70,7 @@ pub(crate) mod tests { use spacepackets::tm::{PusTm, PusTmSecondaryHeaderT}; use spacepackets::CcsdsPacket; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) struct CommonTmInfo { pub subservice: u8, pub apid: u16, diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index b9b0949..750cabd 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -17,6 +17,7 @@ //! use std::time::Duration; //! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; //! use satrs_core::pus::verification::{CrossbeamVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; +//! use satrs_core::seq_count::SimpleSeqCountProvider; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; //! use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -29,8 +30,8 @@ //! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10); //! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx); -//! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); -//! let mut reporter = VerificationReporterWithSender::new(cfg , Box::new(sender)); +//! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap(); +//! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! //! let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap(); //! let tc_header = PusTcSecondaryHeader::new_simple(17, 1); @@ -83,9 +84,14 @@ use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; use spacepackets::{SpHeader, MAX_APID}; -#[cfg(feature = "alloc")] -pub use allocmod::{VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender}; +pub use crate::seq_count::SimpleSeqCountProvider; +#[cfg(feature = "alloc")] +pub use allocmod::{ + VerificationReporterCfg, VerificationReporterWithBuf, VerificationReporterWithSender, +}; + +use crate::seq_count::SequenceCountProvider; #[cfg(feature = "std")] pub use stdmod::{ CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender, @@ -276,10 +282,10 @@ impl<'a> FailParamsWithStep<'a> { } } +#[derive(Clone)] pub struct VerificationReporterBasic { pub dest_id: u16, apid: u16, - msg_count: u16, } impl VerificationReporterBasic { @@ -287,11 +293,7 @@ impl VerificationReporterBasic { if apid > MAX_APID { return None; } - Some(Self { - apid, - msg_count: 0, - dest_id: 0, - }) + Some(Self { apid, dest_id: 0 }) } pub fn set_apid(&mut self, apid: u16) -> bool { @@ -332,6 +334,7 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), time_stamp: &[u8], ) -> Result, VerificationErrorWithToken> { @@ -339,6 +342,7 @@ impl VerificationReporterBasic { .create_pus_verif_success_tm( buf, Subservices::TmAcceptanceSuccess.into(), + seq_counter.get(), &token.req_id, time_stamp, None::<&dyn EcssEnumeration>, @@ -347,7 +351,8 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); + //seq_counter.get_and_increment() Ok(VerificationToken { state: PhantomData, req_id: token.req_id, @@ -360,12 +365,14 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), params: FailParams, ) -> Result<(), VerificationErrorWithToken> { let tm = self .create_pus_verif_fail_tm( buf, Subservices::TmAcceptanceFailure.into(), + seq_counter.get(), &token.req_id, None::<&dyn EcssEnumeration>, ¶ms, @@ -374,7 +381,7 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); Ok(()) } @@ -386,6 +393,7 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), time_stamp: &[u8], ) -> Result, VerificationErrorWithToken> { @@ -393,6 +401,7 @@ impl VerificationReporterBasic { .create_pus_verif_success_tm( buf, Subservices::TmStartSuccess.into(), + seq_counter.get(), &token.req_id, time_stamp, None::<&dyn EcssEnumeration>, @@ -401,7 +410,7 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); Ok(VerificationToken { state: PhantomData, req_id: token.req_id, @@ -417,12 +426,14 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), params: FailParams, ) -> Result<(), VerificationErrorWithToken> { let tm = self .create_pus_verif_fail_tm( buf, Subservices::TmStartFailure.into(), + seq_counter.get(), &token.req_id, None::<&dyn EcssEnumeration>, ¶ms, @@ -431,7 +442,7 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); Ok(()) } @@ -443,18 +454,20 @@ impl VerificationReporterBasic { buf: &mut [u8], token: &VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), time_stamp: &[u8], step: impl EcssEnumeration, ) -> Result<(), EcssTmError> { let tm = self.create_pus_verif_success_tm( buf, Subservices::TmStepSuccess.into(), + seq_counter.get(), &token.req_id, time_stamp, Some(&step), )?; sender.send_tm(tm)?; - self.msg_count += 1; + seq_counter.increment(); Ok(()) } @@ -467,12 +480,14 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), params: FailParamsWithStep, ) -> Result<(), VerificationErrorWithToken> { let tm = self .create_pus_verif_fail_tm( buf, Subservices::TmStepFailure.into(), + seq_counter.get(), &token.req_id, Some(params.step), ¶ms.bp, @@ -481,7 +496,7 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); Ok(()) } @@ -494,12 +509,14 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), time_stamp: &[u8], ) -> Result<(), VerificationErrorWithToken> { let tm = self .create_pus_verif_success_tm( buf, Subservices::TmCompletionSuccess.into(), + seq_counter.get(), &token.req_id, time_stamp, None::<&dyn EcssEnumeration>, @@ -508,7 +525,7 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); Ok(()) } @@ -521,12 +538,14 @@ impl VerificationReporterBasic { buf: &mut [u8], token: VerificationToken, sender: &mut (impl EcssTmSender + ?Sized), + seq_counter: &mut (impl SequenceCountProvider + ?Sized), params: FailParams, ) -> Result<(), VerificationErrorWithToken> { let tm = self .create_pus_verif_fail_tm( buf, Subservices::TmCompletionFailure.into(), + seq_counter.get(), &token.req_id, None::<&dyn EcssEnumeration>, ¶ms, @@ -535,7 +554,7 @@ impl VerificationReporterBasic { sender .send_tm(tm) .map_err(|e| VerificationErrorWithToken(e, token))?; - self.msg_count += 1; + seq_counter.increment(); Ok(()) } @@ -543,6 +562,7 @@ impl VerificationReporterBasic { &'a mut self, buf: &'a mut [u8], subservice: u8, + msg_counter: u16, req_id: &RequestId, time_stamp: &'a [u8], step: Option<&(impl EcssEnumeration + ?Sized)>, @@ -564,6 +584,7 @@ impl VerificationReporterBasic { Ok(self.create_pus_verif_tm_base( buf, subservice, + msg_counter, &mut sp_header, time_stamp, source_data_len, @@ -574,6 +595,7 @@ impl VerificationReporterBasic { &'a mut self, buf: &'a mut [u8], subservice: u8, + msg_counter: u16, req_id: &RequestId, step: Option<&(impl EcssEnumeration + ?Sized)>, params: &'a FailParams, @@ -607,6 +629,7 @@ impl VerificationReporterBasic { Ok(self.create_pus_verif_tm_base( buf, subservice, + msg_counter, &mut sp_header, params.time_stamp, source_data_len, @@ -617,12 +640,13 @@ impl VerificationReporterBasic { &'a mut self, buf: &'a mut [u8], subservice: u8, + msg_counter: u16, sp_header: &mut SpHeader, time_stamp: &'a [u8], source_data_len: usize, ) -> PusTm { let tm_sec_header = - PusTmSecondaryHeader::new(1, subservice, self.msg_count, self.dest_id, time_stamp); + PusTmSecondaryHeader::new(1, subservice, msg_counter, self.dest_id, time_stamp); PusTm::new( sp_header, tm_sec_header, @@ -639,8 +663,10 @@ mod allocmod { use alloc::vec; use alloc::vec::Vec; + #[derive(Clone)] pub struct VerificationReporterCfg { apid: u16, + seq_counter: Box + Send>, pub step_field_width: usize, pub fail_code_field_width: usize, pub max_fail_data_len: usize, @@ -649,6 +675,7 @@ mod allocmod { impl VerificationReporterCfg { pub fn new( apid: u16, + seq_counter: Box + Send>, step_field_width: usize, fail_code_field_width: usize, max_fail_data_len: usize, @@ -658,6 +685,7 @@ mod allocmod { } Some(Self { apid, + seq_counter, step_field_width, fail_code_field_width, max_fail_data_len, @@ -667,13 +695,15 @@ mod allocmod { /// Primary verification handler. It provides an API to send PUS 1 verification telemetry packets /// and verify the various steps of telecommand handling as specified in the PUS standard. - pub struct VerificationReporter { + #[derive(Clone)] + pub struct VerificationReporterWithBuf { source_data_buf: Vec, + seq_counter: Box + Send + 'static>, pub reporter: VerificationReporterBasic, } - impl VerificationReporter { - pub fn new(cfg: VerificationReporterCfg) -> Self { + impl VerificationReporterWithBuf { + pub fn new(cfg: &VerificationReporterCfg) -> Self { let reporter = VerificationReporterBasic::new(cfg.apid).unwrap(); Self { source_data_buf: vec![ @@ -683,6 +713,7 @@ mod allocmod { + cfg.fail_code_field_width as usize + cfg.max_fail_data_len ], + seq_counter: cfg.seq_counter.clone(), reporter, } } @@ -714,6 +745,7 @@ mod allocmod { self.source_data_buf.as_mut_slice(), token, sender, + self.seq_counter.as_mut(), time_stamp, ) } @@ -729,6 +761,7 @@ mod allocmod { self.source_data_buf.as_mut_slice(), token, sender, + self.seq_counter.as_mut(), params, ) } @@ -747,6 +780,7 @@ mod allocmod { self.source_data_buf.as_mut_slice(), token, sender, + self.seq_counter.as_mut(), time_stamp, ) } @@ -761,8 +795,13 @@ mod allocmod { sender: &mut (impl EcssTmSender + ?Sized), params: FailParams, ) -> Result<(), VerificationErrorWithToken> { - self.reporter - .start_failure(self.source_data_buf.as_mut_slice(), token, sender, params) + self.reporter.start_failure( + self.source_data_buf.as_mut_slice(), + token, + sender, + self.seq_counter.as_mut(), + params, + ) } /// Package and send a PUS TM\[1, 5\] packet, see 8.1.2.5 of the PUS standard. @@ -779,6 +818,7 @@ mod allocmod { self.source_data_buf.as_mut_slice(), token, sender, + self.seq_counter.as_mut(), time_stamp, step, ) @@ -794,8 +834,13 @@ mod allocmod { sender: &mut (impl EcssTmSender + ?Sized), params: FailParamsWithStep, ) -> Result<(), VerificationErrorWithToken> { - self.reporter - .step_failure(self.source_data_buf.as_mut_slice(), token, sender, params) + self.reporter.step_failure( + self.source_data_buf.as_mut_slice(), + token, + sender, + self.seq_counter.as_mut(), + params, + ) } /// Package and send a PUS TM\[1, 7\] packet, see 8.1.2.7 of the PUS standard. @@ -812,6 +857,7 @@ mod allocmod { self.source_data_buf.as_mut_slice(), token, sender, + self.seq_counter.as_mut(), time_stamp, ) } @@ -830,6 +876,7 @@ mod allocmod { self.source_data_buf.as_mut_slice(), token, sender, + self.seq_counter.as_mut(), params, ) } @@ -837,19 +884,23 @@ mod allocmod { /// Helper object which caches the sender passed as a trait object. Provides the same /// API as [VerificationReporter] but without the explicit sender arguments. + #[derive(Clone)] pub struct VerificationReporterWithSender { - pub reporter: VerificationReporter, + pub reporter: VerificationReporterWithBuf, pub sender: Box>, } impl VerificationReporterWithSender { - pub fn new(cfg: VerificationReporterCfg, sender: Box>) -> Self { - let reporter = VerificationReporter::new(cfg); + pub fn new( + cfg: &VerificationReporterCfg, + sender: Box>, + ) -> Self { + let reporter = VerificationReporterWithBuf::new(cfg); Self::new_from_reporter(reporter, sender) } pub fn new_from_reporter( - reporter: VerificationReporter, + reporter: VerificationReporterWithBuf, sender: Box>, ) -> Self { Self { reporter, sender } @@ -955,7 +1006,7 @@ mod stdmod { pub type StdVerifReporterWithSender = VerificationReporterWithSender; pub type SharedStdVerifReporterWithSender = Arc>; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] pub enum StdVerifSenderError { PoisonError, StoreError(StoreError), @@ -978,6 +1029,7 @@ mod stdmod { fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; } + #[derive(Clone)] struct StdSenderBase { pub ignore_poison_error: bool, tm_store: SharedPool, @@ -1003,6 +1055,7 @@ mod stdmod { } } + #[derive(Clone)] pub struct MpscVerifSender { base: StdSenderBase>, } @@ -1038,6 +1091,7 @@ mod stdmod { /// Verification sender with a [crossbeam_channel::Sender] backend. /// It implements the [EcssTmSender] trait to be used as PUS Verification TM sender + #[derive(Clone)] pub struct CrossbeamVerifSender { base: StdSenderBase>, } @@ -1064,7 +1118,7 @@ mod stdmod { unsafe impl Sync for CrossbeamVerifSender {} unsafe impl Send for CrossbeamVerifSender {} - impl EcssTmSender for StdSenderBase { + impl EcssTmSender for StdSenderBase { type Error = StdVerifSenderError; fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmError> { let operation = |mut mg: RwLockWriteGuard| { @@ -1095,9 +1149,10 @@ mod tests { use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmError, EcssTmSender, FailParams, FailParamsWithStep, RequestId, TcStateNone, - VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, + VerificationReporterCfg, VerificationReporterWithBuf, VerificationReporterWithSender, VerificationToken, }; + use crate::seq_count::SimpleSeqCountProvider; use alloc::boxed::Box; use alloc::format; use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; @@ -1110,14 +1165,14 @@ mod tests { const TEST_APID: u16 = 0x02; const EMPTY_STAMP: [u8; 7] = [0; 7]; - #[derive(Debug, Eq, PartialEq)] + #[derive(Debug, Eq, PartialEq, Clone)] struct TmInfo { pub common: CommonTmInfo, pub req_id: RequestId, pub additional_data: Option>, } - #[derive(Default)] + #[derive(Default, Clone)] struct TestSender { pub service_queue: VecDeque, } @@ -1149,7 +1204,7 @@ mod tests { #[derive(Debug, Copy, Clone, Eq, PartialEq)] struct DummyError {} - #[derive(Default)] + #[derive(Default, Clone)] struct FallibleSender {} impl EcssTmSender for FallibleSender { @@ -1160,13 +1215,13 @@ mod tests { } struct TestBase<'a> { - vr: VerificationReporter, + vr: VerificationReporterWithBuf, #[allow(dead_code)] tc: PusTc<'a>, } impl<'a> TestBase<'a> { - fn rep(&mut self) -> &mut VerificationReporter { + fn rep(&mut self) -> &mut VerificationReporterWithBuf { &mut self.vr } } @@ -1177,14 +1232,21 @@ mod tests { } impl<'a, E> TestBaseWithHelper<'a, E> { - fn rep(&mut self) -> &mut VerificationReporter { + fn rep(&mut self) -> &mut VerificationReporterWithBuf { &mut self.helper.reporter } } - fn base_reporter() -> VerificationReporter { - let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); - VerificationReporter::new(cfg) + fn base_reporter() -> VerificationReporterWithBuf { + let cfg = VerificationReporterCfg::new( + TEST_APID, + Box::new(SimpleSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); + VerificationReporterWithBuf::new(&cfg) } fn base_tc_init(app_data: Option<&[u8]>) -> (PusTc, RequestId) { diff --git a/satrs-core/src/seq_count.rs b/satrs-core/src/seq_count.rs new file mode 100644 index 0000000..ff27168 --- /dev/null +++ b/satrs-core/src/seq_count.rs @@ -0,0 +1,60 @@ +use dyn_clone::DynClone; +#[cfg(feature = "std")] +pub use stdmod::*; + +pub trait SequenceCountProvider: DynClone { + fn get(&self) -> Raw; + fn increment(&mut self); + fn get_and_increment(&mut self) -> Raw { + let val = self.get(); + self.increment(); + val + } +} + +#[derive(Default, Clone)] +pub struct SimpleSeqCountProvider { + seq_count: u16, +} + +dyn_clone::clone_trait_object!(SequenceCountProvider); + +impl SequenceCountProvider for SimpleSeqCountProvider { + fn get(&self) -> u16 { + self.seq_count + } + + fn increment(&mut self) { + if self.seq_count == u16::MAX { + self.seq_count = 0; + return; + } + self.seq_count += 1; + } +} + +#[cfg(feature = "std")] +pub mod stdmod { + use super::*; + use std::sync::atomic::{AtomicU16, Ordering}; + use std::sync::Arc; + + #[derive(Clone, Default)] + pub struct SyncSeqCountProvider { + seq_count: Arc, + } + + impl SequenceCountProvider for SyncSeqCountProvider { + fn get(&self) -> u16 { + self.seq_count.load(Ordering::SeqCst) + } + + fn increment(&mut self) { + self.seq_count.fetch_add(1, Ordering::SeqCst); + } + + fn get_and_increment(&mut self) -> u16 { + self.seq_count.fetch_add(1, Ordering::SeqCst) + } + } +} diff --git a/satrs-core/tests/pus_events.rs b/satrs-core/tests/pus_events.rs index 77dc8f2..82fabd7 100644 --- a/satrs-core/tests/pus_events.rs +++ b/satrs-core/tests/pus_events.rs @@ -4,7 +4,9 @@ use satrs_core::event_man::{ use satrs_core::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs_core::params::U32Pair; use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes}; -use satrs_core::pus::event_man::{DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher}; +use satrs_core::pus::event_man::{ + DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher, +}; use satrs_core::pus::{EcssTmError, EcssTmSender}; use spacepackets::ecss::PusPacket; use spacepackets::tm::PusTm; @@ -16,6 +18,7 @@ const INFO_EVENT: EventU32TypedSev = const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; +#[derive(Clone)] struct EventTmSender { sender: std::sync::mpsc::Sender>, } diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index b6c36ff..a6a5219 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -1,14 +1,15 @@ +use hashbrown::HashMap; use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; use satrs_core::pus::verification::{ CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, }; -use hashbrown::HashMap; +use satrs_core::seq_count::SyncSeqCountProvider; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::SpHeader; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -25,7 +26,17 @@ const PACKETS_SENT: u8 = 8; /// threads have sent the correct expected verification reports #[test] fn test_shared_reporter() { - let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); + // We use a synced sequence count provider here because both verification reporters have the + // the same APID. If they had distinct APIDs, the more correct approach would be to have + // each reporter have an own sequence count provider. + let cfg = VerificationReporterCfg::new( + TEST_APID, + Box::new(SyncSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); // Shared pool object to store the verification PUS telemetry let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); let shared_tm_pool: SharedPool = @@ -34,11 +45,8 @@ fn test_shared_reporter() { let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = crossbeam_channel::bounded(5); let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); - let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( - cfg, - Box::new(sender), - ))); - let reporter_with_sender_1 = reporter_with_sender_0.clone(); + let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); + let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver // tread. let req_id_0; @@ -78,29 +86,28 @@ fn test_shared_reporter() { } let (_tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); let accepted_token; - { - let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); - let token = mg.add_tc_with_req_id(req_id_0); - accepted_token = mg - .acceptance_success(token, &FIXED_STAMP) - .expect("Acceptance success failed"); - } + + let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0); + accepted_token = reporter_with_sender_0 + .acceptance_success(token, &FIXED_STAMP) + .expect("Acceptance success failed"); + // Do some start handling here let started_token; - { - let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); - started_token = mg - .start_success(accepted_token, &FIXED_STAMP) - .expect("Start success failed"); - // Do some step handling here - mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) - .expect("Start success failed"); - } - // Finish up - let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); - mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + started_token = reporter_with_sender_0 + .start_success(accepted_token, &FIXED_STAMP) .expect("Start success failed"); - mg.completion_success(started_token, &FIXED_STAMP) + // Do some step handling here + reporter_with_sender_0 + .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) + .expect("Start success failed"); + + // Finish up + reporter_with_sender_0 + .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + .expect("Start success failed"); + reporter_with_sender_0 + .completion_success(started_token, &FIXED_STAMP) .expect("Completion success failed"); }); @@ -118,19 +125,17 @@ fn test_shared_reporter() { tc_buf[0..tc_len].copy_from_slice(buf); } let (tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); - let mut mg = reporter_with_sender_1 - .lock() - .expect("Locking reporter failed"); - let token = mg.add_tc(&tc); - let accepted_token = mg + let token = reporter_with_sender_1.add_tc(&tc); + let accepted_token = reporter_with_sender_1 .acceptance_success(token, &FIXED_STAMP) .expect("Acceptance success failed"); - let started_token = mg + let started_token = reporter_with_sender_1 .start_success(accepted_token, &FIXED_STAMP) .expect("Start success failed"); let fail_code = EcssEnumU16::new(2); let params = FailParams::new(&FIXED_STAMP, &fail_code, None); - mg.completion_failure(started_token, params) + reporter_with_sender_1 + .completion_failure(started_token, params) .expect("Completion success failed"); }); diff --git a/satrs-example/src/bin/test.rs b/satrs-example/src/bin/test.rs index d88c902..41fb8e3 100644 --- a/satrs-example/src/bin/test.rs +++ b/satrs-example/src/bin/test.rs @@ -1,5 +1,7 @@ #![allow(dead_code)] + use crossbeam_channel::{bounded, Receiver, Sender}; +use std::sync::atomic::{AtomicU16, Ordering}; use std::thread; use zerocopy::{AsBytes, FromBytes, NetworkEndian, Unaligned, U16}; @@ -52,4 +54,13 @@ fn main() { let jh1 = thread::spawn(|| {}); jh0.join().unwrap(); jh1.join().unwrap(); + //let mut max_val: u16 = u16::MAX; + //max_val += 1; + //println!("Max val: {}", max_val); + let atomic_u16: AtomicU16 = AtomicU16::new(u16::MAX); + atomic_u16.fetch_add(1, Ordering::SeqCst); + println!( + "atomic after overflow: {}", + atomic_u16.load(Ordering::SeqCst) + ); } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index e8ab556..836d9ab 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -17,6 +17,7 @@ use satrs_core::pus::verification::{ MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, }; use satrs_core::pus::{EcssTmError, EcssTmSender}; +use satrs_core::seq_count::SimpleSeqCountProvider; use satrs_core::tmtc::CcsdsError; use satrs_example::{OBSW_SERVER_ADDR, SERVER_PORT}; use spacepackets::time::{CdsShortTimeProvider, TimeWriter}; @@ -39,6 +40,7 @@ struct UdpTmtcServer { unsafe impl Send for UdpTmtcServer {} +#[derive(Clone)] struct EventTmSender { store_helper: TmStore, sender: mpsc::Sender, @@ -73,9 +75,16 @@ fn main() { let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone()); - let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); + let verif_cfg = VerificationReporterCfg::new( + PUS_APID, + Box::new(SimpleSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( - verif_cfg, + &verif_cfg, Box::new(sender), )));