basic shared verificator test
This commit is contained in:
parent
ac56fbae99
commit
df7282df25
@ -195,7 +195,7 @@ impl LocalPool {
|
|||||||
}
|
}
|
||||||
let addr = self.reserve(len)?;
|
let addr = self.reserve(len)?;
|
||||||
let raw_pos = self.raw_pos(&addr).unwrap();
|
let raw_pos = self.raw_pos(&addr).unwrap();
|
||||||
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..len];
|
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len];
|
||||||
Ok((addr, block))
|
Ok((addr, block))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,11 +26,7 @@ use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader};
|
|||||||
use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl};
|
use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl};
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
use std::sync::mpsc::SendError;
|
use std::sync::{mpsc, RwLock, RwLockWriteGuard};
|
||||||
#[cfg(feature = "std")]
|
|
||||||
use std::sync::MutexGuard;
|
|
||||||
#[cfg(feature = "std")]
|
|
||||||
use std::sync::{mpsc, Mutex};
|
|
||||||
|
|
||||||
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard
|
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard
|
||||||
/// This field equivalent to the first two bytes of the CCSDS space packet header.
|
/// This field equivalent to the first two bytes of the CCSDS space packet header.
|
||||||
@ -42,12 +38,16 @@ pub struct RequestId {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RequestId {
|
impl RequestId {
|
||||||
const SIZE_AS_BYTES: usize = size_of::<u32>();
|
pub const SIZE_AS_BYTES: usize = size_of::<u32>();
|
||||||
|
|
||||||
|
pub fn raw(&self) -> u32 {
|
||||||
|
((self.version_number as u32) << 29)
|
||||||
|
| ((self.packet_id.raw() as u32) << 16)
|
||||||
|
| self.psc.raw() as u32
|
||||||
|
}
|
||||||
|
|
||||||
pub fn to_bytes(&self, buf: &mut [u8]) {
|
pub fn to_bytes(&self, buf: &mut [u8]) {
|
||||||
let raw = ((self.version_number as u32) << 29)
|
let raw = self.raw();
|
||||||
| ((self.packet_id.raw() as u32) << 16)
|
|
||||||
| self.psc.raw() as u32;
|
|
||||||
buf.copy_from_slice(raw.to_be_bytes().as_slice());
|
buf.copy_from_slice(raw.to_be_bytes().as_slice());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -533,17 +533,20 @@ impl VerificationReporter {
|
|||||||
/// API as [VerificationReporter] but without the explicit sender arguments.
|
/// API as [VerificationReporter] but without the explicit sender arguments.
|
||||||
pub struct VerificationReporterWithSender<E> {
|
pub struct VerificationReporterWithSender<E> {
|
||||||
reporter: VerificationReporter,
|
reporter: VerificationReporter,
|
||||||
pub sender: Box<dyn VerificationSender<E>>,
|
pub sender: Box<dyn VerificationSender<E> + Send>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: 'static> VerificationReporterWithSender<E> {
|
impl<E: 'static> VerificationReporterWithSender<E> {
|
||||||
pub fn new(cfg: VerificationReporterCfg, sender: Box<dyn VerificationSender<E>>) -> Self {
|
pub fn new(
|
||||||
|
cfg: VerificationReporterCfg,
|
||||||
|
sender: Box<dyn VerificationSender<E> + Send>,
|
||||||
|
) -> Self {
|
||||||
Self::new_from_reporter(VerificationReporter::new(cfg), sender)
|
Self::new_from_reporter(VerificationReporter::new(cfg), sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_from_reporter(
|
pub fn new_from_reporter(
|
||||||
reporter: VerificationReporter,
|
reporter: VerificationReporter,
|
||||||
sender: Box<dyn VerificationSender<E>>,
|
sender: Box<dyn VerificationSender<E> + Send>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { reporter, sender }
|
Self { reporter, sender }
|
||||||
}
|
}
|
||||||
@ -632,13 +635,13 @@ impl<E: 'static> VerificationReporterWithSender<E> {
|
|||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub struct StdVerifSender {
|
pub struct StdVerifSender {
|
||||||
pub ignore_poison_error: bool,
|
pub ignore_poison_error: bool,
|
||||||
tm_store: Arc<Mutex<LocalPool>>,
|
tm_store: Arc<RwLock<LocalPool>>,
|
||||||
tx: mpsc::Sender<StoreAddr>,
|
tx: mpsc::Sender<StoreAddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
impl StdVerifSender {
|
impl StdVerifSender {
|
||||||
pub fn new(tm_store: Arc<Mutex<LocalPool>>, tx: mpsc::Sender<StoreAddr>) -> Self {
|
pub fn new(tm_store: Arc<RwLock<LocalPool>>, tx: mpsc::Sender<StoreAddr>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
ignore_poison_error: true,
|
ignore_poison_error: true,
|
||||||
tx,
|
tx,
|
||||||
@ -647,12 +650,17 @@ impl StdVerifSender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
|
unsafe impl Sync for StdVerifSender {}
|
||||||
|
#[cfg(feature = "std")]
|
||||||
|
unsafe impl Send for StdVerifSender {}
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
#[derive(Debug, Eq, PartialEq)]
|
#[derive(Debug, Eq, PartialEq)]
|
||||||
pub enum StdVerifSenderError {
|
pub enum StdVerifSenderError {
|
||||||
PoisonError,
|
PoisonError,
|
||||||
StoreError(StoreError),
|
StoreError(StoreError),
|
||||||
SendError(SendError<StoreAddr>),
|
RxDisconnected(StoreAddr),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
@ -661,17 +669,17 @@ impl VerificationSender<StdVerifSenderError> for StdVerifSender {
|
|||||||
&mut self,
|
&mut self,
|
||||||
tm: PusTm,
|
tm: PusTm,
|
||||||
) -> Result<(), VerificationError<StdVerifSenderError>> {
|
) -> Result<(), VerificationError<StdVerifSenderError>> {
|
||||||
let operation = |mut mg: MutexGuard<LocalPool>| {
|
let operation = |mut mg: RwLockWriteGuard<LocalPool>| {
|
||||||
let (addr, buf) = mg
|
let (addr, buf) = mg
|
||||||
.free_element(tm.len_packed())
|
.free_element(tm.len_packed())
|
||||||
.map_err(|e| VerificationError::SendError(StdVerifSenderError::StoreError(e)))?;
|
.map_err(|e| VerificationError::SendError(StdVerifSenderError::StoreError(e)))?;
|
||||||
tm.write_to(buf).map_err(VerificationError::PusError)?;
|
tm.write_to(buf).map_err(VerificationError::PusError)?;
|
||||||
self.tx
|
self.tx.send(addr).map_err(|_| {
|
||||||
.send(addr)
|
VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr))
|
||||||
.map_err(|e| VerificationError::SendError(StdVerifSenderError::SendError(e)))?;
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
match self.tm_store.lock() {
|
match self.tm_store.write() {
|
||||||
Ok(lock) => operation(lock),
|
Ok(lock) => operation(lock),
|
||||||
Err(poison_error) => {
|
Err(poison_error) => {
|
||||||
if self.ignore_poison_error {
|
if self.ignore_poison_error {
|
||||||
|
@ -10,11 +10,11 @@ const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3];
|
|||||||
#[test]
|
#[test]
|
||||||
fn threaded_usage() {
|
fn threaded_usage() {
|
||||||
let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]);
|
let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]);
|
||||||
let shared_dummy = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
|
let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
|
||||||
let shared_clone = shared_dummy.clone();
|
let shared_clone = shared_pool.clone();
|
||||||
let (tx, rx): (Sender<StoreAddr>, Receiver<StoreAddr>) = mpsc::channel();
|
let (tx, rx): (Sender<StoreAddr>, Receiver<StoreAddr>) = mpsc::channel();
|
||||||
let jh0 = thread::spawn(move || {
|
let jh0 = thread::spawn(move || {
|
||||||
let mut dummy = shared_dummy.write().unwrap();
|
let mut dummy = shared_pool.write().unwrap();
|
||||||
let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed");
|
let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed");
|
||||||
tx.send(addr).expect("Sending store address failed");
|
tx.send(addr).expect("Sending store address failed");
|
||||||
});
|
});
|
||||||
|
@ -1 +1,111 @@
|
|||||||
|
use fsrc_core::pool::{LocalPool, PoolCfg};
|
||||||
|
use fsrc_core::pus::verification::{
|
||||||
|
FailParams, RequestId, StdVerifSender, VerificationReporter, VerificationReporterCfg,
|
||||||
|
};
|
||||||
|
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket};
|
||||||
|
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
|
||||||
|
use spacepackets::tm::PusTm;
|
||||||
|
use spacepackets::SpHeader;
|
||||||
|
use std::sync::atomic::{AtomicU16, Ordering};
|
||||||
|
use std::sync::{mpsc, Arc, Mutex, RwLock};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
const TEST_APID: u16 = 0x03;
|
||||||
|
const FIXED_STAMP: [u8; 7] = [0; 7];
|
||||||
|
const PACKETS_SENT: u8 = 8;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shared_reporter() {
|
||||||
|
let seq_counter_0 = Arc::new(AtomicU16::new(0));
|
||||||
|
let seq_counter_1 = seq_counter_0.clone();
|
||||||
|
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8);
|
||||||
|
// Shared pool object to store the verification PUS telemetry
|
||||||
|
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
|
||||||
|
let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
let mut sender_0 = StdVerifSender::new(shared_pool.clone(), tx.clone());
|
||||||
|
let mut sender_1 = StdVerifSender::new(shared_pool.clone(), tx);
|
||||||
|
let reporter_0 = Arc::new(Mutex::new(VerificationReporter::new(cfg)));
|
||||||
|
let reporter_1 = reporter_0.clone();
|
||||||
|
|
||||||
|
let verif_sender_0 = thread::spawn(move || {
|
||||||
|
let mut sph =
|
||||||
|
SpHeader::tc(TEST_APID, seq_counter_0.fetch_add(1, Ordering::SeqCst), 0).unwrap();
|
||||||
|
let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
|
||||||
|
let pus_tc = PusTc::new(&mut sph, tc_header, None, true);
|
||||||
|
let mut mg = reporter_0.lock().unwrap();
|
||||||
|
let token = mg.add_tc(&pus_tc);
|
||||||
|
let accepted_token = mg
|
||||||
|
.acceptance_success(token, &mut sender_0, &FIXED_STAMP)
|
||||||
|
.expect("Acceptance success failed");
|
||||||
|
let started_token = mg
|
||||||
|
.start_success(accepted_token, &mut sender_0, &FIXED_STAMP)
|
||||||
|
.expect("Start success failed");
|
||||||
|
mg.step_success(
|
||||||
|
&started_token,
|
||||||
|
&mut sender_0,
|
||||||
|
&FIXED_STAMP,
|
||||||
|
EcssEnumU8::new(0),
|
||||||
|
)
|
||||||
|
.expect("Start success failed");
|
||||||
|
mg.step_success(
|
||||||
|
&started_token,
|
||||||
|
&mut sender_0,
|
||||||
|
&FIXED_STAMP,
|
||||||
|
EcssEnumU8::new(1),
|
||||||
|
)
|
||||||
|
.expect("Start success failed");
|
||||||
|
mg.completion_success(started_token, &mut sender_0, &FIXED_STAMP)
|
||||||
|
.expect("Completion success failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
let verif_sender_1 = thread::spawn(move || {
|
||||||
|
let mut sph =
|
||||||
|
SpHeader::tc(TEST_APID, seq_counter_1.fetch_add(1, Ordering::SeqCst), 0).unwrap();
|
||||||
|
let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
|
||||||
|
let pus_tc = PusTc::new(&mut sph, tc_header, None, true);
|
||||||
|
let mut mg = reporter_1.lock().unwrap();
|
||||||
|
let token = mg.add_tc(&pus_tc);
|
||||||
|
let accepted_token = mg
|
||||||
|
.acceptance_success(token, &mut sender_1, &FIXED_STAMP)
|
||||||
|
.expect("Acceptance success failed");
|
||||||
|
let started_token = mg
|
||||||
|
.start_success(accepted_token, &mut sender_1, &FIXED_STAMP)
|
||||||
|
.expect("Start success failed");
|
||||||
|
let fail_code = EcssEnumU16::new(2);
|
||||||
|
let params = FailParams::new(&FIXED_STAMP, &fail_code, None);
|
||||||
|
mg.completion_failure(started_token, &mut sender_1, params)
|
||||||
|
.expect("Completion success failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
let verif_receiver = thread::spawn(move || {
|
||||||
|
let mut packet_counter = 0;
|
||||||
|
let mut tm_buf: [u8; 1024] = [0; 1024];
|
||||||
|
while packet_counter < PACKETS_SENT {
|
||||||
|
let verif_addr = rx.recv().expect("Error receiving verification packet");
|
||||||
|
let mut rg = shared_pool.write().expect("Error locking shared pool");
|
||||||
|
let store_guard = rg.read_with_guard(verif_addr);
|
||||||
|
let slice = store_guard.read().expect("Error reading TM slice");
|
||||||
|
let tm_len = slice.len();
|
||||||
|
tm_buf[0..tm_len].copy_from_slice(slice);
|
||||||
|
drop(store_guard);
|
||||||
|
drop(rg);
|
||||||
|
let (pus_tm, _) = PusTm::new_from_raw_slice(&tm_buf[0..tm_len], 7)
|
||||||
|
.expect("Error reading verification TM");
|
||||||
|
let req_id = RequestId::from_bytes(
|
||||||
|
&pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
println!(
|
||||||
|
"Received PUS Verification TM[{},{}] for request ID {:#08x}",
|
||||||
|
pus_tm.service(),
|
||||||
|
pus_tm.subservice(),
|
||||||
|
req_id.raw()
|
||||||
|
);
|
||||||
|
packet_counter += 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
verif_sender_0.join().expect("Joining thread 0 failed");
|
||||||
|
verif_sender_1.join().expect("Joining thread 1 failed");
|
||||||
|
verif_receiver.join().expect("Joining thread 2 failed")
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user