new generic Mpsc PUS senders #37

Merged
muellerr merged 1 commits from refactor_mpsc_pus_senders into main 2023-02-27 21:45:06 +01:00
9 changed files with 200 additions and 120 deletions
Showing only changes of commit 6072bc7657 - Show all commits

View File

@ -66,6 +66,7 @@ use core::slice::Iter;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
use hashbrown::HashMap; use hashbrown::HashMap;
use crate::SenderId;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use stdmod::*; pub use stdmod::*;
@ -84,8 +85,6 @@ pub type EventWithAuxData<Event> = (Event, Option<Params>);
pub type EventU32WithAuxData = EventWithAuxData<EventU32>; pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
pub type EventU16WithAuxData = EventWithAuxData<EventU16>; pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
pub type SenderId = u32;
pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> { pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> {
type Error; type Error;

View File

@ -44,3 +44,6 @@ pub mod seq_count;
pub mod tmtc; pub mod tmtc;
pub use spacepackets; pub use spacepackets;
// Generic sender ID type.
pub type SenderId = u32;

View File

@ -51,19 +51,17 @@ pub trait PowerSwitchInfo {
fn switch_delay_ms(&self) -> u32; fn switch_delay_ms(&self) -> u32;
} }
pub trait PowerSwitchProvider: PowerSwitcherCommandSender + PowerSwitchInfo {
type Error;
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#![allow(dead_code)]
use super::*; use super::*;
use crate::power::PowerSwitcherCommandSender;
use std::boxed::Box; use std::boxed::Box;
struct Pcdu { struct Pcdu {
switch_rx: std::sync::mpsc::Receiver<(SwitchId, u16)>, switch_rx: std::sync::mpsc::Receiver<(SwitchId, u16)>,
} }
#[derive(Eq, PartialEq)]
enum DeviceState { enum DeviceState {
OFF, OFF,
SwitchingPower, SwitchingPower,
@ -73,6 +71,7 @@ mod tests {
} }
struct MyComplexDevice { struct MyComplexDevice {
power_switcher: Box<dyn PowerSwitcherCommandSender<Error = ()>>, power_switcher: Box<dyn PowerSwitcherCommandSender<Error = ()>>,
power_info: Box<dyn PowerSwitchInfo<Error = ()>>,
switch_id: SwitchId, switch_id: SwitchId,
some_state: u16, some_state: u16,
dev_state: DeviceState, dev_state: DeviceState,
@ -92,7 +91,7 @@ mod tests {
self.dev_state = DeviceState::SwitchingPower; self.dev_state = DeviceState::SwitchingPower;
} }
if self.dev_state == DeviceState::SwitchingPower { if self.dev_state == DeviceState::SwitchingPower {
if self.power_switcher.get_is_switch_on() { if self.power_info.get_is_switch_on(0).unwrap() {
self.dev_state = DeviceState::ON; self.dev_state = DeviceState::ON;
self.mode = 1; self.mode = 1;
} }

View File

@ -109,7 +109,9 @@ impl EventReporterBase {
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmErrorWithSend<E>> { ) -> Result<(), EcssTmErrorWithSend<E>> {
let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?; let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?;
sender.send_tm(tm)?; sender
.send_tm(tm)
.map_err(|e| EcssTmErrorWithSend::SendError(e))?;
self.msg_count += 1; self.msg_count += 1;
Ok(()) Ok(())
} }
@ -266,7 +268,7 @@ mod tests {
impl EcssTmSenderCore for TestSender { impl EcssTmSenderCore for TestSender {
type Error = (); type Error = ();
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<()>> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
assert!(tm.source_data().is_some()); assert!(tm.source_data().is_some());
let src_data = tm.source_data().unwrap(); let src_data = tm.source_data().unwrap();
assert!(src_data.len() >= 4); assert!(src_data.len() >= 4);

View File

@ -238,33 +238,14 @@ pub mod alloc_mod {
mod tests { mod tests {
use super::*; use super::*;
use crate::events::SeverityInfo; use crate::events::SeverityInfo;
use spacepackets::tm::PusTm; use crate::pus::MpscTmAsVecSender;
use std::sync::mpsc::{channel, SendError, TryRecvError}; use std::sync::mpsc::{channel, TryRecvError};
use std::vec::Vec;
const INFO_EVENT: EventU32TypedSev<SeverityInfo> = const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::const_new(1, 0); EventU32TypedSev::<SeverityInfo>::const_new(1, 0);
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
const EMPTY_STAMP: [u8; 7] = [0; 7]; const EMPTY_STAMP: [u8; 7] = [0; 7];
#[derive(Clone)]
struct EventTmSender {
sender: std::sync::mpsc::Sender<Vec<u8>>,
}
impl EcssTmSenderCore for EventTmSender {
type Error = SendError<Vec<u8>>;
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
let mut vec = Vec::new();
tm.append_to_vec(&mut vec)
.map_err(|e| EcssTmErrorWithSend::EcssTmError(e.into()))?;
self.sender
.send(vec)
.map_err(EcssTmErrorWithSend::SendError)?;
Ok(())
}
}
fn create_basic_man() -> PusEventDispatcher<(), EventU32> { fn create_basic_man() -> PusEventDispatcher<(), EventU32> {
let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed"); let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed");
let backend = DefaultPusMgmtBackendProvider::<EventU32>::default(); let backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
@ -275,7 +256,7 @@ mod tests {
fn test_basic() { fn test_basic() {
let mut event_man = create_basic_man(); let mut event_man = create_basic_man();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sender = EventTmSender { sender: event_tx }; let mut sender = MpscTmAsVecSender::new(event_tx);
let event_sent = event_man let event_sent = event_man
.generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None) .generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None)
.expect("Sending info event failed"); .expect("Sending info event failed");
@ -289,7 +270,7 @@ mod tests {
fn test_disable_event() { fn test_disable_event() {
let mut event_man = create_basic_man(); let mut event_man = create_basic_man();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sender = EventTmSender { sender: event_tx }; let mut sender = MpscTmAsVecSender::new(event_tx);
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
assert!(res.is_ok()); assert!(res.is_ok());
assert!(res.unwrap()); assert!(res.unwrap());
@ -312,7 +293,7 @@ mod tests {
fn test_reenable_event() { fn test_reenable_event() {
let mut event_man = create_basic_man(); let mut event_man = create_basic_man();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sender = EventTmSender { sender: event_tx }; let mut sender = MpscTmAsVecSender::new(event_tx);
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
assert!(res.is_ok()); assert!(res.is_ok());
assert!(res.unwrap()); assert!(res.unwrap());

View File

@ -19,9 +19,12 @@ pub mod verification;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
#[cfg(feature = "std")]
pub use std_mod::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum EcssTmErrorWithSend<E> { pub enum EcssTmErrorWithSend<E> {
/// Errors related to sending the verification telemetry to a TM recipient /// Errors related to sending the telemetry to a TM recipient
SendError(E), SendError(E),
EcssTmError(EcssTmError), EcssTmError(EcssTmError),
} }
@ -32,7 +35,7 @@ impl<E> From<EcssTmError> for EcssTmErrorWithSend<E> {
} }
} }
/// Generic error type which is also able to wrap a user send error with the user supplied type E. /// Generic error type for PUS TM handling.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum EcssTmError { pub enum EcssTmError {
/// Errors related to the time stamp format of the telemetry /// Errors related to the time stamp format of the telemetry
@ -61,7 +64,7 @@ impl From<ByteConversionError> for EcssTmError {
pub trait EcssTmSenderCore: Send { pub trait EcssTmSenderCore: Send {
type Error; type Error;
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>>; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
@ -89,6 +92,111 @@ mod alloc_mod {
impl_downcast!(EcssTmSender assoc Error); impl_downcast!(EcssTmSender assoc Error);
} }
#[cfg(feature = "std")]
pub mod std_mod {
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError};
use crate::pus::EcssTmSenderCore;
use alloc::vec::Vec;
use spacepackets::ecss::PusError;
use spacepackets::tm::PusTm;
use std::sync::mpsc::SendError;
use std::sync::{mpsc, RwLockWriteGuard};
#[derive(Debug, Clone)]
pub enum MpscPusInStoreSendError {
LockError,
PusError(PusError),
StoreError(StoreError),
SendError(SendError<StoreAddr>),
RxDisconnected(StoreAddr),
}
impl From<PusError> for MpscPusInStoreSendError {
fn from(value: PusError) -> Self {
MpscPusInStoreSendError::PusError(value)
}
}
impl From<SendError<StoreAddr>> for MpscPusInStoreSendError {
fn from(value: SendError<StoreAddr>) -> Self {
MpscPusInStoreSendError::SendError(value)
}
}
impl From<StoreError> for MpscPusInStoreSendError {
fn from(value: StoreError) -> Self {
MpscPusInStoreSendError::StoreError(value)
}
}
#[derive(Clone)]
pub struct MpscTmInStoreSender {
store_helper: SharedPool,
sender: mpsc::Sender<StoreAddr>,
pub ignore_poison_errors: bool,
}
impl EcssTmSenderCore for MpscTmInStoreSender {
type Error = MpscPusInStoreSendError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, slice) = store.free_element(tm.len_packed())?;
tm.write_to_bytes(slice)?;
self.sender.send(addr)?;
Ok(())
};
match self.store_helper.write() {
Ok(pool) => operation(pool),
Err(e) => {
if self.ignore_poison_errors {
operation(e.into_inner())
} else {
Err(MpscPusInStoreSendError::LockError)
}
}
}
}
}
impl MpscTmInStoreSender {
pub fn new(store_helper: SharedPool, sender: mpsc::Sender<StoreAddr>) -> Self {
Self {
store_helper,
sender,
ignore_poison_errors: false,
}
}
}
#[derive(Debug, Clone)]
pub enum MpscAsVecSenderError {
PusError(PusError),
SendError(SendError<Vec<u8>>),
}
#[derive(Debug, Clone)]
pub struct MpscTmAsVecSender {
sender: mpsc::Sender<Vec<u8>>,
}
impl MpscTmAsVecSender {
pub fn new(sender: mpsc::Sender<Vec<u8>>) -> Self {
Self { sender }
}
}
impl EcssTmSenderCore for MpscTmAsVecSender {
type Error = MpscAsVecSenderError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let mut vec = Vec::new();
tm.append_to_vec(&mut vec)
.map_err(MpscAsVecSenderError::PusError)?;
self.sender
.send(vec)
.map_err(MpscAsVecSenderError::SendError)?;
Ok(())
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GenericTcCheckError { pub enum GenericTcCheckError {
NotEnoughAppData, NotEnoughAppData,

View File

@ -102,10 +102,7 @@ use crate::seq_count::SequenceCountProviderCore;
#[cfg(all(feature = "crossbeam", feature = "std"))] #[cfg(all(feature = "crossbeam", feature = "std"))]
pub use stdmod::CrossbeamVerifSender; pub use stdmod::CrossbeamVerifSender;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use stdmod::{ pub use stdmod::{MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender};
MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender,
StdVerifSenderError,
};
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard. /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard.
/// ///
@ -520,7 +517,12 @@ impl VerificationReporterCore {
{ {
sender sender
.send_tm(sendable.pus_tm.take().unwrap()) .send_tm(sendable.pus_tm.take().unwrap())
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; .map_err(|e| {
VerificationOrSendErrorWithToken(
EcssTmErrorWithSend::SendError(e),
sendable.token.unwrap(),
)
})?;
Ok(sendable.send_success_acceptance_success(Some(seq_counter))) Ok(sendable.send_success_acceptance_success(Some(seq_counter)))
} }
@ -532,7 +534,12 @@ impl VerificationReporterCore {
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateNone>> { ) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateNone>> {
sender sender
.send_tm(sendable.pus_tm.take().unwrap()) .send_tm(sendable.pus_tm.take().unwrap())
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; .map_err(|e| {
VerificationOrSendErrorWithToken(
EcssTmErrorWithSend::SendError(e),
sendable.token.unwrap(),
)
})?;
sendable.send_success_verif_failure(Some(seq_counter)); sendable.send_success_verif_failure(Some(seq_counter));
Ok(()) Ok(())
} }
@ -591,7 +598,12 @@ impl VerificationReporterCore {
> { > {
sender sender
.send_tm(sendable.pus_tm.take().unwrap()) .send_tm(sendable.pus_tm.take().unwrap())
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; .map_err(|e| {
VerificationOrSendErrorWithToken(
EcssTmErrorWithSend::SendError(e),
sendable.token.unwrap(),
)
})?;
Ok(sendable.send_success_start_success(Some(seq_counter))) Ok(sendable.send_success_start_success(Some(seq_counter)))
} }
@ -627,7 +639,12 @@ impl VerificationReporterCore {
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateAccepted>> { ) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateAccepted>> {
sender sender
.send_tm(sendable.pus_tm.take().unwrap()) .send_tm(sendable.pus_tm.take().unwrap())
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; .map_err(|e| {
VerificationOrSendErrorWithToken(
EcssTmErrorWithSend::SendError(e),
sendable.token.unwrap(),
)
})?;
sendable.send_success_verif_failure(Some(seq_counter)); sendable.send_success_verif_failure(Some(seq_counter));
Ok(()) Ok(())
} }
@ -738,7 +755,12 @@ impl VerificationReporterCore {
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> { ) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> {
sender sender
.send_tm(sendable.pus_tm.take().unwrap()) .send_tm(sendable.pus_tm.take().unwrap())
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; .map_err(|e| {
VerificationOrSendErrorWithToken(
EcssTmErrorWithSend::SendError(e),
sendable.token.unwrap(),
)
})?;
sendable.send_success_step_or_completion_success(Some(seq_counter)); sendable.send_success_step_or_completion_success(Some(seq_counter));
Ok(()) Ok(())
} }
@ -751,7 +773,12 @@ impl VerificationReporterCore {
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> { ) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> {
sender sender
.send_tm(sendable.pus_tm.take().unwrap()) .send_tm(sendable.pus_tm.take().unwrap())
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; .map_err(|e| {
VerificationOrSendErrorWithToken(
EcssTmErrorWithSend::SendError(e),
sendable.token.unwrap(),
)
})?;
sendable.send_success_verif_failure(Some(seq_counter)); sendable.send_success_verif_failure(Some(seq_counter));
Ok(()) Ok(())
} }
@ -1223,33 +1250,15 @@ mod alloc_mod {
mod stdmod { mod stdmod {
use super::alloc_mod::VerificationReporterWithSender; use super::alloc_mod::VerificationReporterWithSender;
use super::*; use super::*;
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr};
use crate::pus::MpscPusInStoreSendError;
use delegate::delegate; use delegate::delegate;
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard};
pub type StdVerifReporterWithSender = VerificationReporterWithSender<StdVerifSenderError>; pub type StdVerifReporterWithSender = VerificationReporterWithSender<MpscPusInStoreSendError>;
pub type SharedStdVerifReporterWithSender = Arc<Mutex<StdVerifReporterWithSender>>; pub type SharedStdVerifReporterWithSender = Arc<Mutex<StdVerifReporterWithSender>>;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum StdVerifSenderError {
PoisonError,
StoreError(StoreError),
RxDisconnected(StoreAddr),
}
impl From<StoreError> for StdVerifSenderError {
fn from(e: StoreError) -> Self {
StdVerifSenderError::StoreError(e)
}
}
impl From<StoreError> for EcssTmErrorWithSend<StdVerifSenderError> {
fn from(e: StoreError) -> Self {
EcssTmErrorWithSend::SendError(e.into())
}
}
trait SendBackend: Send { trait SendBackend: Send {
fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>;
} }
@ -1297,11 +1306,11 @@ mod stdmod {
//noinspection RsTraitImplementation //noinspection RsTraitImplementation
impl EcssTmSenderCore for MpscVerifSender { impl EcssTmSenderCore for MpscVerifSender {
type Error = StdVerifSenderError; type Error = MpscPusInStoreSendError;
delegate!( delegate!(
to self.base { to self.base {
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<StdVerifSenderError>>; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
} }
); );
} }
@ -1332,25 +1341,26 @@ mod stdmod {
//noinspection RsTraitImplementation //noinspection RsTraitImplementation
#[cfg(feature = "crossbeam")] #[cfg(feature = "crossbeam")]
impl EcssTmSenderCore for CrossbeamVerifSender { impl EcssTmSenderCore for CrossbeamVerifSender {
type Error = StdVerifSenderError; type Error = MpscPusInStoreSendError;
delegate!( delegate!(
to self.base { to self.base {
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<StdVerifSenderError>>; fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
} }
); );
} }
impl<S: SendBackend + Clone + 'static> EcssTmSenderCore for StdSenderBase<S> { impl<S: SendBackend + Clone + 'static> EcssTmSenderCore for StdSenderBase<S> {
type Error = StdVerifSenderError; type Error = MpscPusInStoreSendError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| { let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, buf) = mg.free_element(tm.len_packed())?; let (addr, buf) = mg.free_element(tm.len_packed())?;
tm.write_to_bytes(buf).map_err(EcssTmError::PusError)?; tm.write_to_bytes(buf)
.map_err(MpscPusInStoreSendError::PusError)?;
drop(mg); drop(mg);
self.tx.send(addr).map_err(|_| { self.tx
EcssTmErrorWithSend::SendError(StdVerifSenderError::RxDisconnected(addr)) .send(addr)
})?; .map_err(|_| MpscPusInStoreSendError::RxDisconnected(addr))?;
Ok(()) Ok(())
}; };
match self.tm_store.write() { match self.tm_store.write() {
@ -1359,9 +1369,7 @@ mod stdmod {
if self.ignore_poison_error { if self.ignore_poison_error {
operation(poison_error.into_inner()) operation(poison_error.into_inner())
} else { } else {
Err(EcssTmErrorWithSend::SendError( Err(MpscPusInStoreSendError::LockError)
StdVerifSenderError::PoisonError,
))
} }
} }
} }
@ -1427,7 +1435,7 @@ mod tests {
impl EcssTmSenderCore for TestSender { impl EcssTmSenderCore for TestSender {
type Error = (); type Error = ();
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
assert_eq!(PusPacket::service(&tm), 1); assert_eq!(PusPacket::service(&tm), 1);
assert!(tm.source_data().is_some()); assert!(tm.source_data().is_some());
let mut time_stamp = [0; 7]; let mut time_stamp = [0; 7];
@ -1457,8 +1465,8 @@ mod tests {
impl EcssTmSenderCore for FallibleSender { impl EcssTmSenderCore for FallibleSender {
type Error = DummyError; type Error = DummyError;
fn send_tm(&mut self, _: PusTm) -> Result<(), EcssTmErrorWithSend<DummyError>> { fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> {
Err(EcssTmErrorWithSend::SendError(DummyError {})) Err(DummyError {})
} }
} }

View File

@ -7,8 +7,8 @@ use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs_core::pus::event_man::{ use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher, DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher,
}; };
use satrs_core::pus::{EcssTmErrorWithSend, EcssTmSenderCore}; use satrs_core::pus::EcssTmSenderCore;
use spacepackets::ecss::PusPacket; use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use std::sync::mpsc::{channel, SendError, TryRecvError}; use std::sync::mpsc::{channel, SendError, TryRecvError};
use std::thread; use std::thread;
@ -18,20 +18,26 @@ const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
const EMPTY_STAMP: [u8; 7] = [0; 7]; const EMPTY_STAMP: [u8; 7] = [0; 7];
#[derive(Debug, Clone)]
pub enum CustomTmSenderError {
SendError(SendError<Vec<u8>>),
PusError(PusError),
}
#[derive(Clone)] #[derive(Clone)]
struct EventTmSender { struct EventTmSender {
sender: std::sync::mpsc::Sender<Vec<u8>>, sender: std::sync::mpsc::Sender<Vec<u8>>,
} }
impl EcssTmSenderCore for EventTmSender { impl EcssTmSenderCore for EventTmSender {
type Error = SendError<Vec<u8>>; type Error = CustomTmSenderError;
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> { fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
let mut vec = Vec::new(); let mut vec = Vec::new();
tm.append_to_vec(&mut vec) tm.append_to_vec(&mut vec)
.map_err(|e| EcssTmErrorWithSend::EcssTmError(e.into()))?; .map_err(|e| CustomTmSenderError::PusError(e))?;
self.sender self.sender
.send(vec) .send(vec)
.map_err(EcssTmErrorWithSend::SendError)?; .map_err(CustomTmSenderError::SendError)?;
Ok(()) Ok(())
} }
} }

View File

@ -18,7 +18,7 @@ use satrs_core::event_man::{
}; };
use satrs_core::events::EventU32; use satrs_core::events::EventU32;
use satrs_core::hk::HkRequest; use satrs_core::hk::HkRequest;
use satrs_core::pool::{LocalPool, PoolCfg, StoreAddr}; use satrs_core::pool::{LocalPool, PoolCfg};
use satrs_core::pus::event_man::{ use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher, PusEventDispatcher,
@ -27,7 +27,7 @@ use satrs_core::pus::hk::Subservice as HkSubservice;
use satrs_core::pus::verification::{ use satrs_core::pus::verification::{
MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender,
}; };
use satrs_core::pus::{EcssTmErrorWithSend, EcssTmSenderCore}; use satrs_core::pus::MpscTmInStoreSender;
use satrs_core::seq_count::SeqCountProviderSimple; use satrs_core::seq_count::SeqCountProviderSimple;
use satrs_core::spacepackets::{ use satrs_core::spacepackets::{
time::cds::TimeProvider, time::cds::TimeProvider,
@ -39,36 +39,10 @@ use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc::{channel, TryRecvError}; use std::sync::mpsc::{channel, TryRecvError};
use std::sync::{mpsc, Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
#[derive(Clone)]
struct EventTmSender {
store_helper: TmStore,
sender: mpsc::Sender<StoreAddr>,
}
impl EventTmSender {
fn new(store_helper: TmStore, sender: mpsc::Sender<StoreAddr>) -> Self {
Self {
store_helper,
sender,
}
}
}
impl EcssTmSenderCore for EventTmSender {
type Error = mpsc::SendError<StoreAddr>;
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
let addr = self.store_helper.add_pus_tm(&tm);
self.sender
.send(addr)
.map_err(EcssTmErrorWithSend::SendError)
}
}
fn main() { fn main() {
setup_logger().expect("setting up logging with fern failed"); setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example"); println!("Running OBSW example");
@ -199,7 +173,7 @@ fn main() {
.name("Event".to_string()) .name("Event".to_string())
.spawn(move || { .spawn(move || {
let mut timestamp: [u8; 7] = [0; 7]; let mut timestamp: [u8; 7] = [0; 7];
let mut sender = EventTmSender::new(tm_store, tm_funnel_tx); let mut sender = MpscTmInStoreSender::new(tm_store.pool, tm_funnel_tx);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
reporter_event_handler reporter_event_handler