added first crossbeam API to allow bounded API

This commit is contained in:
Robin Müller 2023-07-11 22:48:43 +02:00
parent 1283db3e75
commit 8b077c47b2
Signed by: muellerr
GPG Key ID: A649FB78196E3849

View File

@ -61,7 +61,7 @@ pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
#[derive(Debug, Copy, Clone)]
pub enum GenericSendError {
RxDisconnected,
QueueFull(u32),
QueueFull(Option<u32>),
}
impl Display for GenericSendError {
@ -71,7 +71,7 @@ impl Display for GenericSendError {
write!(f, "rx side has disconnected")
}
GenericSendError::QueueFull(max_cap) => {
write!(f, "queue with max capacity of {max_cap} is full")
write!(f, "queue with max capacity of {max_cap:?} is full")
}
}
}
@ -327,8 +327,30 @@ pub mod std_mod {
use std::cell::RefCell;
use std::string::String;
use std::sync::mpsc;
use std::sync::mpsc::{SendError, TryRecvError};
use std::sync::mpsc::TryRecvError;
use thiserror::Error;
use crossbeam_channel as cb;
impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: mpsc::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl From<cb::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: cb::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError {
fn from(value: cb::TrySendError<StoreAddr>) -> Self {
match value {
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)),
cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected),
}
}
}
#[derive(Clone)]
pub struct MpscTmInStoreSender {
@ -336,7 +358,6 @@ pub mod std_mod {
name: &'static str,
shared_tm_store: SharedTmStore,
sender: mpsc::Sender<StoreAddr>,
pub ignore_poison_errors: bool,
}
impl EcssChannel for MpscTmInStoreSender {
@ -349,11 +370,6 @@ pub mod std_mod {
}
}
impl From<SendError<StoreAddr>> for EcssTmtcError {
fn from(_: SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl MpscTmInStoreSender {
pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> {
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
@ -366,7 +382,10 @@ pub mod std_mod {
impl EcssTmSenderCore for MpscTmInStoreSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm {
PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()),
PusTmWrapper::InStore(addr) => {
self.sender.send(addr)?;
Ok(())
}
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
}
}
@ -384,7 +403,6 @@ pub mod std_mod {
name,
shared_tm_store,
sender,
ignore_poison_errors: false,
}
}
}
@ -393,7 +411,6 @@ pub mod std_mod {
id: ChannelId,
name: &'static str,
receiver: mpsc::Receiver<TcAddrWithToken>,
pub ignore_poison_errors: bool,
}
impl EcssChannel for MpscTcInStoreReceiver {
@ -431,7 +448,6 @@ pub mod std_mod {
id,
name,
receiver,
ignore_poison_errors: false,
}
}
}
@ -447,8 +463,8 @@ pub mod std_mod {
name: &'static str,
}
impl From<SendError<Vec<u8>>> for EcssTmtcError {
fn from(_: SendError<Vec<u8>>) -> Self {
impl From<mpsc::SendError<Vec<u8>>> for EcssTmtcError {
fn from(_: mpsc::SendError<Vec<u8>>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
@ -482,6 +498,101 @@ pub mod std_mod {
}
}
#[derive(Clone)]
pub struct CrossbeamTmInStoreSender {
id: ChannelId,
name: &'static str,
shared_tm_store: SharedTmStore,
sender: crossbeam_channel::Sender<StoreAddr>,
pub ignore_poison_errors: bool,
}
impl CrossbeamTmInStoreSender {
pub fn new(
id: ChannelId,
name: &'static str,
shared_tm_store: SharedTmStore,
sender: crossbeam_channel::Sender<StoreAddr>,
ignore_poison_errors: bool,
) -> Self {
Self {
id,
name,
shared_tm_store,
sender,
ignore_poison_errors,
}
}
}
impl EcssChannel for CrossbeamTmInStoreSender {
fn id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTmSenderCore for CrossbeamTmInStoreSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm {
PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?,
PusTmWrapper::Direct(tm) => {
let addr = self.shared_tm_store.add_pus_tm(&tm)?;
self.sender.try_send(addr)?;
}
}
Ok(())
}
}
pub struct CrossbeamTcInStoreReceiver {
id: ChannelId,
name: &'static str,
receiver: cb::Receiver<TcAddrWithToken>,
}
impl CrossbeamTcInStoreReceiver {
pub fn new(
id: ChannelId,
name: &'static str,
receiver: cb::Receiver<TcAddrWithToken>
) -> Self {
Self {
id,
name,
receiver,
}
}
}
impl EcssChannel for CrossbeamTcInStoreReceiver {
fn id(&self) -> ChannelId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver {
fn recv_tc(&self) -> Result<ReceivedTcWrapper, TryRecvTmtcError> {
let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e {
cb::TryRecvError::Empty => TryRecvTmtcError::Empty,
cb::TryRecvError::Disconnected => {
TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected))
}
})?;
Ok(ReceivedTcWrapper {
store_addr,
token: Some(token),
})
}
}
#[derive(Debug, Clone, Error)]
pub enum PusPacketHandlingError {
#[error("generic PUS error: {0}")]