Add Crossbeam helpers #52
@ -61,7 +61,7 @@ pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum GenericSendError {
|
||||
RxDisconnected,
|
||||
QueueFull(u32),
|
||||
QueueFull(Option<u32>),
|
||||
}
|
||||
|
||||
impl Display for GenericSendError {
|
||||
@ -71,7 +71,7 @@ impl Display for GenericSendError {
|
||||
write!(f, "rx side has disconnected")
|
||||
}
|
||||
GenericSendError::QueueFull(max_cap) => {
|
||||
write!(f, "queue with max capacity of {max_cap} is full")
|
||||
write!(f, "queue with max capacity of {max_cap:?} is full")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -319,6 +319,7 @@ pub mod std_mod {
|
||||
use crate::ChannelId;
|
||||
use alloc::boxed::Box;
|
||||
use alloc::vec::Vec;
|
||||
use crossbeam_channel as cb;
|
||||
use spacepackets::ecss::tm::PusTmCreator;
|
||||
use spacepackets::ecss::PusError;
|
||||
use spacepackets::time::cds::TimeProvider;
|
||||
@ -327,16 +328,36 @@ pub mod std_mod {
|
||||
use std::cell::RefCell;
|
||||
use std::string::String;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::{SendError, TryRecvError};
|
||||
use std::sync::mpsc::TryRecvError;
|
||||
use thiserror::Error;
|
||||
|
||||
impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError {
|
||||
fn from(_: mpsc::SendError<StoreAddr>) -> Self {
|
||||
Self::Send(GenericSendError::RxDisconnected)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<cb::SendError<StoreAddr>> for EcssTmtcError {
|
||||
fn from(_: cb::SendError<StoreAddr>) -> Self {
|
||||
Self::Send(GenericSendError::RxDisconnected)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError {
|
||||
fn from(value: cb::TrySendError<StoreAddr>) -> Self {
|
||||
match value {
|
||||
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)),
|
||||
cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MpscTmInStoreSender {
|
||||
id: ChannelId,
|
||||
name: &'static str,
|
||||
shared_tm_store: SharedTmStore,
|
||||
sender: mpsc::Sender<StoreAddr>,
|
||||
pub ignore_poison_errors: bool,
|
||||
}
|
||||
|
||||
impl EcssChannel for MpscTmInStoreSender {
|
||||
@ -349,11 +370,6 @@ pub mod std_mod {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SendError<StoreAddr>> for EcssTmtcError {
|
||||
fn from(_: SendError<StoreAddr>) -> Self {
|
||||
Self::Send(GenericSendError::RxDisconnected)
|
||||
}
|
||||
}
|
||||
impl MpscTmInStoreSender {
|
||||
pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> {
|
||||
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
|
||||
@ -366,7 +382,10 @@ pub mod std_mod {
|
||||
impl EcssTmSenderCore for MpscTmInStoreSender {
|
||||
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
|
||||
match tm {
|
||||
PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()),
|
||||
PusTmWrapper::InStore(addr) => {
|
||||
self.sender.send(addr)?;
|
||||
Ok(())
|
||||
}
|
||||
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
|
||||
}
|
||||
}
|
||||
@ -384,7 +403,6 @@ pub mod std_mod {
|
||||
name,
|
||||
shared_tm_store,
|
||||
sender,
|
||||
ignore_poison_errors: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -393,7 +411,6 @@ pub mod std_mod {
|
||||
id: ChannelId,
|
||||
name: &'static str,
|
||||
receiver: mpsc::Receiver<TcAddrWithToken>,
|
||||
pub ignore_poison_errors: bool,
|
||||
}
|
||||
|
||||
impl EcssChannel for MpscTcInStoreReceiver {
|
||||
@ -427,12 +444,7 @@ pub mod std_mod {
|
||||
name: &'static str,
|
||||
receiver: mpsc::Receiver<TcAddrWithToken>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
name,
|
||||
receiver,
|
||||
ignore_poison_errors: false,
|
||||
}
|
||||
Self { id, name, receiver }
|
||||
}
|
||||
}
|
||||
|
||||
@ -447,8 +459,8 @@ pub mod std_mod {
|
||||
name: &'static str,
|
||||
}
|
||||
|
||||
impl From<SendError<Vec<u8>>> for EcssTmtcError {
|
||||
fn from(_: SendError<Vec<u8>>) -> Self {
|
||||
impl From<mpsc::SendError<Vec<u8>>> for EcssTmtcError {
|
||||
fn from(_: mpsc::SendError<Vec<u8>>) -> Self {
|
||||
Self::Send(GenericSendError::RxDisconnected)
|
||||
}
|
||||
}
|
||||
@ -482,6 +494,94 @@ pub mod std_mod {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CrossbeamTmInStoreSender {
|
||||
id: ChannelId,
|
||||
name: &'static str,
|
||||
shared_tm_store: SharedTmStore,
|
||||
sender: crossbeam_channel::Sender<StoreAddr>,
|
||||
}
|
||||
|
||||
impl CrossbeamTmInStoreSender {
|
||||
pub fn new(
|
||||
id: ChannelId,
|
||||
name: &'static str,
|
||||
shared_tm_store: SharedTmStore,
|
||||
sender: crossbeam_channel::Sender<StoreAddr>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
name,
|
||||
shared_tm_store,
|
||||
sender,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EcssChannel for CrossbeamTmInStoreSender {
|
||||
fn id(&self) -> ChannelId {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
self.name
|
||||
}
|
||||
}
|
||||
|
||||
impl EcssTmSenderCore for CrossbeamTmInStoreSender {
|
||||
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
|
||||
match tm {
|
||||
PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?,
|
||||
PusTmWrapper::Direct(tm) => {
|
||||
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
|
||||
self.sender.try_send(addr)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CrossbeamTcInStoreReceiver {
|
||||
id: ChannelId,
|
||||
name: &'static str,
|
||||
receiver: cb::Receiver<TcAddrWithToken>,
|
||||
}
|
||||
|
||||
impl CrossbeamTcInStoreReceiver {
|
||||
pub fn new(
|
||||
id: ChannelId,
|
||||
name: &'static str,
|
||||
receiver: cb::Receiver<TcAddrWithToken>,
|
||||
) -> Self {
|
||||
Self { id, name, receiver }
|
||||
}
|
||||
}
|
||||
|
||||
impl EcssChannel for CrossbeamTcInStoreReceiver {
|
||||
fn id(&self) -> ChannelId {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
self.name
|
||||
}
|
||||
}
|
||||
|
||||
impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver {
|
||||
fn recv_tc(&self) -> Result<ReceivedTcWrapper, TryRecvTmtcError> {
|
||||
let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e {
|
||||
cb::TryRecvError::Empty => TryRecvTmtcError::Empty,
|
||||
cb::TryRecvError::Disconnected => {
|
||||
TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected))
|
||||
}
|
||||
})?;
|
||||
Ok(ReceivedTcWrapper {
|
||||
store_addr,
|
||||
token: Some(token),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Error)]
|
||||
pub enum PusPacketHandlingError {
|
||||
#[error("generic PUS error: {0}")]
|
||||
|
@ -1,5 +1,3 @@
|
||||
// TODO: Refactor this to also test the STD impl using mpsc
|
||||
// TODO: Change back to cross-beam as soon as STD impl was added back for TM.
|
||||
#[cfg(feature = "crossbeam")]
|
||||
pub mod crossbeam_test {
|
||||
use hashbrown::HashMap;
|
||||
@ -7,13 +5,13 @@ pub mod crossbeam_test {
|
||||
use satrs_core::pus::verification::{
|
||||
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
|
||||
};
|
||||
use satrs_core::pus::MpscTmInStoreSender;
|
||||
use satrs_core::pus::CrossbeamTmInStoreSender;
|
||||
use satrs_core::tmtc::tm_helper::SharedTmStore;
|
||||
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
|
||||
use spacepackets::ecss::tm::PusTmReader;
|
||||
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket};
|
||||
use spacepackets::SpHeader;
|
||||
use std::sync::{mpsc, Arc, RwLock};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -39,9 +37,9 @@ pub mod crossbeam_test {
|
||||
let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone())));
|
||||
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
|
||||
let shared_tc_pool_1 = shared_tc_pool_0.clone();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (tx, rx) = crossbeam_channel::bounded(10);
|
||||
let sender =
|
||||
MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone());
|
||||
CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone());
|
||||
let mut reporter_with_sender_0 =
|
||||
VerificationReporterWithSender::new(&cfg, Box::new(sender));
|
||||
let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
|
||||
|
Loading…
Reference in New Issue
Block a user