Add Crossbeam helpers #52
@ -61,7 +61,7 @@ pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
|
|||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone)]
|
||||||
pub enum GenericSendError {
|
pub enum GenericSendError {
|
||||||
RxDisconnected,
|
RxDisconnected,
|
||||||
QueueFull(u32),
|
QueueFull(Option<u32>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for GenericSendError {
|
impl Display for GenericSendError {
|
||||||
@ -71,7 +71,7 @@ impl Display for GenericSendError {
|
|||||||
write!(f, "rx side has disconnected")
|
write!(f, "rx side has disconnected")
|
||||||
}
|
}
|
||||||
GenericSendError::QueueFull(max_cap) => {
|
GenericSendError::QueueFull(max_cap) => {
|
||||||
write!(f, "queue with max capacity of {max_cap} is full")
|
write!(f, "queue with max capacity of {max_cap:?} is full")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -319,6 +319,7 @@ pub mod std_mod {
|
|||||||
use crate::ChannelId;
|
use crate::ChannelId;
|
||||||
use alloc::boxed::Box;
|
use alloc::boxed::Box;
|
||||||
use alloc::vec::Vec;
|
use alloc::vec::Vec;
|
||||||
|
use crossbeam_channel as cb;
|
||||||
use spacepackets::ecss::tm::PusTmCreator;
|
use spacepackets::ecss::tm::PusTmCreator;
|
||||||
use spacepackets::ecss::PusError;
|
use spacepackets::ecss::PusError;
|
||||||
use spacepackets::time::cds::TimeProvider;
|
use spacepackets::time::cds::TimeProvider;
|
||||||
@ -327,16 +328,36 @@ pub mod std_mod {
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::string::String;
|
use std::string::String;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::mpsc::{SendError, TryRecvError};
|
use std::sync::mpsc::TryRecvError;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError {
|
||||||
|
fn from(_: mpsc::SendError<StoreAddr>) -> Self {
|
||||||
|
Self::Send(GenericSendError::RxDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<cb::SendError<StoreAddr>> for EcssTmtcError {
|
||||||
|
fn from(_: cb::SendError<StoreAddr>) -> Self {
|
||||||
|
Self::Send(GenericSendError::RxDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError {
|
||||||
|
fn from(value: cb::TrySendError<StoreAddr>) -> Self {
|
||||||
|
match value {
|
||||||
|
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)),
|
||||||
|
cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MpscTmInStoreSender {
|
pub struct MpscTmInStoreSender {
|
||||||
id: ChannelId,
|
id: ChannelId,
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
shared_tm_store: SharedTmStore,
|
shared_tm_store: SharedTmStore,
|
||||||
sender: mpsc::Sender<StoreAddr>,
|
sender: mpsc::Sender<StoreAddr>,
|
||||||
pub ignore_poison_errors: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EcssChannel for MpscTmInStoreSender {
|
impl EcssChannel for MpscTmInStoreSender {
|
||||||
@ -349,11 +370,6 @@ pub mod std_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<SendError<StoreAddr>> for EcssTmtcError {
|
|
||||||
fn from(_: SendError<StoreAddr>) -> Self {
|
|
||||||
Self::Send(GenericSendError::RxDisconnected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl MpscTmInStoreSender {
|
impl MpscTmInStoreSender {
|
||||||
pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> {
|
pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> {
|
||||||
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
|
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
|
||||||
@ -366,7 +382,10 @@ pub mod std_mod {
|
|||||||
impl EcssTmSenderCore for MpscTmInStoreSender {
|
impl EcssTmSenderCore for MpscTmInStoreSender {
|
||||||
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
|
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
|
||||||
match tm {
|
match tm {
|
||||||
PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()),
|
PusTmWrapper::InStore(addr) => {
|
||||||
|
self.sender.send(addr)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
|
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -384,7 +403,6 @@ pub mod std_mod {
|
|||||||
name,
|
name,
|
||||||
shared_tm_store,
|
shared_tm_store,
|
||||||
sender,
|
sender,
|
||||||
ignore_poison_errors: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -393,7 +411,6 @@ pub mod std_mod {
|
|||||||
id: ChannelId,
|
id: ChannelId,
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
receiver: mpsc::Receiver<TcAddrWithToken>,
|
receiver: mpsc::Receiver<TcAddrWithToken>,
|
||||||
pub ignore_poison_errors: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EcssChannel for MpscTcInStoreReceiver {
|
impl EcssChannel for MpscTcInStoreReceiver {
|
||||||
@ -427,12 +444,7 @@ pub mod std_mod {
|
|||||||
name: &'static str,
|
name: &'static str,
|
||||||
receiver: mpsc::Receiver<TcAddrWithToken>,
|
receiver: mpsc::Receiver<TcAddrWithToken>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self { id, name, receiver }
|
||||||
id,
|
|
||||||
name,
|
|
||||||
receiver,
|
|
||||||
ignore_poison_errors: false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -447,8 +459,8 @@ pub mod std_mod {
|
|||||||
name: &'static str,
|
name: &'static str,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<SendError<Vec<u8>>> for EcssTmtcError {
|
impl From<mpsc::SendError<Vec<u8>>> for EcssTmtcError {
|
||||||
fn from(_: SendError<Vec<u8>>) -> Self {
|
fn from(_: mpsc::SendError<Vec<u8>>) -> Self {
|
||||||
Self::Send(GenericSendError::RxDisconnected)
|
Self::Send(GenericSendError::RxDisconnected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -482,6 +494,94 @@ pub mod std_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct CrossbeamTmInStoreSender {
|
||||||
|
id: ChannelId,
|
||||||
|
name: &'static str,
|
||||||
|
shared_tm_store: SharedTmStore,
|
||||||
|
sender: crossbeam_channel::Sender<StoreAddr>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CrossbeamTmInStoreSender {
|
||||||
|
pub fn new(
|
||||||
|
id: ChannelId,
|
||||||
|
name: &'static str,
|
||||||
|
shared_tm_store: SharedTmStore,
|
||||||
|
sender: crossbeam_channel::Sender<StoreAddr>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
id,
|
||||||
|
name,
|
||||||
|
shared_tm_store,
|
||||||
|
sender,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EcssChannel for CrossbeamTmInStoreSender {
|
||||||
|
fn id(&self) -> ChannelId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
self.name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EcssTmSenderCore for CrossbeamTmInStoreSender {
|
||||||
|
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
|
||||||
|
match tm {
|
||||||
|
PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?,
|
||||||
|
PusTmWrapper::Direct(tm) => {
|
||||||
|
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
|
||||||
|
self.sender.try_send(addr)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CrossbeamTcInStoreReceiver {
|
||||||
|
id: ChannelId,
|
||||||
|
name: &'static str,
|
||||||
|
receiver: cb::Receiver<TcAddrWithToken>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CrossbeamTcInStoreReceiver {
|
||||||
|
pub fn new(
|
||||||
|
id: ChannelId,
|
||||||
|
name: &'static str,
|
||||||
|
receiver: cb::Receiver<TcAddrWithToken>,
|
||||||
|
) -> Self {
|
||||||
|
Self { id, name, receiver }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EcssChannel for CrossbeamTcInStoreReceiver {
|
||||||
|
fn id(&self) -> ChannelId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
self.name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver {
|
||||||
|
fn recv_tc(&self) -> Result<ReceivedTcWrapper, TryRecvTmtcError> {
|
||||||
|
let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e {
|
||||||
|
cb::TryRecvError::Empty => TryRecvTmtcError::Empty,
|
||||||
|
cb::TryRecvError::Disconnected => {
|
||||||
|
TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected))
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
Ok(ReceivedTcWrapper {
|
||||||
|
store_addr,
|
||||||
|
token: Some(token),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Error)]
|
#[derive(Debug, Clone, Error)]
|
||||||
pub enum PusPacketHandlingError {
|
pub enum PusPacketHandlingError {
|
||||||
#[error("generic PUS error: {0}")]
|
#[error("generic PUS error: {0}")]
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
// TODO: Refactor this to also test the STD impl using mpsc
|
|
||||||
// TODO: Change back to cross-beam as soon as STD impl was added back for TM.
|
|
||||||
#[cfg(feature = "crossbeam")]
|
#[cfg(feature = "crossbeam")]
|
||||||
pub mod crossbeam_test {
|
pub mod crossbeam_test {
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
@ -7,13 +5,13 @@ pub mod crossbeam_test {
|
|||||||
use satrs_core::pus::verification::{
|
use satrs_core::pus::verification::{
|
||||||
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
|
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
|
||||||
};
|
};
|
||||||
use satrs_core::pus::MpscTmInStoreSender;
|
use satrs_core::pus::CrossbeamTmInStoreSender;
|
||||||
use satrs_core::tmtc::tm_helper::SharedTmStore;
|
use satrs_core::tmtc::tm_helper::SharedTmStore;
|
||||||
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
|
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
|
||||||
use spacepackets::ecss::tm::PusTmReader;
|
use spacepackets::ecss::tm::PusTmReader;
|
||||||
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket};
|
use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket};
|
||||||
use spacepackets::SpHeader;
|
use spacepackets::SpHeader;
|
||||||
use std::sync::{mpsc, Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@ -39,9 +37,9 @@ pub mod crossbeam_test {
|
|||||||
let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone())));
|
let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone())));
|
||||||
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
|
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
|
||||||
let shared_tc_pool_1 = shared_tc_pool_0.clone();
|
let shared_tc_pool_1 = shared_tc_pool_0.clone();
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = crossbeam_channel::bounded(10);
|
||||||
let sender =
|
let sender =
|
||||||
MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone());
|
CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone());
|
||||||
let mut reporter_with_sender_0 =
|
let mut reporter_with_sender_0 =
|
||||||
VerificationReporterWithSender::new(&cfg, Box::new(sender));
|
VerificationReporterWithSender::new(&cfg, Box::new(sender));
|
||||||
let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
|
let mut reporter_with_sender_1 = reporter_with_sender_0.clone();
|
||||||
|
Loading…
Reference in New Issue
Block a user