diff --git a/fsrc-core/src/pool.rs b/fsrc-core/src/pool.rs index af68d90..79100bb 100644 --- a/fsrc-core/src/pool.rs +++ b/fsrc-core/src/pool.rs @@ -195,7 +195,7 @@ impl LocalPool { } let addr = self.reserve(len)?; let raw_pos = self.raw_pos(&addr).unwrap(); - let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..len]; + let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len]; Ok((addr, block)) } diff --git a/fsrc-core/src/pus/verification.rs b/fsrc-core/src/pus/verification.rs index 578986f..7c3e92c 100644 --- a/fsrc-core/src/pus/verification.rs +++ b/fsrc-core/src/pus/verification.rs @@ -26,11 +26,7 @@ use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader}; use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; #[cfg(feature = "std")] -use std::sync::mpsc::SendError; -#[cfg(feature = "std")] -use std::sync::MutexGuard; -#[cfg(feature = "std")] -use std::sync::{mpsc, Mutex}; +use std::sync::{mpsc, RwLock, RwLockWriteGuard}; /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard /// This field equivalent to the first two bytes of the CCSDS space packet header. @@ -42,12 +38,16 @@ pub struct RequestId { } impl RequestId { - const SIZE_AS_BYTES: usize = size_of::(); + pub const SIZE_AS_BYTES: usize = size_of::(); + + pub fn raw(&self) -> u32 { + ((self.version_number as u32) << 29) + | ((self.packet_id.raw() as u32) << 16) + | self.psc.raw() as u32 + } pub fn to_bytes(&self, buf: &mut [u8]) { - let raw = ((self.version_number as u32) << 29) - | ((self.packet_id.raw() as u32) << 16) - | self.psc.raw() as u32; + let raw = self.raw(); buf.copy_from_slice(raw.to_be_bytes().as_slice()); } @@ -533,17 +533,20 @@ impl VerificationReporter { /// API as [VerificationReporter] but without the explicit sender arguments. pub struct VerificationReporterWithSender { reporter: VerificationReporter, - pub sender: Box>, + pub sender: Box + Send>, } impl VerificationReporterWithSender { - pub fn new(cfg: VerificationReporterCfg, sender: Box>) -> Self { + pub fn new( + cfg: VerificationReporterCfg, + sender: Box + Send>, + ) -> Self { Self::new_from_reporter(VerificationReporter::new(cfg), sender) } pub fn new_from_reporter( reporter: VerificationReporter, - sender: Box>, + sender: Box + Send>, ) -> Self { Self { reporter, sender } } @@ -632,13 +635,13 @@ impl VerificationReporterWithSender { #[cfg(feature = "std")] pub struct StdVerifSender { pub ignore_poison_error: bool, - tm_store: Arc>, + tm_store: Arc>, tx: mpsc::Sender, } #[cfg(feature = "std")] impl StdVerifSender { - pub fn new(tm_store: Arc>, tx: mpsc::Sender) -> Self { + pub fn new(tm_store: Arc>, tx: mpsc::Sender) -> Self { Self { ignore_poison_error: true, tx, @@ -647,12 +650,17 @@ impl StdVerifSender { } } +#[cfg(feature = "std")] +unsafe impl Sync for StdVerifSender {} +#[cfg(feature = "std")] +unsafe impl Send for StdVerifSender {} + #[cfg(feature = "std")] #[derive(Debug, Eq, PartialEq)] pub enum StdVerifSenderError { PoisonError, StoreError(StoreError), - SendError(SendError), + RxDisconnected(StoreAddr), } #[cfg(feature = "std")] @@ -661,17 +669,17 @@ impl VerificationSender for StdVerifSender { &mut self, tm: PusTm, ) -> Result<(), VerificationError> { - let operation = |mut mg: MutexGuard| { + let operation = |mut mg: RwLockWriteGuard| { let (addr, buf) = mg .free_element(tm.len_packed()) .map_err(|e| VerificationError::SendError(StdVerifSenderError::StoreError(e)))?; tm.write_to(buf).map_err(VerificationError::PusError)?; - self.tx - .send(addr) - .map_err(|e| VerificationError::SendError(StdVerifSenderError::SendError(e)))?; + self.tx.send(addr).map_err(|_| { + VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr)) + })?; Ok(()) }; - match self.tm_store.lock() { + match self.tm_store.write() { Ok(lock) => operation(lock), Err(poison_error) => { if self.ignore_poison_error { diff --git a/fsrc-core/tests/pool_test.rs b/fsrc-core/tests/pool_test.rs index f06f302..e116130 100644 --- a/fsrc-core/tests/pool_test.rs +++ b/fsrc-core/tests/pool_test.rs @@ -10,11 +10,11 @@ const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3]; #[test] fn threaded_usage() { let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]); - let shared_dummy = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); - let shared_clone = shared_dummy.clone(); + let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let shared_clone = shared_pool.clone(); let (tx, rx): (Sender, Receiver) = mpsc::channel(); let jh0 = thread::spawn(move || { - let mut dummy = shared_dummy.write().unwrap(); + let mut dummy = shared_pool.write().unwrap(); let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed"); tx.send(addr).expect("Sending store address failed"); }); diff --git a/fsrc-core/tests/verification_test.rs b/fsrc-core/tests/verification_test.rs index 8b13789..f80efb8 100644 --- a/fsrc-core/tests/verification_test.rs +++ b/fsrc-core/tests/verification_test.rs @@ -1 +1,111 @@ +use fsrc_core::pool::{LocalPool, PoolCfg}; +use fsrc_core::pus::verification::{ + FailParams, RequestId, StdVerifSender, VerificationReporter, VerificationReporterCfg, +}; +use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; +use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; +use spacepackets::tm::PusTm; +use spacepackets::SpHeader; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::{mpsc, Arc, Mutex, RwLock}; +use std::thread; +const TEST_APID: u16 = 0x03; +const FIXED_STAMP: [u8; 7] = [0; 7]; +const PACKETS_SENT: u8 = 8; + +#[test] +fn test_shared_reporter() { + let seq_counter_0 = Arc::new(AtomicU16::new(0)); + let seq_counter_1 = seq_counter_0.clone(); + let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); + // Shared pool object to store the verification PUS telemetry + let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); + let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let (tx, rx) = mpsc::channel(); + let mut sender_0 = StdVerifSender::new(shared_pool.clone(), tx.clone()); + let mut sender_1 = StdVerifSender::new(shared_pool.clone(), tx); + let reporter_0 = Arc::new(Mutex::new(VerificationReporter::new(cfg))); + let reporter_1 = reporter_0.clone(); + + let verif_sender_0 = thread::spawn(move || { + let mut sph = + SpHeader::tc(TEST_APID, seq_counter_0.fetch_add(1, Ordering::SeqCst), 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(17, 1); + let pus_tc = PusTc::new(&mut sph, tc_header, None, true); + let mut mg = reporter_0.lock().unwrap(); + let token = mg.add_tc(&pus_tc); + let accepted_token = mg + .acceptance_success(token, &mut sender_0, &FIXED_STAMP) + .expect("Acceptance success failed"); + let started_token = mg + .start_success(accepted_token, &mut sender_0, &FIXED_STAMP) + .expect("Start success failed"); + mg.step_success( + &started_token, + &mut sender_0, + &FIXED_STAMP, + EcssEnumU8::new(0), + ) + .expect("Start success failed"); + mg.step_success( + &started_token, + &mut sender_0, + &FIXED_STAMP, + EcssEnumU8::new(1), + ) + .expect("Start success failed"); + mg.completion_success(started_token, &mut sender_0, &FIXED_STAMP) + .expect("Completion success failed"); + }); + + let verif_sender_1 = thread::spawn(move || { + let mut sph = + SpHeader::tc(TEST_APID, seq_counter_1.fetch_add(1, Ordering::SeqCst), 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(5, 1); + let pus_tc = PusTc::new(&mut sph, tc_header, None, true); + let mut mg = reporter_1.lock().unwrap(); + let token = mg.add_tc(&pus_tc); + let accepted_token = mg + .acceptance_success(token, &mut sender_1, &FIXED_STAMP) + .expect("Acceptance success failed"); + let started_token = mg + .start_success(accepted_token, &mut sender_1, &FIXED_STAMP) + .expect("Start success failed"); + let fail_code = EcssEnumU16::new(2); + let params = FailParams::new(&FIXED_STAMP, &fail_code, None); + mg.completion_failure(started_token, &mut sender_1, params) + .expect("Completion success failed"); + }); + + let verif_receiver = thread::spawn(move || { + let mut packet_counter = 0; + let mut tm_buf: [u8; 1024] = [0; 1024]; + while packet_counter < PACKETS_SENT { + let verif_addr = rx.recv().expect("Error receiving verification packet"); + let mut rg = shared_pool.write().expect("Error locking shared pool"); + let store_guard = rg.read_with_guard(verif_addr); + let slice = store_guard.read().expect("Error reading TM slice"); + let tm_len = slice.len(); + tm_buf[0..tm_len].copy_from_slice(slice); + drop(store_guard); + drop(rg); + let (pus_tm, _) = PusTm::new_from_raw_slice(&tm_buf[0..tm_len], 7) + .expect("Error reading verification TM"); + let req_id = RequestId::from_bytes( + &pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES], + ) + .unwrap(); + println!( + "Received PUS Verification TM[{},{}] for request ID {:#08x}", + pus_tm.service(), + pus_tm.subservice(), + req_id.raw() + ); + packet_counter += 1; + } + }); + verif_sender_0.join().expect("Joining thread 0 failed"); + verif_sender_1.join().expect("Joining thread 1 failed"); + verif_receiver.join().expect("Joining thread 2 failed") +}