trying to avoid locking whole verification reporter

This commit is contained in:
Robin Müller 2022-11-21 10:16:15 +01:00
parent d6ae213a6e
commit 43a1fb90ce
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
7 changed files with 142 additions and 41 deletions

7
Cargo.lock generated
View File

@ -284,6 +284,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"
[[package]]
name = "dyn-clone"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2"
[[package]]
name = "embed-doc-image"
version = "0.1.4"
@ -597,6 +603,7 @@ dependencies = [
"crossbeam-channel",
"delegate 0.8.0",
"downcast-rs",
"dyn-clone",
"embed-doc-image",
"hashbrown",
"heapless",

View File

@ -10,6 +10,7 @@ delegate = "0.8"
hashbrown = "0.13"
heapless = "0.7"
paste = "1.0"
dyn-clone = "1.0.9"
embed-doc-image = "0.1"
[dependencies.num-traits]

View File

@ -30,3 +30,4 @@ pub mod params;
pub mod pool;
pub mod pus;
pub mod tmtc;
pub mod seq_count;

View File

@ -17,6 +17,7 @@
//! use std::time::Duration;
//! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
//! use satrs_core::pus::verification::{CrossbeamVerifSender, VerificationReporterCfg, VerificationReporterWithSender};
//! use satrs_core::seq_count::SimpleSeqCountProvider;
//! use spacepackets::ecss::PusPacket;
//! use spacepackets::SpHeader;
//! use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
@ -29,7 +30,7 @@
//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
//! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10);
//! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx);
//! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
//! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap();
//! let mut reporter = VerificationReporterWithSender::new(cfg , Box::new(sender));
//!
//! let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap();
@ -83,14 +84,17 @@ use spacepackets::tm::{PusTm, PusTmSecondaryHeader};
use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl};
use spacepackets::{SpHeader, MAX_APID};
pub use crate::seq_count::SimpleSeqCountProvider;
#[cfg(feature = "alloc")]
pub use allocmod::{VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender};
pub use allocmod::{VerificationReporterWithBuf, VerificationReporterCfg, VerificationReporterWithSender};
#[cfg(feature = "std")]
pub use stdmod::{
CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender,
StdVerifReporterWithSender, StdVerifSenderError,
};
use crate::seq_count::SequenceCountProvider;
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum Subservices {
@ -276,10 +280,10 @@ impl<'a> FailParamsWithStep<'a> {
}
}
#[derive(Clone)]
pub struct VerificationReporterBasic {
pub dest_id: u16,
apid: u16,
msg_count: u16,
}
impl VerificationReporterBasic {
@ -289,7 +293,6 @@ impl VerificationReporterBasic {
}
Some(Self {
apid,
msg_count: 0,
dest_id: 0,
})
}
@ -332,6 +335,7 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateNone>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
time_stamp: &[u8],
) -> Result<VerificationToken<TcStateAccepted>, VerificationErrorWithToken<E, TcStateNone>>
{
@ -339,6 +343,7 @@ impl VerificationReporterBasic {
.create_pus_verif_success_tm(
buf,
Subservices::TmAcceptanceSuccess.into(),
seq_counter.get(),
&token.req_id,
time_stamp,
None::<&dyn EcssEnumeration>,
@ -347,7 +352,8 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
//seq_counter.get_and_increment()
Ok(VerificationToken {
state: PhantomData,
req_id: token.req_id,
@ -360,12 +366,14 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateNone>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
params: FailParams,
) -> Result<(), VerificationErrorWithToken<E, TcStateNone>> {
let tm = self
.create_pus_verif_fail_tm(
buf,
Subservices::TmAcceptanceFailure.into(),
seq_counter.get(),
&token.req_id,
None::<&dyn EcssEnumeration>,
&params,
@ -374,7 +382,7 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
Ok(())
}
@ -386,6 +394,7 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateAccepted>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
time_stamp: &[u8],
) -> Result<VerificationToken<TcStateStarted>, VerificationErrorWithToken<E, TcStateAccepted>>
{
@ -393,6 +402,7 @@ impl VerificationReporterBasic {
.create_pus_verif_success_tm(
buf,
Subservices::TmStartSuccess.into(),
seq_counter.get(),
&token.req_id,
time_stamp,
None::<&dyn EcssEnumeration>,
@ -401,7 +411,7 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
Ok(VerificationToken {
state: PhantomData,
req_id: token.req_id,
@ -417,12 +427,14 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateAccepted>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
params: FailParams,
) -> Result<(), VerificationErrorWithToken<E, TcStateAccepted>> {
let tm = self
.create_pus_verif_fail_tm(
buf,
Subservices::TmStartFailure.into(),
seq_counter.get(),
&token.req_id,
None::<&dyn EcssEnumeration>,
&params,
@ -431,7 +443,7 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
Ok(())
}
@ -443,18 +455,20 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: &VerificationToken<TcStateStarted>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
time_stamp: &[u8],
step: impl EcssEnumeration,
) -> Result<(), EcssTmError<E>> {
let tm = self.create_pus_verif_success_tm(
buf,
Subservices::TmStepSuccess.into(),
seq_counter.get(),
&token.req_id,
time_stamp,
Some(&step),
)?;
sender.send_tm(tm)?;
self.msg_count += 1;
seq_counter.increment();
Ok(())
}
@ -467,12 +481,14 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateStarted>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
params: FailParamsWithStep,
) -> Result<(), VerificationErrorWithToken<E, TcStateStarted>> {
let tm = self
.create_pus_verif_fail_tm(
buf,
Subservices::TmStepFailure.into(),
seq_counter.get(),
&token.req_id,
Some(params.step),
&params.bp,
@ -481,7 +497,7 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
Ok(())
}
@ -494,12 +510,14 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateStarted>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
time_stamp: &[u8],
) -> Result<(), VerificationErrorWithToken<E, TcStateStarted>> {
let tm = self
.create_pus_verif_success_tm(
buf,
Subservices::TmCompletionSuccess.into(),
seq_counter.get(),
&token.req_id,
time_stamp,
None::<&dyn EcssEnumeration>,
@ -508,7 +526,7 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
Ok(())
}
@ -521,12 +539,14 @@ impl VerificationReporterBasic {
buf: &mut [u8],
token: VerificationToken<TcStateStarted>,
sender: &mut (impl EcssTmSender<Error = E> + ?Sized),
seq_counter: &mut (impl SequenceCountProvider<u16> + ?Sized),
params: FailParams,
) -> Result<(), VerificationErrorWithToken<E, TcStateStarted>> {
let tm = self
.create_pus_verif_fail_tm(
buf,
Subservices::TmCompletionFailure.into(),
seq_counter.get(),
&token.req_id,
None::<&dyn EcssEnumeration>,
&params,
@ -535,7 +555,7 @@ impl VerificationReporterBasic {
sender
.send_tm(tm)
.map_err(|e| VerificationErrorWithToken(e, token))?;
self.msg_count += 1;
seq_counter.increment();
Ok(())
}
@ -543,6 +563,7 @@ impl VerificationReporterBasic {
&'a mut self,
buf: &'a mut [u8],
subservice: u8,
msg_counter: u16,
req_id: &RequestId,
time_stamp: &'a [u8],
step: Option<&(impl EcssEnumeration + ?Sized)>,
@ -564,6 +585,7 @@ impl VerificationReporterBasic {
Ok(self.create_pus_verif_tm_base(
buf,
subservice,
msg_counter,
&mut sp_header,
time_stamp,
source_data_len,
@ -574,6 +596,7 @@ impl VerificationReporterBasic {
&'a mut self,
buf: &'a mut [u8],
subservice: u8,
msg_counter: u16,
req_id: &RequestId,
step: Option<&(impl EcssEnumeration + ?Sized)>,
params: &'a FailParams,
@ -607,6 +630,7 @@ impl VerificationReporterBasic {
Ok(self.create_pus_verif_tm_base(
buf,
subservice,
msg_counter,
&mut sp_header,
params.time_stamp,
source_data_len,
@ -617,12 +641,13 @@ impl VerificationReporterBasic {
&'a mut self,
buf: &'a mut [u8],
subservice: u8,
msg_counter: u16,
sp_header: &mut SpHeader,
time_stamp: &'a [u8],
source_data_len: usize,
) -> PusTm {
let tm_sec_header =
PusTmSecondaryHeader::new(1, subservice, self.msg_count, self.dest_id, time_stamp);
PusTmSecondaryHeader::new(1, subservice, msg_counter, self.dest_id, time_stamp);
PusTm::new(
sp_header,
tm_sec_header,
@ -639,8 +664,10 @@ mod allocmod {
use alloc::vec;
use alloc::vec::Vec;
#[derive(Clone)]
pub struct VerificationReporterCfg {
apid: u16,
seq_counter: Box<dyn SequenceCountProvider<u16> + Send>,
pub step_field_width: usize,
pub fail_code_field_width: usize,
pub max_fail_data_len: usize,
@ -649,6 +676,7 @@ mod allocmod {
impl VerificationReporterCfg {
pub fn new(
apid: u16,
seq_counter: Box<dyn SequenceCountProvider<u16> + Send>,
step_field_width: usize,
fail_code_field_width: usize,
max_fail_data_len: usize,
@ -658,6 +686,7 @@ mod allocmod {
}
Some(Self {
apid,
seq_counter,
step_field_width,
fail_code_field_width,
max_fail_data_len,
@ -667,13 +696,15 @@ mod allocmod {
/// Primary verification handler. It provides an API to send PUS 1 verification telemetry packets
/// and verify the various steps of telecommand handling as specified in the PUS standard.
pub struct VerificationReporter {
#[derive (Clone)]
pub struct VerificationReporterWithBuf {
source_data_buf: Vec<u8>,
seq_counter: Box<dyn SequenceCountProvider<u16> + Send + 'static>,
pub reporter: VerificationReporterBasic,
}
impl VerificationReporter {
pub fn new(cfg: VerificationReporterCfg) -> Self {
impl VerificationReporterWithBuf {
pub fn new(cfg: &VerificationReporterCfg) -> Self {
let reporter = VerificationReporterBasic::new(cfg.apid).unwrap();
Self {
source_data_buf: vec![
@ -683,6 +714,7 @@ mod allocmod {
+ cfg.fail_code_field_width as usize
+ cfg.max_fail_data_len
],
seq_counter: cfg.seq_counter.clone(),
reporter,
}
}
@ -714,6 +746,7 @@ mod allocmod {
self.source_data_buf.as_mut_slice(),
token,
sender,
self.seq_counter.as_mut(),
time_stamp,
)
}
@ -729,6 +762,7 @@ mod allocmod {
self.source_data_buf.as_mut_slice(),
token,
sender,
self.seq_counter.as_mut(),
params,
)
}
@ -747,6 +781,7 @@ mod allocmod {
self.source_data_buf.as_mut_slice(),
token,
sender,
self.seq_counter.as_mut(),
time_stamp,
)
}
@ -762,7 +797,7 @@ mod allocmod {
params: FailParams,
) -> Result<(), VerificationErrorWithToken<E, TcStateAccepted>> {
self.reporter
.start_failure(self.source_data_buf.as_mut_slice(), token, sender, params)
.start_failure(self.source_data_buf.as_mut_slice(), token, sender, self.seq_counter.as_mut(), params)
}
/// Package and send a PUS TM\[1, 5\] packet, see 8.1.2.5 of the PUS standard.
@ -779,6 +814,7 @@ mod allocmod {
self.source_data_buf.as_mut_slice(),
token,
sender,
self.seq_counter.as_mut(),
time_stamp,
step,
)
@ -795,7 +831,7 @@ mod allocmod {
params: FailParamsWithStep,
) -> Result<(), VerificationErrorWithToken<E, TcStateStarted>> {
self.reporter
.step_failure(self.source_data_buf.as_mut_slice(), token, sender, params)
.step_failure(self.source_data_buf.as_mut_slice(), token, sender, self.seq_counter.as_mut(), params)
}
/// Package and send a PUS TM\[1, 7\] packet, see 8.1.2.7 of the PUS standard.
@ -812,6 +848,7 @@ mod allocmod {
self.source_data_buf.as_mut_slice(),
token,
sender,
self.seq_counter.as_mut(),
time_stamp,
)
}
@ -830,6 +867,7 @@ mod allocmod {
self.source_data_buf.as_mut_slice(),
token,
sender,
self.seq_counter.as_mut(),
params,
)
}
@ -838,18 +876,18 @@ mod allocmod {
/// Helper object which caches the sender passed as a trait object. Provides the same
/// API as [VerificationReporter] but without the explicit sender arguments.
pub struct VerificationReporterWithSender<E> {
pub reporter: VerificationReporter,
pub reporter: VerificationReporterWithBuf,
pub sender: Box<dyn EcssTmSender<Error = E>>,
}
impl<E: 'static> VerificationReporterWithSender<E> {
pub fn new(cfg: VerificationReporterCfg, sender: Box<dyn EcssTmSender<Error = E>>) -> Self {
let reporter = VerificationReporter::new(cfg);
pub fn new(cfg: &VerificationReporterCfg, sender: Box<dyn EcssTmSender<Error = E>>) -> Self {
let reporter = VerificationReporterWithBuf::new(cfg);
Self::new_from_reporter(reporter, sender)
}
pub fn new_from_reporter(
reporter: VerificationReporter,
reporter: VerificationReporterWithBuf,
sender: Box<dyn EcssTmSender<Error = E>>,
) -> Self {
Self { reporter, sender }
@ -1095,8 +1133,8 @@ mod tests {
use crate::pus::tests::CommonTmInfo;
use crate::pus::verification::{
EcssTmError, EcssTmSender, FailParams, FailParamsWithStep, RequestId, TcStateNone,
VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender,
VerificationToken,
VerificationReporterWithBuf, VerificationReporterCfg, VerificationReporterWithSender,
VerificationToken
};
use alloc::boxed::Box;
use alloc::format;
@ -1106,6 +1144,7 @@ mod tests {
use spacepackets::{ByteConversionError, SpHeader};
use std::collections::VecDeque;
use std::vec::Vec;
use crate::seq_count::SimpleSeqCountProvider;
const TEST_APID: u16 = 0x02;
const EMPTY_STAMP: [u8; 7] = [0; 7];
@ -1160,13 +1199,13 @@ mod tests {
}
struct TestBase<'a> {
vr: VerificationReporter,
vr: VerificationReporterWithBuf,
#[allow(dead_code)]
tc: PusTc<'a>,
}
impl<'a> TestBase<'a> {
fn rep(&mut self) -> &mut VerificationReporter {
fn rep(&mut self) -> &mut VerificationReporterWithBuf {
&mut self.vr
}
}
@ -1177,14 +1216,14 @@ mod tests {
}
impl<'a, E> TestBaseWithHelper<'a, E> {
fn rep(&mut self) -> &mut VerificationReporter {
fn rep(&mut self) -> &mut VerificationReporterWithBuf {
&mut self.helper.reporter
}
}
fn base_reporter() -> VerificationReporter {
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
VerificationReporter::new(cfg)
fn base_reporter() -> VerificationReporterWithBuf {
let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap();
VerificationReporterWithBuf::new(&cfg)
}
fn base_tc_init(app_data: Option<&[u8]>) -> (PusTc, RequestId) {

View File

@ -0,0 +1,56 @@
use dyn_clone::DynClone;
#[cfg(feature = "std")]
pub use stdmod::*;
pub trait SequenceCountProvider<Raw>: DynClone {
fn get(&self) -> Raw;
fn increment(&mut self);
fn get_and_increment(&mut self) -> Raw {
let val = self.get();
self.increment();
val
}
}
#[derive(Default, Clone)]
pub struct SimpleSeqCountProvider {
seq_count: u16
}
dyn_clone::clone_trait_object!(SequenceCountProvider<u16>);
impl SequenceCountProvider<u16> for SimpleSeqCountProvider {
fn get(&self) -> u16 {
self.seq_count
}
fn increment(&mut self) {
self.seq_count += 1;
}
}
#[cfg(feature = "std")]
pub mod stdmod {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
#[derive(Clone)]
pub struct SyncSeqCountProvider {
seq_count: Arc<AtomicU16>
}
impl SequenceCountProvider<u16> for SyncSeqCountProvider {
fn get(&self) -> u16 {
self.seq_count.load(Ordering::SeqCst)
}
fn increment(&mut self) {
self.seq_count.fetch_add(1, Ordering::SeqCst);
}
fn get_and_increment(&mut self) -> u16 {
self.seq_count.fetch_add(1, Ordering::SeqCst)
}
}
}

View File

@ -1,9 +1,7 @@
use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
use satrs_core::pus::verification::{
CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg,
VerificationReporterWithSender,
};
use satrs_core::pus::verification::{CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender};
use hashbrown::HashMap;
use satrs_core::seq_count::SimpleSeqCountProvider;
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket};
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::tm::PusTm;
@ -25,7 +23,7 @@ const PACKETS_SENT: u8 = 8;
/// threads have sent the correct expected verification reports
#[test]
fn test_shared_reporter() {
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap();
// Shared pool object to store the verification PUS telemetry
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
let shared_tm_pool: SharedPool =
@ -35,7 +33,7 @@ fn test_shared_reporter() {
let (tx, rx) = crossbeam_channel::bounded(5);
let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone());
let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new(
cfg,
&cfg,
Box::new(sender),
)));
let reporter_with_sender_1 = reporter_with_sender_0.clone();

View File

@ -13,9 +13,7 @@ use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher,
};
use satrs_core::pus::verification::{
MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender,
};
use satrs_core::pus::verification::{MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender};
use satrs_core::pus::{EcssTmError, EcssTmSender};
use satrs_core::tmtc::CcsdsError;
use satrs_example::{OBSW_SERVER_ADDR, SERVER_PORT};
@ -25,6 +23,7 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc::channel;
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::thread;
use satrs_core::seq_count::SimpleSeqCountProvider;
struct TmFunnel {
tm_funnel_rx: mpsc::Receiver<StoreAddr>,
@ -73,9 +72,9 @@ fn main() {
let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel();
let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone());
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
let verif_cfg = VerificationReporterCfg::new(PUS_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap();
let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new(
verif_cfg,
&verif_cfg,
Box::new(sender),
)));