From b74bfea7ddde27fad29a978123687b287b15c979 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 30 Dec 2022 23:28:33 +0100 Subject: [PATCH] put crossbeam dep behind feature gate --- satrs-core/Cargo.toml | 4 +- satrs-core/src/pus/verification.rs | 20 +- satrs-core/tests/pus_verification.rs | 369 ++++++++++++++------------- 3 files changed, 204 insertions(+), 189 deletions(-) diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 3781142..19948e4 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -10,7 +10,7 @@ delegate = "0.8" hashbrown = "0.13" heapless = "0.7" paste = "1.0" -dyn-clone = "1.0.9" +dyn-clone = "1.0" embed-doc-image = "0.1" [dependencies.num-traits] @@ -28,6 +28,7 @@ optional = true [dependencies.crossbeam-channel] version= "0.5" default-features = false +optional = true [dependencies.serde] version = "1.0" @@ -52,6 +53,7 @@ default = ["std"] std = ["downcast-rs/std", "alloc", "bus", "postcard/use-std", "crossbeam-channel/std", "serde/std", "spacepackets/std"] alloc = ["serde/alloc", "spacepackets/alloc"] serde = ["dep:serde", "spacepackets/serde"] +crossbeam = ["crossbeam-channel"] heapless = [] doc-images = [] diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 69c189f..35de019 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -13,10 +13,10 @@ //! executed. Note that the verification part could also be done in a separate thread. //! //! ``` -//! use std::sync::{Arc, RwLock}; +//! use std::sync::{Arc, mpsc, RwLock}; //! use std::time::Duration; //! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; -//! use satrs_core::pus::verification::{CrossbeamVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; +//! use satrs_core::pus::verification::{MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; //! use satrs_core::seq_count::SimpleSeqCountProvider; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; @@ -28,8 +28,8 @@ //! //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); //! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); -//! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10); -//! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx); +//! let (verif_tx, verif_rx) = mpsc::channel(); +//! let sender = MpscVerifSender::new(shared_tm_pool.clone(), verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, Box::new(SimpleSeqCountProvider::default()), 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! @@ -93,10 +93,12 @@ pub use alloc_mod::{ }; use crate::seq_count::SequenceCountProvider; +#[cfg(feature = "crossbeam")] +pub use stdmod::CrossbeamVerifSender; #[cfg(feature = "std")] pub use stdmod::{ - CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender, - StdVerifReporterWithSender, StdVerifSenderError, + MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender, + StdVerifSenderError, }; #[derive(Debug, Eq, PartialEq, Copy, Clone)] @@ -1091,11 +1093,13 @@ mod stdmod { /// Verification sender with a [crossbeam_channel::Sender] backend. /// It implements the [EcssTmSender] trait to be used as PUS Verification TM sender + #[cfg(feature = "crossbeam")] #[derive(Clone)] pub struct CrossbeamVerifSender { base: StdSenderBase>, } + #[cfg(feature = "crossbeam")] impl CrossbeamVerifSender { pub fn new(tm_store: SharedPool, tx: crossbeam_channel::Sender) -> Self { Self { @@ -1105,6 +1109,7 @@ mod stdmod { } //noinspection RsTraitImplementation + #[cfg(feature = "crossbeam")] impl EcssTmSender for CrossbeamVerifSender { type Error = StdVerifSenderError; @@ -1115,7 +1120,10 @@ mod stdmod { ); } + // TODO: Are those really necessary? Check with test.. + #[cfg(feature = "crossbeam")] unsafe impl Sync for CrossbeamVerifSender {} + #[cfg(feature = "crossbeam")] unsafe impl Send for CrossbeamVerifSender {} impl EcssTmSender for StdSenderBase { diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 13bd05c..dac580a 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -1,193 +1,198 @@ -use hashbrown::HashMap; -use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; -use satrs_core::pus::verification::{ - CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, - VerificationReporterWithSender, -}; -use satrs_core::seq_count::SyncSeqCountProvider; -use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; -use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; -use spacepackets::tm::PusTm; -use spacepackets::SpHeader; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; +#[cfg(feature = "crossbeam")] +pub mod crossbeam_test { + use hashbrown::HashMap; + use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; + use satrs_core::pus::verification::{ + CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, + VerificationReporterWithSender, + }; + use satrs_core::seq_count::SyncSeqCountProvider; + use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; + use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; + use spacepackets::tm::PusTm; + use spacepackets::SpHeader; + use std::sync::{Arc, RwLock}; + use std::thread; + use std::time::Duration; -const TEST_APID: u16 = 0x03; -const FIXED_STAMP: [u8; 7] = [0; 7]; -const PACKETS_SENT: u8 = 8; + const TEST_APID: u16 = 0x03; + const FIXED_STAMP: [u8; 7] = [0; 7]; + const PACKETS_SENT: u8 = 8; -/// This test also shows how the verification report could be used in a multi-threaded context, -/// wrapping it into an [Arc] and [Mutex] and then passing it to two threads. -/// -/// - The first thread generates a acceptance, a start, two steps and one completion report -/// - The second generates an acceptance and start success report and a completion failure -/// - The third thread is the verification receiver. In the test case, it verifies the other two -/// threads have sent the correct expected verification reports -#[test] -fn test_shared_reporter() { - // We use a synced sequence count provider here because both verification reporters have the - // the same APID. If they had distinct APIDs, the more correct approach would be to have - // each reporter have an own sequence count provider. - let cfg = VerificationReporterCfg::new( - TEST_APID, - Box::new(SyncSeqCountProvider::default()), - 1, - 2, - 8, - ) - .unwrap(); - // Shared pool object to store the verification PUS telemetry - let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_pool: SharedPool = - Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); - let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); - let shared_tc_pool_1 = shared_tc_pool_0.clone(); - let (tx, rx) = crossbeam_channel::bounded(5); - let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); - let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); - let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); - // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver - // tread. - let req_id_0; - let req_id_1; + /// This test also shows how the verification report could be used in a multi-threaded context, + /// wrapping it into an [Arc] and [Mutex] and then passing it to two threads. + /// + /// - The first thread generates a acceptance, a start, two steps and one completion report + /// - The second generates an acceptance and start success report and a completion failure + /// - The third thread is the verification receiver. In the test case, it verifies the other two + /// threads have sent the correct expected verification reports + #[test] + fn test_shared_reporter() { + // We use a synced sequence count provider here because both verification reporters have the + // the same APID. If they had distinct APIDs, the more correct approach would be to have + // each reporter have an own sequence count provider. + let cfg = VerificationReporterCfg::new( + TEST_APID, + Box::new(SyncSeqCountProvider::default()), + 1, + 2, + 8, + ) + .unwrap(); + // Shared pool object to store the verification PUS telemetry + let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); + let shared_tm_pool: SharedPool = + Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let shared_tc_pool_1 = shared_tc_pool_0.clone(); + let (tx, rx) = crossbeam_channel::bounded(5); + let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); + let mut reporter_with_sender_0 = + VerificationReporterWithSender::new(&cfg, Box::new(sender)); + let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); + // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver + // tread. + let req_id_0; + let req_id_1; - let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3); - let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3); - { - let mut tc_guard = shared_tc_pool_0.write().unwrap(); - let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); - let tc_header = PusTcSecondaryHeader::new_simple(17, 1); - let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true); - req_id_0 = RequestId::new(&pus_tc_0); - let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); - pus_tc_0.write_to_bytes(&mut buf).unwrap(); - tx_tc_0.send(addr).unwrap(); - let mut sph = SpHeader::tc_unseg(TEST_APID, 1, 0).unwrap(); - let tc_header = PusTcSecondaryHeader::new_simple(5, 1); - let pus_tc_1 = PusTc::new(&mut sph, tc_header, None, true); - req_id_1 = RequestId::new(&pus_tc_1); - let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); - pus_tc_1.write_to_bytes(&mut buf).unwrap(); - tx_tc_1.send(addr).unwrap(); - } - let verif_sender_0 = thread::spawn(move || { - let mut tc_buf: [u8; 1024] = [0; 1024]; - let tc_addr = rx_tc_0 - .recv_timeout(Duration::from_millis(20)) - .expect("Receive timeout"); - let tc_len; + let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3); + let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3); { let mut tc_guard = shared_tc_pool_0.write().unwrap(); - let pg = tc_guard.read_with_guard(tc_addr); - let buf = pg.read().unwrap(); - tc_len = buf.len(); - tc_buf[0..tc_len].copy_from_slice(buf); + let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(17, 1); + let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true); + req_id_0 = RequestId::new(&pus_tc_0); + let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); + pus_tc_0.write_to_bytes(&mut buf).unwrap(); + tx_tc_0.send(addr).unwrap(); + let mut sph = SpHeader::tc_unseg(TEST_APID, 1, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(5, 1); + let pus_tc_1 = PusTc::new(&mut sph, tc_header, None, true); + req_id_1 = RequestId::new(&pus_tc_1); + let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); + pus_tc_1.write_to_bytes(&mut buf).unwrap(); + tx_tc_1.send(addr).unwrap(); } - let (_tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); - let accepted_token; - - let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0); - accepted_token = reporter_with_sender_0 - .acceptance_success(token, &FIXED_STAMP) - .expect("Acceptance success failed"); - - // Do some start handling here - let started_token; - started_token = reporter_with_sender_0 - .start_success(accepted_token, &FIXED_STAMP) - .expect("Start success failed"); - // Do some step handling here - reporter_with_sender_0 - .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) - .expect("Start success failed"); - - // Finish up - reporter_with_sender_0 - .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) - .expect("Start success failed"); - reporter_with_sender_0 - .completion_success(started_token, &FIXED_STAMP) - .expect("Completion success failed"); - }); - - let verif_sender_1 = thread::spawn(move || { - let mut tc_buf: [u8; 1024] = [0; 1024]; - let tc_addr = rx_tc_1 - .recv_timeout(Duration::from_millis(20)) - .expect("Receive timeout"); - let tc_len; - { - let mut tc_guard = shared_tc_pool_1.write().unwrap(); - let pg = tc_guard.read_with_guard(tc_addr); - let buf = pg.read().unwrap(); - tc_len = buf.len(); - tc_buf[0..tc_len].copy_from_slice(buf); - } - let (tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); - let token = reporter_with_sender_1.add_tc(&tc); - let accepted_token = reporter_with_sender_1 - .acceptance_success(token, &FIXED_STAMP) - .expect("Acceptance success failed"); - let started_token = reporter_with_sender_1 - .start_success(accepted_token, &FIXED_STAMP) - .expect("Start success failed"); - let fail_code = EcssEnumU16::new(2); - let params = FailParams::new(&FIXED_STAMP, &fail_code, None); - reporter_with_sender_1 - .completion_failure(started_token, params) - .expect("Completion success failed"); - }); - - let verif_receiver = thread::spawn(move || { - let mut packet_counter = 0; - let mut tm_buf: [u8; 1024] = [0; 1024]; - let mut verif_map = HashMap::new(); - while packet_counter < PACKETS_SENT { - let verif_addr = rx - .recv_timeout(Duration::from_millis(50)) - .expect("Packet reception timeout"); - let tm_len; + let verif_sender_0 = thread::spawn(move || { + let mut tc_buf: [u8; 1024] = [0; 1024]; + let tc_addr = rx_tc_0 + .recv_timeout(Duration::from_millis(20)) + .expect("Receive timeout"); + let tc_len; { - let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); - let store_guard = rg.read_with_guard(verif_addr); - let slice = store_guard.read().expect("Error reading TM slice"); - tm_len = slice.len(); - tm_buf[0..tm_len].copy_from_slice(slice); + let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let pg = tc_guard.read_with_guard(tc_addr); + let buf = pg.read().unwrap(); + tc_len = buf.len(); + tc_buf[0..tc_len].copy_from_slice(buf); } - let (pus_tm, _) = - PusTm::from_bytes(&tm_buf[0..tm_len], 7).expect("Error reading verification TM"); - let req_id = RequestId::from_bytes( - &pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES], - ) - .unwrap(); - if !verif_map.contains_key(&req_id) { - let mut content = Vec::new(); - content.push(pus_tm.subservice()); - verif_map.insert(req_id, content); - } else { - let content = verif_map.get_mut(&req_id).unwrap(); - content.push(pus_tm.subservice()) + let (_tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); + let accepted_token; + + let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0); + accepted_token = reporter_with_sender_0 + .acceptance_success(token, &FIXED_STAMP) + .expect("Acceptance success failed"); + + // Do some start handling here + let started_token; + started_token = reporter_with_sender_0 + .start_success(accepted_token, &FIXED_STAMP) + .expect("Start success failed"); + // Do some step handling here + reporter_with_sender_0 + .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) + .expect("Start success failed"); + + // Finish up + reporter_with_sender_0 + .step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + .expect("Start success failed"); + reporter_with_sender_0 + .completion_success(started_token, &FIXED_STAMP) + .expect("Completion success failed"); + }); + + let verif_sender_1 = thread::spawn(move || { + let mut tc_buf: [u8; 1024] = [0; 1024]; + let tc_addr = rx_tc_1 + .recv_timeout(Duration::from_millis(20)) + .expect("Receive timeout"); + let tc_len; + { + let mut tc_guard = shared_tc_pool_1.write().unwrap(); + let pg = tc_guard.read_with_guard(tc_addr); + let buf = pg.read().unwrap(); + tc_len = buf.len(); + tc_buf[0..tc_len].copy_from_slice(buf); } - packet_counter += 1; - } - for (req_id, content) in verif_map { - if req_id == req_id_1 { - assert_eq!(content[0], 1); - assert_eq!(content[1], 3); - assert_eq!(content[2], 8); - } else if req_id == req_id_0 { - assert_eq!(content[0], 1); - assert_eq!(content[1], 3); - assert_eq!(content[2], 5); - assert_eq!(content[3], 5); - assert_eq!(content[4], 7); - } else { - panic!("Unexpected request ID {:?}", req_id); + let (tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).unwrap(); + let token = reporter_with_sender_1.add_tc(&tc); + let accepted_token = reporter_with_sender_1 + .acceptance_success(token, &FIXED_STAMP) + .expect("Acceptance success failed"); + let started_token = reporter_with_sender_1 + .start_success(accepted_token, &FIXED_STAMP) + .expect("Start success failed"); + let fail_code = EcssEnumU16::new(2); + let params = FailParams::new(&FIXED_STAMP, &fail_code, None); + reporter_with_sender_1 + .completion_failure(started_token, params) + .expect("Completion success failed"); + }); + + let verif_receiver = thread::spawn(move || { + let mut packet_counter = 0; + let mut tm_buf: [u8; 1024] = [0; 1024]; + let mut verif_map = HashMap::new(); + while packet_counter < PACKETS_SENT { + let verif_addr = rx + .recv_timeout(Duration::from_millis(50)) + .expect("Packet reception timeout"); + let tm_len; + { + let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); + let store_guard = rg.read_with_guard(verif_addr); + let slice = store_guard.read().expect("Error reading TM slice"); + tm_len = slice.len(); + tm_buf[0..tm_len].copy_from_slice(slice); + } + let (pus_tm, _) = PusTm::from_bytes(&tm_buf[0..tm_len], 7) + .expect("Error reading verification TM"); + let req_id = RequestId::from_bytes( + &pus_tm.source_data().expect("Invalid TM source data") + [0..RequestId::SIZE_AS_BYTES], + ) + .unwrap(); + if !verif_map.contains_key(&req_id) { + let mut content = Vec::new(); + content.push(pus_tm.subservice()); + verif_map.insert(req_id, content); + } else { + let content = verif_map.get_mut(&req_id).unwrap(); + content.push(pus_tm.subservice()) + } + packet_counter += 1; } - } - }); - verif_sender_0.join().expect("Joining thread 0 failed"); - verif_sender_1.join().expect("Joining thread 1 failed"); - verif_receiver.join().expect("Joining thread 2 failed"); + for (req_id, content) in verif_map { + if req_id == req_id_1 { + assert_eq!(content[0], 1); + assert_eq!(content[1], 3); + assert_eq!(content[2], 8); + } else if req_id == req_id_0 { + assert_eq!(content[0], 1); + assert_eq!(content[1], 3); + assert_eq!(content[2], 5); + assert_eq!(content[3], 5); + assert_eq!(content[4], 7); + } else { + panic!("Unexpected request ID {:?}", req_id); + } + } + }); + verif_sender_0.join().expect("Joining thread 0 failed"); + verif_sender_1.join().expect("Joining thread 1 failed"); + verif_receiver.join().expect("Joining thread 2 failed"); + } }