From b0c831d0dc13f75a2bf1501b43ce31fb07984b69 Mon Sep 17 00:00:00 2001 From: lkoester Date: Thu, 9 Mar 2023 09:03:14 +0100 Subject: [PATCH] added sequence counter to verification helper, increments after succesful sending, added very basic test (does it even increment?), needs some more tests probably --- satrs-core/src/pus/verification.rs | 179 ++++++++++++++++++++++++--- satrs-core/tests/pus_verification.rs | 1 + satrs-example/src/main.rs | 2 + 3 files changed, 162 insertions(+), 20 deletions(-) diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 8a71d00..d1338ff 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -30,7 +30,7 @@ //! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let (verif_tx, verif_rx) = mpsc::channel(); //! let sender = MpscVerifSender::new(shared_tm_pool.clone(), verif_tx); -//! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SeqCountProviderSimple::default()), 1, 2, 8).unwrap(); +//! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SeqCountProviderSimple::default()), Box::new(SeqCountProviderSimple::default()), 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! //! let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); @@ -349,8 +349,10 @@ impl<'src_data, State> VerificationSendable<'src_data, State, VerifFailure> { pub fn send_success_verif_failure( self, seq_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, + msg_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, ) { - increment_seq_counter(seq_counter) + increment_seq_counter(seq_counter); + increment_seq_counter(msg_counter); } } @@ -364,8 +366,10 @@ impl<'src_data> VerificationSendable<'src_data, TcStateNone, VerifSuccess> { pub fn send_success_acceptance_success( self, seq_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, + msg_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, ) -> VerificationToken { increment_seq_counter(seq_counter); + increment_seq_counter(msg_counter); VerificationToken { state: PhantomData, req_id: self.token.unwrap().req_id(), @@ -377,8 +381,10 @@ impl<'src_data> VerificationSendable<'src_data, TcStateAccepted, VerifSuccess> { pub fn send_success_start_success( self, seq_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, + msg_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, ) -> VerificationToken { increment_seq_counter(seq_counter); + increment_seq_counter(msg_counter); VerificationToken { state: PhantomData, req_id: self.token.unwrap().req_id(), @@ -392,8 +398,10 @@ impl<'src_data, TcState: WasAtLeastAccepted + Copy> pub fn send_success_step_or_completion_success( self, seq_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, + msg_counter: Option<&(impl SequenceCountProviderCore + ?Sized)>, ) { increment_seq_counter(seq_counter); + increment_seq_counter(msg_counter); } } @@ -450,6 +458,7 @@ impl VerificationReporterCore { subservice: u8, token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), time_stamp: Option<&'src_data [u8]>, ) -> Result< VerificationSendable<'src_data, State, VerifSuccess>, @@ -460,6 +469,7 @@ impl VerificationReporterCore { src_data_buf, subservice, seq_counter.get(), + msg_counter.get(), &token.req_id, time_stamp, None::<&dyn EcssEnumeration>, @@ -475,6 +485,7 @@ impl VerificationReporterCore { subservice: u8, token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), step: Option<&(impl EcssEnumeration + ?Sized)>, params: &FailParams<'src_data, '_>, ) -> Result< @@ -486,6 +497,7 @@ impl VerificationReporterCore { src_data_buf, subservice, seq_counter.get(), + msg_counter.get(), &token.req_id, step, params, @@ -501,6 +513,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), time_stamp: Option<&'src_data [u8]>, ) -> Result< VerificationSendable<'src_data, TcStateNone, VerifSuccess>, @@ -511,6 +524,7 @@ impl VerificationReporterCore { Subservice::TmAcceptanceSuccess.into(), token, seq_counter, + msg_counter, time_stamp, ) } @@ -519,6 +533,7 @@ impl VerificationReporterCore { &self, mut sendable: VerificationSendable<'_, TcStateNone, VerifSuccess>, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result, VerificationOrSendErrorWithToken> { @@ -530,13 +545,14 @@ impl VerificationReporterCore { sendable.token.unwrap(), ) })?; - Ok(sendable.send_success_acceptance_success(Some(seq_counter))) + Ok(sendable.send_success_acceptance_success(Some(seq_counter), Some(msg_counter))) } pub fn send_acceptance_failure( &self, mut sendable: VerificationSendable<'_, TcStateNone, VerifFailure>, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender @@ -547,7 +563,7 @@ impl VerificationReporterCore { sendable.token.unwrap(), ) })?; - sendable.send_success_verif_failure(Some(seq_counter)); + sendable.send_success_verif_failure(Some(seq_counter), Some(msg_counter)); Ok(()) } @@ -557,6 +573,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), params: FailParams<'src_data, '_>, ) -> Result< VerificationSendable<'src_data, TcStateNone, VerifFailure>, @@ -567,6 +584,7 @@ impl VerificationReporterCore { Subservice::TmAcceptanceFailure.into(), token, seq_counter, + msg_counter, None::<&dyn EcssEnumeration>, ¶ms, ) @@ -580,6 +598,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), time_stamp: Option<&'src_data [u8]>, ) -> Result< VerificationSendable<'src_data, TcStateAccepted, VerifSuccess>, @@ -590,6 +609,7 @@ impl VerificationReporterCore { Subservice::TmStartSuccess.into(), token, seq_counter, + msg_counter, time_stamp, ) } @@ -598,6 +618,7 @@ impl VerificationReporterCore { &self, mut sendable: VerificationSendable<'_, TcStateAccepted, VerifSuccess>, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result< VerificationToken, @@ -611,7 +632,7 @@ impl VerificationReporterCore { sendable.token.unwrap(), ) })?; - Ok(sendable.send_success_start_success(Some(seq_counter))) + Ok(sendable.send_success_start_success(Some(seq_counter), Some(msg_counter))) } /// Package and send a PUS TM\[1, 4\] packet, see 8.1.2.4 of the PUS standard. @@ -623,6 +644,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), params: FailParams<'src_data, '_>, ) -> Result< VerificationSendable<'src_data, TcStateAccepted, VerifFailure>, @@ -633,6 +655,7 @@ impl VerificationReporterCore { Subservice::TmStartFailure.into(), token, seq_counter, + msg_counter, None::<&dyn EcssEnumeration>, ¶ms, ) @@ -642,6 +665,7 @@ impl VerificationReporterCore { &self, mut sendable: VerificationSendable<'_, TcStateAccepted, VerifFailure>, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender @@ -652,7 +676,7 @@ impl VerificationReporterCore { sendable.token.unwrap(), ) })?; - sendable.send_success_verif_failure(Some(seq_counter)); + sendable.send_success_verif_failure(Some(seq_counter), Some(msg_counter)); Ok(()) } @@ -664,6 +688,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: &VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), time_stamp: Option<&'src_data [u8]>, step: impl EcssEnumeration, ) -> Result, EcssTmError> { @@ -672,6 +697,7 @@ impl VerificationReporterCore { src_data_buf, Subservice::TmStepSuccess.into(), seq_counter.get(), + msg_counter.get(), &token.req_id, time_stamp, Some(&step), @@ -688,6 +714,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), params: FailParamsWithStep<'src_data, '_>, ) -> Result< VerificationSendable<'src_data, TcStateStarted, VerifFailure>, @@ -698,6 +725,7 @@ impl VerificationReporterCore { src_data_buf, Subservice::TmStepFailure.into(), seq_counter.get(), + msg_counter.get(), &token.req_id, Some(params.step), ¶ms.bp, @@ -716,6 +744,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), time_stamp: Option<&'src_data [u8]>, ) -> Result< VerificationSendable<'src_data, TcState, VerifSuccess>, @@ -726,6 +755,7 @@ impl VerificationReporterCore { Subservice::TmCompletionSuccess.into(), token, seq_counter, + msg_counter, time_stamp, ) } @@ -739,6 +769,7 @@ impl VerificationReporterCore { src_data_buf: &'src_data mut [u8], token: VerificationToken, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), params: FailParams<'src_data, '_>, ) -> Result< VerificationSendable<'src_data, TcState, VerifFailure>, @@ -749,6 +780,7 @@ impl VerificationReporterCore { Subservice::TmCompletionFailure.into(), token, seq_counter, + msg_counter, None::<&dyn EcssEnumeration>, ¶ms, ) @@ -758,6 +790,7 @@ impl VerificationReporterCore { &self, mut sendable: VerificationSendable<'_, TcState, VerifSuccess>, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender @@ -768,7 +801,7 @@ impl VerificationReporterCore { sendable.token.unwrap(), ) })?; - sendable.send_success_step_or_completion_success(Some(seq_counter)); + sendable.send_success_step_or_completion_success(Some(seq_counter), Some(msg_counter)); Ok(()) } @@ -776,6 +809,7 @@ impl VerificationReporterCore { &self, mut sendable: VerificationSendable<'_, TcState, VerifFailure>, seq_counter: &(impl SequenceCountProviderCore + ?Sized), + msg_counter: &(impl SequenceCountProviderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized), ) -> Result<(), VerificationOrSendErrorWithToken> { sender @@ -786,7 +820,7 @@ impl VerificationReporterCore { sendable.token.unwrap(), ) })?; - sendable.send_success_verif_failure(Some(seq_counter)); + sendable.send_success_verif_failure(Some(seq_counter), Some(msg_counter)); Ok(()) } @@ -794,6 +828,7 @@ impl VerificationReporterCore { &mut self, src_data_buf: &'src_data mut [u8], subservice: u8, + seq_count: u16, msg_counter: u16, req_id: &RequestId, time_stamp: Option<&'src_data [u8]>, @@ -812,7 +847,7 @@ impl VerificationReporterCore { step.write_to_be_bytes(&mut src_data_buf[idx..idx + step.byte_width()]) .unwrap(); } - let mut sp_header = SpHeader::tm_unseg(self.apid(), 0, 0).unwrap(); + let mut sp_header = SpHeader::tm_unseg(self.apid(), seq_count, 0).unwrap(); Ok(self.create_pus_verif_tm_base( src_data_buf, subservice, @@ -827,6 +862,7 @@ impl VerificationReporterCore { &mut self, src_data_buf: &'src_data mut [u8], subservice: u8, + seq_count: u16, msg_counter: u16, req_id: &RequestId, step: Option<&(impl EcssEnumeration + ?Sized)>, @@ -856,7 +892,7 @@ impl VerificationReporterCore { if let Some(failure_data) = params.failure_data { src_data_buf[idx..idx + failure_data.len()].copy_from_slice(failure_data); } - let mut sp_header = SpHeader::tm_unseg(self.apid(), 0, 0).unwrap(); + let mut sp_header = SpHeader::tm_unseg(self.apid(), seq_count, 0).unwrap(); Ok(self.create_pus_verif_tm_base( src_data_buf, subservice, @@ -900,6 +936,7 @@ mod alloc_mod { pub struct VerificationReporterCfg { apid: u16, seq_counter: Box + Send>, + msg_counter: Box + Send>, pub step_field_width: usize, pub fail_code_field_width: usize, pub max_fail_data_len: usize, @@ -909,6 +946,7 @@ mod alloc_mod { pub fn new( apid: u16, seq_counter: Box + Send>, + msg_counter: Box + Send>, step_field_width: usize, fail_code_field_width: usize, max_fail_data_len: usize, @@ -919,6 +957,7 @@ mod alloc_mod { Some(Self { apid, seq_counter, + msg_counter, step_field_width, fail_code_field_width, max_fail_data_len, @@ -932,6 +971,7 @@ mod alloc_mod { pub struct VerificationReporter { source_data_buf: Vec, seq_counter: Box + Send + 'static>, + msg_counter: Box + Send + 'static>, pub reporter: VerificationReporterCore, } @@ -947,6 +987,7 @@ mod alloc_mod { + cfg.max_fail_data_len ], seq_counter: cfg.seq_counter.clone(), + msg_counter: cfg.msg_counter.clone(), reporter, } } @@ -980,10 +1021,15 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_ref(), + self.msg_counter.as_ref(), time_stamp, )?; - self.reporter - .send_acceptance_success(sendable, self.seq_counter.as_ref(), sender) + self.reporter.send_acceptance_success( + sendable, + self.seq_counter.as_ref(), + self.msg_counter.as_ref(), + sender, + ) } /// Package and send a PUS TM\[1, 2\] packet, see 8.1.2.2 of the PUS standard @@ -997,10 +1043,15 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_ref(), + self.msg_counter.as_ref(), params, )?; - self.reporter - .send_acceptance_failure(sendable, self.seq_counter.as_ref(), sender) + self.reporter.send_acceptance_failure( + sendable, + self.seq_counter.as_ref(), + self.msg_counter.as_ref(), + sender, + ) } /// Package and send a PUS TM\[1, 3\] packet, see 8.1.2.3 of the PUS standard. @@ -1019,10 +1070,15 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_mut(), + self.msg_counter.as_mut(), time_stamp, )?; - self.reporter - .send_start_success(sendable, self.seq_counter.as_ref(), sender) + self.reporter.send_start_success( + sendable, + self.seq_counter.as_ref(), + self.msg_counter.as_ref(), + sender, + ) } /// Package and send a PUS TM\[1, 4\] packet, see 8.1.2.4 of the PUS standard. @@ -1039,10 +1095,15 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_mut(), + self.msg_counter.as_mut(), params, )?; - self.reporter - .send_start_failure(sendable, self.seq_counter.as_ref(), sender) + self.reporter.send_start_failure( + sendable, + self.seq_counter.as_ref(), + self.msg_counter.as_ref(), + sender, + ) } /// Package and send a PUS TM\[1, 5\] packet, see 8.1.2.5 of the PUS standard. @@ -1059,11 +1120,17 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_mut(), + self.msg_counter.as_mut(), time_stamp, step, )?; self.reporter - .send_step_or_completion_success(sendable, self.seq_counter.as_ref(), sender) + .send_step_or_completion_success( + sendable, + self.seq_counter.as_ref(), + self.msg_counter.as_ref(), + sender, + ) .map_err(|e| e.0) } @@ -1081,11 +1148,13 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_mut(), + self.msg_counter.as_mut(), params, )?; self.reporter.send_step_or_completion_failure( sendable, self.seq_counter.as_ref(), + self.msg_counter.as_ref(), sender, ) } @@ -1104,11 +1173,13 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_mut(), + self.msg_counter.as_mut(), time_stamp, )?; self.reporter.send_step_or_completion_success( sendable, self.seq_counter.as_ref(), + self.msg_counter.as_ref(), sender, ) } @@ -1127,11 +1198,13 @@ mod alloc_mod { self.source_data_buf.as_mut_slice(), token, self.seq_counter.as_mut(), + self.msg_counter.as_mut(), params, )?; self.reporter.send_step_or_completion_failure( sendable, self.seq_counter.as_ref(), + self.msg_counter.as_ref(), sender, ) } @@ -1415,9 +1488,10 @@ mod tests { use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; - use spacepackets::{ByteConversionError, SpHeader}; + use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; use std::collections::VecDeque; use std::sync::{mpsc, Arc, RwLock}; + use std::time::Duration; use std::vec; use std::vec::Vec; @@ -1504,6 +1578,7 @@ mod tests { let cfg = VerificationReporterCfg::new( TEST_APID, Box::new(SeqCountProviderSimple::default()), + Box::new(SeqCountProviderSimple::default()), 1, 2, 8, @@ -2238,4 +2313,68 @@ mod tests { let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); completion_success_check(sender, tok.req_id); } + + #[test] + // TODO: maybe a bit more extensive testing, all I have time for right now + fn test_seq_count_increment() { + let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); + let shared_tm_pool: SharedPool = + Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let (verif_tx, verif_rx) = mpsc::channel(); + let sender = MpscVerifSender::new(shared_tm_pool.clone(), verif_tx); + let cfg = VerificationReporterCfg::new( + TEST_APID, + Box::new(SeqCountProviderSimple::default()), + Box::new(SeqCountProviderSimple::default()), + 1, + 2, + 8, + ) + .unwrap(); + let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); + + let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(17, 1); + let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true); + let init_token = reporter.add_tc(&pus_tc_0); + + // Complete success sequence for a telecommand + let accepted_token = reporter + .acceptance_success(init_token, Some(&EMPTY_STAMP)) + .unwrap(); + let started_token = reporter + .start_success(accepted_token, Some(&EMPTY_STAMP)) + .unwrap(); + reporter + .completion_success(started_token, Some(&EMPTY_STAMP)) + .unwrap(); + + // Verify it arrives correctly on receiver end + let mut tm_buf: [u8; 1024] = [0; 1024]; + let mut packet_idx = 0; + while packet_idx < 3 { + let addr = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap(); + let tm_len; + { + let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); + let store_guard = rg.read_with_guard(addr); + let slice = store_guard.read().expect("Error reading TM slice"); + tm_len = slice.len(); + tm_buf[0..tm_len].copy_from_slice(slice); + } + let (pus_tm, _) = + PusTm::from_bytes(&tm_buf[0..tm_len], 7).expect("Error reading verification TM"); + if packet_idx == 0 { + assert_eq!(pus_tm.subservice(), 1); + assert_eq!(pus_tm.sp_header.seq_count(), 0); + } else if packet_idx == 1 { + assert_eq!(pus_tm.subservice(), 3); + assert_eq!(pus_tm.sp_header.seq_count(), 1); + } else if packet_idx == 2 { + assert_eq!(pus_tm.subservice(), 7); + assert_eq!(pus_tm.sp_header.seq_count(), 2); + } + packet_idx += 1; + } + } } diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index e5ffad0..dd98faa 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -35,6 +35,7 @@ pub mod crossbeam_test { let cfg = VerificationReporterCfg::new( TEST_APID, Box::new(SeqCountProviderSyncClonable::default()), + Box::new(SeqCountProviderSyncClonable::default()), 1, 2, 8, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index e7ec7e0..fe4b787 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -79,6 +79,8 @@ fn main() { PUS_APID, #[allow(clippy::box_default)] Box::new(SeqCountProviderSimple::default()), + #[allow(clippy::box_default)] + Box::new(SeqCountProviderSimple::default()), 1, 2, 8,