From 01c3440c0743fd24bfa2a6670e3357f1b6fd81fb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 7 Sep 2022 11:02:45 +0200 Subject: [PATCH] support cross-beam channel backend --- Cargo.lock | 1 + fsrc-core/Cargo.toml | 10 +- fsrc-core/src/pool.rs | 1 + fsrc-core/src/pus/verification.rs | 201 +++++++++++++++++++-------- fsrc-core/tests/verification_test.rs | 103 ++++++++++---- 5 files changed, 228 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28701b0..1d8bc60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,6 +223,7 @@ name = "fsrc-core" version = "0.1.0" dependencies = [ "bus", + "crossbeam-channel", "delegate", "downcast-rs", "hashbrown", diff --git a/fsrc-core/Cargo.toml b/fsrc-core/Cargo.toml index 392e4f4..160c727 100644 --- a/fsrc-core/Cargo.toml +++ b/fsrc-core/Cargo.toml @@ -22,17 +22,23 @@ default-features = false version = "2.2.3" optional = true +[dependencies.crossbeam-channel] +version= "0.5" +default-features = false + [dependencies.spacepackets] path = "../spacepackets" [dev-dependencies] -postcard = { version = "1.0.1", features = ["use-std"] } serde = "1.0.143" zerocopy = "0.6.1" once_cell = "1.13.1" +[dev-dependencies.postcard] +version = "1.0.1" + [features] default = ["std"] -std = ["downcast-rs/std", "alloc", "bus"] +std = ["downcast-rs/std", "alloc", "bus", "postcard/use-std", "crossbeam-channel/std"] alloc = [] diff --git a/fsrc-core/src/pool.rs b/fsrc-core/src/pool.rs index 79100bb..e1a00a7 100644 --- a/fsrc-core/src/pool.rs +++ b/fsrc-core/src/pool.rs @@ -87,6 +87,7 @@ type NumBlocks = u16; /// /// * `cfg`: Vector of tuples which represent a subpool. The first entry in the tuple specifies the /// number of memory blocks in the subpool, the second entry the size of the blocks +#[derive(Clone)] pub struct PoolCfg { cfg: Vec<(NumBlocks, usize)>, } diff --git a/fsrc-core/src/pus/verification.rs b/fsrc-core/src/pus/verification.rs index 7c3e92c..b81a47d 100644 --- a/fsrc-core/src/pus/verification.rs +++ b/fsrc-core/src/pus/verification.rs @@ -9,11 +9,10 @@ //! //! # Example //! TODO: Cross Ref integration test which will be provided -use crate::pool::{LocalPool, StoreAddr, StoreError}; use alloc::boxed::Box; -use alloc::sync::Arc; use alloc::vec; use alloc::vec::Vec; +use core::hash::{Hash, Hasher}; use core::marker::PhantomData; use core::mem::size_of; use delegate::delegate; @@ -26,17 +25,32 @@ use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader}; use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; #[cfg(feature = "std")] -use std::sync::{mpsc, RwLock, RwLockWriteGuard}; +pub use stdmod::{CrossbeamVerifSender, StdVerifSender, StdVerifSenderError}; /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard /// This field equivalent to the first two bytes of the CCSDS space packet header. -#[derive(Debug, Eq, PartialEq, Copy, Clone)] +#[derive(Debug, Eq, Copy, Clone)] pub struct RequestId { version_number: u8, packet_id: PacketId, psc: PacketSequenceCtrl, } +impl Hash for RequestId { + fn hash(&self, state: &mut H) { + self.raw().hash(state); + } +} + +// Implement manually to satisfy derive_hash_xor_eq lint +impl PartialEq for RequestId { + fn eq(&self, other: &Self) -> bool { + self.version_number == other.version_number + && self.packet_id == other.packet_id + && self.psc == other.psc + } +} + impl RequestId { pub const SIZE_AS_BYTES: usize = size_of::(); @@ -96,7 +110,7 @@ pub struct VerificationErrorWithToken(VerificationError, VerificationTo /// PUS Service 1 Verification Telemetry to a verification TM recipient. The [Downcast] trait /// is implemented to allow passing the sender as a boxed trait object and still retrieve the /// concrete type at a later point. -pub trait VerificationSender: Downcast { +pub trait VerificationSender: Downcast + Send { fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; } @@ -124,6 +138,10 @@ impl VerificationToken { req_id, } } + + pub fn req_id(&self) -> RequestId { + self.req_id + } } pub struct VerificationReporterCfg { @@ -533,20 +551,17 @@ impl VerificationReporter { /// API as [VerificationReporter] but without the explicit sender arguments. pub struct VerificationReporterWithSender { reporter: VerificationReporter, - pub sender: Box + Send>, + pub sender: Box>, } impl VerificationReporterWithSender { - pub fn new( - cfg: VerificationReporterCfg, - sender: Box + Send>, - ) -> Self { + pub fn new(cfg: VerificationReporterCfg, sender: Box>) -> Self { Self::new_from_reporter(VerificationReporter::new(cfg), sender) } pub fn new_from_reporter( reporter: VerificationReporter, - sender: Box + Send>, + sender: Box>, ) -> Self { Self { reporter, sender } } @@ -633,61 +648,127 @@ impl VerificationReporterWithSender { } #[cfg(feature = "std")] -pub struct StdVerifSender { - pub ignore_poison_error: bool, - tm_store: Arc>, - tx: mpsc::Sender, -} +mod stdmod { + use crate::pool::{LocalPool, StoreAddr, StoreError}; + use crate::pus::verification::{VerificationError, VerificationSender}; + use delegate::delegate; + use spacepackets::tm::PusTm; + use std::sync::{mpsc, Arc, RwLock, RwLockWriteGuard}; -#[cfg(feature = "std")] -impl StdVerifSender { - pub fn new(tm_store: Arc>, tx: mpsc::Sender) -> Self { - Self { - ignore_poison_error: true, - tx, - tm_store, + #[derive(Debug, Eq, PartialEq)] + pub enum StdVerifSenderError { + PoisonError, + StoreError(StoreError), + RxDisconnected(StoreAddr), + } + + trait SendBackend: Send { + fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; + } + + struct StdSenderBase { + pub ignore_poison_error: bool, + tm_store: Arc>, + tx: S, + } + + impl StdSenderBase { + pub fn new(tm_store: Arc>, tx: S) -> Self { + Self { + ignore_poison_error: false, + tm_store, + tx, + } } } -} -#[cfg(feature = "std")] -unsafe impl Sync for StdVerifSender {} -#[cfg(feature = "std")] -unsafe impl Send for StdVerifSender {} + impl SendBackend for mpsc::Sender { + fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { + self.send(addr).map_err(|_| addr) + } + } -#[cfg(feature = "std")] -#[derive(Debug, Eq, PartialEq)] -pub enum StdVerifSenderError { - PoisonError, - StoreError(StoreError), - RxDisconnected(StoreAddr), -} + pub struct StdVerifSender { + base: StdSenderBase>, + } -#[cfg(feature = "std")] -impl VerificationSender for StdVerifSender { - fn send_verification_tm( - &mut self, - tm: PusTm, - ) -> Result<(), VerificationError> { - let operation = |mut mg: RwLockWriteGuard| { - let (addr, buf) = mg - .free_element(tm.len_packed()) - .map_err(|e| VerificationError::SendError(StdVerifSenderError::StoreError(e)))?; - tm.write_to(buf).map_err(VerificationError::PusError)?; - self.tx.send(addr).map_err(|_| { - VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr)) - })?; - Ok(()) - }; - match self.tm_store.write() { - Ok(lock) => operation(lock), - Err(poison_error) => { - if self.ignore_poison_error { - operation(poison_error.into_inner()) - } else { - Err(VerificationError::SendError( - StdVerifSenderError::PoisonError, - )) + impl StdVerifSender { + pub fn new(tm_store: Arc>, tx: mpsc::Sender) -> Self { + Self { + base: StdSenderBase::new(tm_store, tx), + } + } + } + + //noinspection RsTraitImplementation + impl VerificationSender for StdVerifSender { + delegate!( + to self.base { + fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; + } + ); + } + unsafe impl Sync for StdVerifSender {} + unsafe impl Send for StdVerifSender {} + + impl SendBackend for crossbeam_channel::Sender { + fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { + self.send(addr).map_err(|_| addr) + } + } + + pub struct CrossbeamVerifSender { + base: StdSenderBase>, + } + + impl CrossbeamVerifSender { + pub fn new( + tm_store: Arc>, + tx: crossbeam_channel::Sender, + ) -> Self { + Self { + base: StdSenderBase::new(tm_store, tx), + } + } + } + + //noinspection RsTraitImplementation + impl VerificationSender for CrossbeamVerifSender { + delegate!( + to self.base { + fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; + } + ); + } + + unsafe impl Sync for CrossbeamVerifSender {} + unsafe impl Send for CrossbeamVerifSender {} + + impl VerificationSender for StdSenderBase { + fn send_verification_tm( + &mut self, + tm: PusTm, + ) -> Result<(), VerificationError> { + let operation = |mut mg: RwLockWriteGuard| { + let (addr, buf) = mg.free_element(tm.len_packed()).map_err(|e| { + VerificationError::SendError(StdVerifSenderError::StoreError(e)) + })?; + tm.write_to(buf).map_err(VerificationError::PusError)?; + self.tx.send(addr).map_err(|_| { + VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr)) + })?; + Ok(()) + }; + match self.tm_store.write() { + Ok(lock) => operation(lock), + Err(poison_error) => { + if self.ignore_poison_error { + operation(poison_error.into_inner()) + } else { + Err(VerificationError::SendError( + StdVerifSenderError::PoisonError, + )) + } } } } diff --git a/fsrc-core/tests/verification_test.rs b/fsrc-core/tests/verification_test.rs index f80efb8..e1431a4 100644 --- a/fsrc-core/tests/verification_test.rs +++ b/fsrc-core/tests/verification_test.rs @@ -1,13 +1,13 @@ use fsrc_core::pool::{LocalPool, PoolCfg}; use fsrc_core::pus::verification::{ - FailParams, RequestId, StdVerifSender, VerificationReporter, VerificationReporterCfg, + CrossbeamVerifSender, FailParams, RequestId, VerificationReporter, VerificationReporterCfg, }; +use hashbrown::HashMap; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::SpHeader; -use std::sync::atomic::{AtomicU16, Ordering}; -use std::sync::{mpsc, Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; const TEST_APID: u16 = 0x03; @@ -16,28 +16,55 @@ const PACKETS_SENT: u8 = 8; #[test] fn test_shared_reporter() { - let seq_counter_0 = Arc::new(AtomicU16::new(0)); - let seq_counter_1 = seq_counter_0.clone(); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); // Shared pool object to store the verification PUS telemetry let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); - let (tx, rx) = mpsc::channel(); - let mut sender_0 = StdVerifSender::new(shared_pool.clone(), tx.clone()); - let mut sender_1 = StdVerifSender::new(shared_pool.clone(), tx); + let shared_tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg.clone()))); + let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let shared_tc_pool_1 = shared_tc_pool_0.clone(); + let (tx, rx) = crossbeam_channel::bounded(5); + let mut sender_0 = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); + let mut sender_1 = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx); let reporter_0 = Arc::new(Mutex::new(VerificationReporter::new(cfg))); let reporter_1 = reporter_0.clone(); - let verif_sender_0 = thread::spawn(move || { - let mut sph = - SpHeader::tc(TEST_APID, seq_counter_0.fetch_add(1, Ordering::SeqCst), 0).unwrap(); + let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3); + let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3); + { + let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap(); let tc_header = PusTcSecondaryHeader::new_simple(17, 1); - let pus_tc = PusTc::new(&mut sph, tc_header, None, true); + let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true); + let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); + pus_tc_0.write_to(&mut buf).unwrap(); + tx_tc_0.send(addr).unwrap(); + let mut sph = SpHeader::tc(TEST_APID, 1, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(5, 1); + let pus_tc_1 = PusTc::new(&mut sph, tc_header, None, true); + let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); + pus_tc_1.write_to(&mut buf).unwrap(); + tx_tc_1.send(addr).unwrap(); + } + + let verif_sender_0 = thread::spawn(move || { + let mut tc_buf: [u8; 1024] = [0; 1024]; + let tc_addr = rx_tc_1.recv().unwrap(); + let tc_len; + { + let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let pg = tc_guard.read_with_guard(tc_addr); + let buf = pg.read().unwrap(); + tc_len = buf.len(); + tc_buf[0..tc_len].copy_from_slice(buf); + } + let (tc, _) = PusTc::new_from_raw_slice(&tc_buf[0..tc_len]).unwrap(); + let req_id = RequestId::new(&tc); let mut mg = reporter_0.lock().unwrap(); - let token = mg.add_tc(&pus_tc); + let token = mg.add_tc_with_req_id(req_id); let accepted_token = mg .acceptance_success(token, &mut sender_0, &FIXED_STAMP) .expect("Acceptance success failed"); + // Do some start handling here let started_token = mg .start_success(accepted_token, &mut sender_0, &FIXED_STAMP) .expect("Start success failed"); @@ -48,6 +75,7 @@ fn test_shared_reporter() { EcssEnumU8::new(0), ) .expect("Start success failed"); + // Do some step handling here mg.step_success( &started_token, &mut sender_0, @@ -60,12 +88,19 @@ fn test_shared_reporter() { }); let verif_sender_1 = thread::spawn(move || { - let mut sph = - SpHeader::tc(TEST_APID, seq_counter_1.fetch_add(1, Ordering::SeqCst), 0).unwrap(); - let tc_header = PusTcSecondaryHeader::new_simple(5, 1); - let pus_tc = PusTc::new(&mut sph, tc_header, None, true); + let mut tc_buf: [u8; 1024] = [0; 1024]; + let tc_addr = rx_tc_0.recv().unwrap(); + let tc_len; + { + let mut tc_guard = shared_tc_pool_1.write().unwrap(); + let pg = tc_guard.read_with_guard(tc_addr); + let buf = pg.read().unwrap(); + tc_len = buf.len(); + tc_buf[0..tc_len].copy_from_slice(buf); + } + let (tc, _) = PusTc::new_from_raw_slice(&tc_buf[0..tc_len]).unwrap(); let mut mg = reporter_1.lock().unwrap(); - let token = mg.add_tc(&pus_tc); + let token = mg.add_tc(&tc); let accepted_token = mg .acceptance_success(token, &mut sender_1, &FIXED_STAMP) .expect("Acceptance success failed"); @@ -81,9 +116,10 @@ fn test_shared_reporter() { let verif_receiver = thread::spawn(move || { let mut packet_counter = 0; let mut tm_buf: [u8; 1024] = [0; 1024]; + let mut verif_map = HashMap::new(); while packet_counter < PACKETS_SENT { let verif_addr = rx.recv().expect("Error receiving verification packet"); - let mut rg = shared_pool.write().expect("Error locking shared pool"); + let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); let store_guard = rg.read_with_guard(verif_addr); let slice = store_guard.read().expect("Error reading TM slice"); let tm_len = slice.len(); @@ -96,14 +132,29 @@ fn test_shared_reporter() { &pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES], ) .unwrap(); - println!( - "Received PUS Verification TM[{},{}] for request ID {:#08x}", - pus_tm.service(), - pus_tm.subservice(), - req_id.raw() - ); + if !verif_map.contains_key(&req_id) { + let mut content = Vec::new(); + content.push(pus_tm.subservice()); + verif_map.insert(req_id, content); + } else { + let content = verif_map.get_mut(&req_id).unwrap(); + content.push(pus_tm.subservice()) + } packet_counter += 1; } + for (_req_id, content) in verif_map { + if content.len() == 3 { + assert_eq!(content[0], 1); + assert_eq!(content[1], 3); + assert_eq!(content[2], 8); + } else { + assert_eq!(content[0], 1); + assert_eq!(content[1], 3); + assert_eq!(content[2], 5); + assert_eq!(content[3], 5); + assert_eq!(content[4], 7); + } + } }); verif_sender_0.join().expect("Joining thread 0 failed"); verif_sender_1.join().expect("Joining thread 1 failed");