support crossbeam channel backend

Robin Müller 2022-09-07 11:02:45 +02:00
parent df7282df25
commit 01c3440c07
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
5 changed files with 228 additions and 88 deletions
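The commit adds a crossbeam-channel based verification TM sender (CrossbeamVerifSender) next to the existing mpsc-based StdVerifSender; both write the telemetry into a shared LocalPool and forward the resulting StoreAddr through their channel. A minimal wiring sketch, assuming the APIs shown in the diffs below (the configuration values are simply the ones the integration test uses):

use std::sync::{Arc, RwLock};

use fsrc_core::pool::{LocalPool, PoolCfg};
use fsrc_core::pus::verification::{
    CrossbeamVerifSender, StdVerifSenderError, VerificationReporterCfg,
    VerificationReporterWithSender,
};

fn main() {
    // Shared TM pool; the sender stores packets here and only sends their addresses.
    let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
    let tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));

    // Bounded crossbeam channel carrying the store addresses of written TM packets.
    let (tx, _rx) = crossbeam_channel::bounded(5);
    let sender = CrossbeamVerifSender::new(tm_pool, tx);

    // Same reporter API as before, just backed by the crossbeam sender.
    let cfg = VerificationReporterCfg::new(0x03, 1, 2, 8);
    let _reporter: VerificationReporterWithSender<StdVerifSenderError> =
        VerificationReporterWithSender::new(cfg, Box::new(sender));
}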

Cargo.lock (generated)

@@ -223,6 +223,7 @@ name = "fsrc-core"
 version = "0.1.0"
 dependencies = [
  "bus",
+ "crossbeam-channel",
  "delegate",
  "downcast-rs",
  "hashbrown",

Cargo.toml (fsrc-core crate manifest)

@@ -22,17 +22,23 @@ default-features = false
 version = "2.2.3"
 optional = true
 
+[dependencies.crossbeam-channel]
+version= "0.5"
+default-features = false
+
 [dependencies.spacepackets]
 path = "../spacepackets"
 
 [dev-dependencies]
-postcard = { version = "1.0.1", features = ["use-std"] }
 serde = "1.0.143"
 zerocopy = "0.6.1"
 once_cell = "1.13.1"
 
+[dev-dependencies.postcard]
+version = "1.0.1"
+
 [features]
 default = ["std"]
-std = ["downcast-rs/std", "alloc", "bus"]
+std = ["downcast-rs/std", "alloc", "bus", "postcard/use-std", "crossbeam-channel/std"]
 alloc = []

Memory pool module (PoolCfg)

@@ -87,6 +87,7 @@ type NumBlocks = u16;
 ///
 /// * `cfg`: Vector of tuples which represent a subpool. The first entry in the tuple specifies the
 /// number of memory blocks in the subpool, the second entry the size of the blocks
+#[derive(Clone)]
 pub struct PoolCfg {
     cfg: Vec<(NumBlocks, usize)>,
 }
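The new Clone derive lets a single PoolCfg build several pools; the reworked integration test below clones it to create separate TM and TC pools. A minimal sketch with the pool API as it appears in this diff:

use fsrc_core::pool::{LocalPool, PoolCfg};

fn main() {
    // Each tuple is one subpool: (number of memory blocks, block size).
    let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);

    // One pool for telemetry, one for telecommands, from the same configuration.
    let _tm_pool = LocalPool::new(pool_cfg.clone());
    let _tc_pool = LocalPool::new(pool_cfg);
}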

PUS verification module (pus::verification)

@@ -9,11 +9,10 @@
 //!
 //! # Example
 //! TODO: Cross Ref integration test which will be provided
-use crate::pool::{LocalPool, StoreAddr, StoreError};
 use alloc::boxed::Box;
-use alloc::sync::Arc;
 use alloc::vec;
 use alloc::vec::Vec;
+use core::hash::{Hash, Hasher};
 use core::marker::PhantomData;
 use core::mem::size_of;
 use delegate::delegate;
@@ -26,17 +25,32 @@ use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader};
 use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl};
 
 #[cfg(feature = "std")]
-use std::sync::{mpsc, RwLock, RwLockWriteGuard};
+pub use stdmod::{CrossbeamVerifSender, StdVerifSender, StdVerifSenderError};
 
 /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard
 /// This field equivalent to the first two bytes of the CCSDS space packet header.
-#[derive(Debug, Eq, PartialEq, Copy, Clone)]
+#[derive(Debug, Eq, Copy, Clone)]
 pub struct RequestId {
     version_number: u8,
     packet_id: PacketId,
     psc: PacketSequenceCtrl,
 }
 
+impl Hash for RequestId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.raw().hash(state);
+    }
+}
+
+// Implement manually to satisfy derive_hash_xor_eq lint
+impl PartialEq for RequestId {
+    fn eq(&self, other: &Self) -> bool {
+        self.version_number == other.version_number
+            && self.packet_id == other.packet_id
+            && self.psc == other.psc
+    }
+}
+
 impl RequestId {
     pub const SIZE_AS_BYTES: usize = size_of::<u32>();
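The manual Hash and PartialEq implementations preserve the property the derive_hash_xor_eq lint is about: values that compare equal must also hash equally, which holds here because raw() is packed from the same three fields that eq compares. A small illustrative check, not part of the commit (the helper name is made up):

use core::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;

use fsrc_core::pus::verification::RequestId;

// Equal request IDs must produce equal hashes; unequal ones may hash however they like.
fn hashes_consistently(a: &RequestId, b: &RequestId) -> bool {
    let (mut ha, mut hb) = (DefaultHasher::new(), DefaultHasher::new());
    a.hash(&mut ha);
    b.hash(&mut hb);
    a != b || ha.finish() == hb.finish()
}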
@@ -96,7 +110,7 @@ pub struct VerificationErrorWithToken<E, T>(VerificationError<E>, VerificationToken<T>);
 /// PUS Service 1 Verification Telemetry to a verification TM recipient. The [Downcast] trait
 /// is implemented to allow passing the sender as a boxed trait object and still retrieve the
 /// concrete type at a later point.
-pub trait VerificationSender<E>: Downcast {
+pub trait VerificationSender<E>: Downcast + Send {
     fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError<E>>;
 }
@@ -124,6 +138,10 @@ impl<STATE> VerificationToken<STATE> {
             req_id,
         }
     }
+
+    pub fn req_id(&self) -> RequestId {
+        self.req_id
+    }
 }
 
 pub struct VerificationReporterCfg {
@@ -533,20 +551,17 @@ impl VerificationReporter {
 /// API as [VerificationReporter] but without the explicit sender arguments.
 pub struct VerificationReporterWithSender<E> {
     reporter: VerificationReporter,
-    pub sender: Box<dyn VerificationSender<E> + Send>,
+    pub sender: Box<dyn VerificationSender<E>>,
 }
 
 impl<E: 'static> VerificationReporterWithSender<E> {
-    pub fn new(
-        cfg: VerificationReporterCfg,
-        sender: Box<dyn VerificationSender<E> + Send>,
-    ) -> Self {
+    pub fn new(cfg: VerificationReporterCfg, sender: Box<dyn VerificationSender<E>>) -> Self {
         Self::new_from_reporter(VerificationReporter::new(cfg), sender)
     }
 
     pub fn new_from_reporter(
         reporter: VerificationReporter,
-        sender: Box<dyn VerificationSender<E> + Send>,
+        sender: Box<dyn VerificationSender<E>>,
     ) -> Self {
         Self { reporter, sender }
     }
@@ -633,61 +648,127 @@ impl<E: 'static> VerificationReporterWithSender<E> {
 }
 
 #[cfg(feature = "std")]
-pub struct StdVerifSender {
-    pub ignore_poison_error: bool,
-    tm_store: Arc<RwLock<LocalPool>>,
-    tx: mpsc::Sender<StoreAddr>,
-}
-
-#[cfg(feature = "std")]
-impl StdVerifSender {
-    pub fn new(tm_store: Arc<RwLock<LocalPool>>, tx: mpsc::Sender<StoreAddr>) -> Self {
-        Self {
-            ignore_poison_error: true,
-            tx,
-            tm_store,
-        }
-    }
-}
-
-#[cfg(feature = "std")]
-unsafe impl Sync for StdVerifSender {}
-#[cfg(feature = "std")]
-unsafe impl Send for StdVerifSender {}
-
-#[cfg(feature = "std")]
-#[derive(Debug, Eq, PartialEq)]
-pub enum StdVerifSenderError {
-    PoisonError,
-    StoreError(StoreError),
-    RxDisconnected(StoreAddr),
-}
-
-#[cfg(feature = "std")]
-impl VerificationSender<StdVerifSenderError> for StdVerifSender {
-    fn send_verification_tm(
-        &mut self,
-        tm: PusTm,
-    ) -> Result<(), VerificationError<StdVerifSenderError>> {
-        let operation = |mut mg: RwLockWriteGuard<LocalPool>| {
-            let (addr, buf) = mg
-                .free_element(tm.len_packed())
-                .map_err(|e| VerificationError::SendError(StdVerifSenderError::StoreError(e)))?;
-            tm.write_to(buf).map_err(VerificationError::PusError)?;
-            self.tx.send(addr).map_err(|_| {
-                VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr))
-            })?;
-            Ok(())
-        };
-        match self.tm_store.write() {
-            Ok(lock) => operation(lock),
-            Err(poison_error) => {
-                if self.ignore_poison_error {
-                    operation(poison_error.into_inner())
-                } else {
-                    Err(VerificationError::SendError(
-                        StdVerifSenderError::PoisonError,
-                    ))
-                }
-            }
-        }
-    }
-}
+mod stdmod {
+    use crate::pool::{LocalPool, StoreAddr, StoreError};
+    use crate::pus::verification::{VerificationError, VerificationSender};
+    use delegate::delegate;
+    use spacepackets::tm::PusTm;
+    use std::sync::{mpsc, Arc, RwLock, RwLockWriteGuard};
+
+    #[derive(Debug, Eq, PartialEq)]
+    pub enum StdVerifSenderError {
+        PoisonError,
+        StoreError(StoreError),
+        RxDisconnected(StoreAddr),
+    }
+
+    trait SendBackend: Send {
+        fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>;
+    }
+
+    struct StdSenderBase<S> {
+        pub ignore_poison_error: bool,
+        tm_store: Arc<RwLock<LocalPool>>,
+        tx: S,
+    }
+
+    impl<S: SendBackend> StdSenderBase<S> {
+        pub fn new(tm_store: Arc<RwLock<LocalPool>>, tx: S) -> Self {
+            Self {
+                ignore_poison_error: false,
+                tm_store,
+                tx,
+            }
+        }
+    }
+
+    impl SendBackend for mpsc::Sender<StoreAddr> {
+        fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> {
+            self.send(addr).map_err(|_| addr)
+        }
+    }
+
+    pub struct StdVerifSender {
+        base: StdSenderBase<mpsc::Sender<StoreAddr>>,
+    }
+
+    impl StdVerifSender {
+        pub fn new(tm_store: Arc<RwLock<LocalPool>>, tx: mpsc::Sender<StoreAddr>) -> Self {
+            Self {
+                base: StdSenderBase::new(tm_store, tx),
+            }
+        }
+    }
+
+    //noinspection RsTraitImplementation
+    impl VerificationSender<StdVerifSenderError> for StdVerifSender {
+        delegate!(
+            to self.base {
+                fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError<StdVerifSenderError>>;
+            }
+        );
+    }
+    unsafe impl Sync for StdVerifSender {}
+    unsafe impl Send for StdVerifSender {}
+
+    impl SendBackend for crossbeam_channel::Sender<StoreAddr> {
+        fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> {
+            self.send(addr).map_err(|_| addr)
+        }
+    }
+
+    pub struct CrossbeamVerifSender {
+        base: StdSenderBase<crossbeam_channel::Sender<StoreAddr>>,
+    }
+
+    impl CrossbeamVerifSender {
+        pub fn new(
+            tm_store: Arc<RwLock<LocalPool>>,
+            tx: crossbeam_channel::Sender<StoreAddr>,
+        ) -> Self {
+            Self {
+                base: StdSenderBase::new(tm_store, tx),
+            }
+        }
+    }
+
+    //noinspection RsTraitImplementation
+    impl VerificationSender<StdVerifSenderError> for CrossbeamVerifSender {
+        delegate!(
+            to self.base {
+                fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError<StdVerifSenderError>>;
+            }
+        );
+    }
+
+    unsafe impl Sync for CrossbeamVerifSender {}
+    unsafe impl Send for CrossbeamVerifSender {}
+
+    impl<S: SendBackend + 'static> VerificationSender<StdVerifSenderError> for StdSenderBase<S> {
+        fn send_verification_tm(
+            &mut self,
+            tm: PusTm,
+        ) -> Result<(), VerificationError<StdVerifSenderError>> {
+            let operation = |mut mg: RwLockWriteGuard<LocalPool>| {
+                let (addr, buf) = mg.free_element(tm.len_packed()).map_err(|e| {
+                    VerificationError::SendError(StdVerifSenderError::StoreError(e))
+                })?;
+                tm.write_to(buf).map_err(VerificationError::PusError)?;
+                self.tx.send(addr).map_err(|_| {
+                    VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr))
+                })?;
+                Ok(())
+            };
+            match self.tm_store.write() {
+                Ok(lock) => operation(lock),
+                Err(poison_error) => {
+                    if self.ignore_poison_error {
+                        operation(poison_error.into_inner())
+                    } else {
+                        Err(VerificationError::SendError(
+                            StdVerifSenderError::PoisonError,
+                        ))
+                    }
+                }
+            }
+        }
+    }
+}
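The refactor underneath both senders is a small channel abstraction: StdSenderBase only needs something it can push a StoreAddr into, SendBackend is implemented for both mpsc::Sender and crossbeam_channel::Sender, and the two public sender types delegate to the shared base. A stripped-down sketch of that pattern with stand-in types (the names below are illustrative, not the crate's API):

use std::sync::mpsc;

// Stand-in for the crate's StoreAddr.
type Addr = u64;

// Minimal send interface, mirroring the SendBackend trait from the diff.
trait SendBackend: Send {
    fn send(&self, addr: Addr) -> Result<(), Addr>;
}

impl SendBackend for mpsc::Sender<Addr> {
    fn send(&self, addr: Addr) -> Result<(), Addr> {
        // Call the inherent channel send and map a failed send back to the address.
        mpsc::Sender::send(self, addr).map_err(|_| addr)
    }
}

impl SendBackend for crossbeam_channel::Sender<Addr> {
    fn send(&self, addr: Addr) -> Result<(), Addr> {
        crossbeam_channel::Sender::send(self, addr).map_err(|_| addr)
    }
}

// Generic base: everything except the concrete channel type is shared.
struct SenderBase<S: SendBackend> {
    tx: S,
}

impl<S: SendBackend> SenderBase<S> {
    fn forward(&self, addr: Addr) -> Result<(), Addr> {
        self.tx.send(addr)
    }
}

fn main() {
    let (tx, rx) = crossbeam_channel::bounded::<Addr>(5);
    let base = SenderBase { tx };
    base.forward(42).unwrap();
    assert_eq!(rx.recv().unwrap(), 42);
}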

Verification reporter integration test

@@ -1,13 +1,13 @@
 use fsrc_core::pool::{LocalPool, PoolCfg};
 use fsrc_core::pus::verification::{
-    FailParams, RequestId, StdVerifSender, VerificationReporter, VerificationReporterCfg,
+    CrossbeamVerifSender, FailParams, RequestId, VerificationReporter, VerificationReporterCfg,
 };
+use hashbrown::HashMap;
 use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket};
 use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
 use spacepackets::tm::PusTm;
 use spacepackets::SpHeader;
-use std::sync::atomic::{AtomicU16, Ordering};
-use std::sync::{mpsc, Arc, Mutex, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 use std::thread;
 
 const TEST_APID: u16 = 0x03;
@@ -16,28 +16,55 @@ const PACKETS_SENT: u8 = 8;
 #[test]
 fn test_shared_reporter() {
-    let seq_counter_0 = Arc::new(AtomicU16::new(0));
-    let seq_counter_1 = seq_counter_0.clone();
     let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8);
     // Shared pool object to store the verification PUS telemetry
     let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
-    let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
-    let (tx, rx) = mpsc::channel();
-    let mut sender_0 = StdVerifSender::new(shared_pool.clone(), tx.clone());
-    let mut sender_1 = StdVerifSender::new(shared_pool.clone(), tx);
+    let shared_tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg.clone())));
+    let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
+    let shared_tc_pool_1 = shared_tc_pool_0.clone();
+    let (tx, rx) = crossbeam_channel::bounded(5);
+    let mut sender_0 = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone());
+    let mut sender_1 = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx);
     let reporter_0 = Arc::new(Mutex::new(VerificationReporter::new(cfg)));
     let reporter_1 = reporter_0.clone();
 
+    let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3);
+    let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3);
+    {
+        let mut tc_guard = shared_tc_pool_0.write().unwrap();
+        let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap();
+        let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
+        let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true);
+        let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap();
+        pus_tc_0.write_to(&mut buf).unwrap();
+        tx_tc_0.send(addr).unwrap();
+        let mut sph = SpHeader::tc(TEST_APID, 1, 0).unwrap();
+        let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
+        let pus_tc_1 = PusTc::new(&mut sph, tc_header, None, true);
+        let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap();
+        pus_tc_1.write_to(&mut buf).unwrap();
+        tx_tc_1.send(addr).unwrap();
+    }
     let verif_sender_0 = thread::spawn(move || {
-        let mut sph =
-            SpHeader::tc(TEST_APID, seq_counter_0.fetch_add(1, Ordering::SeqCst), 0).unwrap();
-        let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
-        let pus_tc = PusTc::new(&mut sph, tc_header, None, true);
+        let mut tc_buf: [u8; 1024] = [0; 1024];
+        let tc_addr = rx_tc_1.recv().unwrap();
+        let tc_len;
+        {
+            let mut tc_guard = shared_tc_pool_0.write().unwrap();
+            let pg = tc_guard.read_with_guard(tc_addr);
+            let buf = pg.read().unwrap();
+            tc_len = buf.len();
+            tc_buf[0..tc_len].copy_from_slice(buf);
+        }
+        let (tc, _) = PusTc::new_from_raw_slice(&tc_buf[0..tc_len]).unwrap();
+        let req_id = RequestId::new(&tc);
         let mut mg = reporter_0.lock().unwrap();
-        let token = mg.add_tc(&pus_tc);
+        let token = mg.add_tc_with_req_id(req_id);
         let accepted_token = mg
             .acceptance_success(token, &mut sender_0, &FIXED_STAMP)
             .expect("Acceptance success failed");
+        // Do some start handling here
         let started_token = mg
             .start_success(accepted_token, &mut sender_0, &FIXED_STAMP)
             .expect("Start success failed");
@@ -48,6 +75,7 @@ fn test_shared_reporter() {
             EcssEnumU8::new(0),
         )
         .expect("Start success failed");
+        // Do some step handling here
         mg.step_success(
             &started_token,
             &mut sender_0,
@@ -60,12 +88,19 @@
     });
 
     let verif_sender_1 = thread::spawn(move || {
-        let mut sph =
-            SpHeader::tc(TEST_APID, seq_counter_1.fetch_add(1, Ordering::SeqCst), 0).unwrap();
-        let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
-        let pus_tc = PusTc::new(&mut sph, tc_header, None, true);
+        let mut tc_buf: [u8; 1024] = [0; 1024];
+        let tc_addr = rx_tc_0.recv().unwrap();
+        let tc_len;
+        {
+            let mut tc_guard = shared_tc_pool_1.write().unwrap();
+            let pg = tc_guard.read_with_guard(tc_addr);
+            let buf = pg.read().unwrap();
+            tc_len = buf.len();
+            tc_buf[0..tc_len].copy_from_slice(buf);
+        }
+        let (tc, _) = PusTc::new_from_raw_slice(&tc_buf[0..tc_len]).unwrap();
         let mut mg = reporter_1.lock().unwrap();
-        let token = mg.add_tc(&pus_tc);
+        let token = mg.add_tc(&tc);
         let accepted_token = mg
             .acceptance_success(token, &mut sender_1, &FIXED_STAMP)
             .expect("Acceptance success failed");
@@ -81,9 +116,10 @@
     let verif_receiver = thread::spawn(move || {
         let mut packet_counter = 0;
         let mut tm_buf: [u8; 1024] = [0; 1024];
+        let mut verif_map = HashMap::new();
         while packet_counter < PACKETS_SENT {
            let verif_addr = rx.recv().expect("Error receiving verification packet");
-            let mut rg = shared_pool.write().expect("Error locking shared pool");
+            let mut rg = shared_tm_pool.write().expect("Error locking shared pool");
            let store_guard = rg.read_with_guard(verif_addr);
            let slice = store_guard.read().expect("Error reading TM slice");
            let tm_len = slice.len();
@@ -96,14 +132,29 @@
                 &pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES],
             )
             .unwrap();
-            println!(
-                "Received PUS Verification TM[{},{}] for request ID {:#08x}",
-                pus_tm.service(),
-                pus_tm.subservice(),
-                req_id.raw()
-            );
+            if !verif_map.contains_key(&req_id) {
+                let mut content = Vec::new();
+                content.push(pus_tm.subservice());
+                verif_map.insert(req_id, content);
+            } else {
+                let content = verif_map.get_mut(&req_id).unwrap();
+                content.push(pus_tm.subservice())
+            }
             packet_counter += 1;
         }
+        for (_req_id, content) in verif_map {
+            if content.len() == 3 {
+                assert_eq!(content[0], 1);
+                assert_eq!(content[1], 3);
+                assert_eq!(content[2], 8);
+            } else {
+                assert_eq!(content[0], 1);
+                assert_eq!(content[1], 3);
+                assert_eq!(content[2], 5);
+                assert_eq!(content[3], 5);
+                assert_eq!(content[4], 7);
+            }
+        }
     });
     verif_sender_0.join().expect("Joining thread 0 failed");
     verif_sender_1.join().expect("Joining thread 1 failed");
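For reading the new assertions: the collected values are PUS service 1 subservice numbers, so the two expected sequences are acceptance success, start success, completion failure (1, 3, 8) for one telecommand and acceptance success, start success, two step successes, completion success (1, 3, 5, 5, 7) for the other. As a reference, with illustrative constant names:

// PUS service 1 (request verification) subservice numbers checked by the test.
const ACCEPTANCE_SUCCESS: u8 = 1;
const START_SUCCESS: u8 = 3;
const STEP_SUCCESS: u8 = 5;
const COMPLETION_SUCCESS: u8 = 7;
const COMPLETION_FAILURE: u8 = 8;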