Merge pull request 'new generic Mpsc PUS senders' (#37) from refactor_mpsc_pus_senders into main
Reviewed-on: #37 Reviewed-by: lkoester <st167799@stud.uni-stuttgart.de>
This commit is contained in:
commit
989373c508
@ -66,6 +66,7 @@ use core::slice::Iter;
|
|||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
|
use crate::SenderId;
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub use stdmod::*;
|
pub use stdmod::*;
|
||||||
|
|
||||||
@ -84,8 +85,6 @@ pub type EventWithAuxData<Event> = (Event, Option<Params>);
|
|||||||
pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
|
pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
|
||||||
pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
|
pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
|
||||||
|
|
||||||
pub type SenderId = u32;
|
|
||||||
|
|
||||||
pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> {
|
pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> {
|
||||||
type Error;
|
type Error;
|
||||||
|
|
||||||
|
@ -44,3 +44,6 @@ pub mod seq_count;
|
|||||||
pub mod tmtc;
|
pub mod tmtc;
|
||||||
|
|
||||||
pub use spacepackets;
|
pub use spacepackets;
|
||||||
|
|
||||||
|
// Generic sender ID type.
|
||||||
|
pub type SenderId = u32;
|
||||||
|
@ -51,19 +51,17 @@ pub trait PowerSwitchInfo {
|
|||||||
fn switch_delay_ms(&self) -> u32;
|
fn switch_delay_ms(&self) -> u32;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait PowerSwitchProvider: PowerSwitcherCommandSender + PowerSwitchInfo {
|
|
||||||
type Error;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
#![allow(dead_code)]
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::power::PowerSwitcherCommandSender;
|
|
||||||
use std::boxed::Box;
|
use std::boxed::Box;
|
||||||
|
|
||||||
struct Pcdu {
|
struct Pcdu {
|
||||||
switch_rx: std::sync::mpsc::Receiver<(SwitchId, u16)>,
|
switch_rx: std::sync::mpsc::Receiver<(SwitchId, u16)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Eq, PartialEq)]
|
||||||
enum DeviceState {
|
enum DeviceState {
|
||||||
OFF,
|
OFF,
|
||||||
SwitchingPower,
|
SwitchingPower,
|
||||||
@ -73,6 +71,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
struct MyComplexDevice {
|
struct MyComplexDevice {
|
||||||
power_switcher: Box<dyn PowerSwitcherCommandSender<Error = ()>>,
|
power_switcher: Box<dyn PowerSwitcherCommandSender<Error = ()>>,
|
||||||
|
power_info: Box<dyn PowerSwitchInfo<Error = ()>>,
|
||||||
switch_id: SwitchId,
|
switch_id: SwitchId,
|
||||||
some_state: u16,
|
some_state: u16,
|
||||||
dev_state: DeviceState,
|
dev_state: DeviceState,
|
||||||
@ -92,7 +91,7 @@ mod tests {
|
|||||||
self.dev_state = DeviceState::SwitchingPower;
|
self.dev_state = DeviceState::SwitchingPower;
|
||||||
}
|
}
|
||||||
if self.dev_state == DeviceState::SwitchingPower {
|
if self.dev_state == DeviceState::SwitchingPower {
|
||||||
if self.power_switcher.get_is_switch_on() {
|
if self.power_info.get_is_switch_on(0).unwrap() {
|
||||||
self.dev_state = DeviceState::ON;
|
self.dev_state = DeviceState::ON;
|
||||||
self.mode = 1;
|
self.mode = 1;
|
||||||
}
|
}
|
||||||
|
@ -109,7 +109,9 @@ impl EventReporterBase {
|
|||||||
aux_data: Option<&[u8]>,
|
aux_data: Option<&[u8]>,
|
||||||
) -> Result<(), EcssTmErrorWithSend<E>> {
|
) -> Result<(), EcssTmErrorWithSend<E>> {
|
||||||
let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?;
|
let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?;
|
||||||
sender.send_tm(tm)?;
|
sender
|
||||||
|
.send_tm(tm)
|
||||||
|
.map_err(|e| EcssTmErrorWithSend::SendError(e))?;
|
||||||
self.msg_count += 1;
|
self.msg_count += 1;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -266,7 +268,7 @@ mod tests {
|
|||||||
impl EcssTmSenderCore for TestSender {
|
impl EcssTmSenderCore for TestSender {
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<()>> {
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
|
||||||
assert!(tm.source_data().is_some());
|
assert!(tm.source_data().is_some());
|
||||||
let src_data = tm.source_data().unwrap();
|
let src_data = tm.source_data().unwrap();
|
||||||
assert!(src_data.len() >= 4);
|
assert!(src_data.len() >= 4);
|
||||||
|
@ -238,33 +238,14 @@ pub mod alloc_mod {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::events::SeverityInfo;
|
use crate::events::SeverityInfo;
|
||||||
use spacepackets::tm::PusTm;
|
use crate::pus::MpscTmAsVecSender;
|
||||||
use std::sync::mpsc::{channel, SendError, TryRecvError};
|
use std::sync::mpsc::{channel, TryRecvError};
|
||||||
use std::vec::Vec;
|
|
||||||
|
|
||||||
const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
|
const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
|
||||||
EventU32TypedSev::<SeverityInfo>::const_new(1, 0);
|
EventU32TypedSev::<SeverityInfo>::const_new(1, 0);
|
||||||
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
|
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
|
||||||
const EMPTY_STAMP: [u8; 7] = [0; 7];
|
const EMPTY_STAMP: [u8; 7] = [0; 7];
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct EventTmSender {
|
|
||||||
sender: std::sync::mpsc::Sender<Vec<u8>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EcssTmSenderCore for EventTmSender {
|
|
||||||
type Error = SendError<Vec<u8>>;
|
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
|
|
||||||
let mut vec = Vec::new();
|
|
||||||
tm.append_to_vec(&mut vec)
|
|
||||||
.map_err(|e| EcssTmErrorWithSend::EcssTmError(e.into()))?;
|
|
||||||
self.sender
|
|
||||||
.send(vec)
|
|
||||||
.map_err(EcssTmErrorWithSend::SendError)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_basic_man() -> PusEventDispatcher<(), EventU32> {
|
fn create_basic_man() -> PusEventDispatcher<(), EventU32> {
|
||||||
let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed");
|
let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed");
|
||||||
let backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
|
let backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
|
||||||
@ -275,7 +256,7 @@ mod tests {
|
|||||||
fn test_basic() {
|
fn test_basic() {
|
||||||
let mut event_man = create_basic_man();
|
let mut event_man = create_basic_man();
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let mut sender = EventTmSender { sender: event_tx };
|
let mut sender = MpscTmAsVecSender::new(event_tx);
|
||||||
let event_sent = event_man
|
let event_sent = event_man
|
||||||
.generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None)
|
.generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None)
|
||||||
.expect("Sending info event failed");
|
.expect("Sending info event failed");
|
||||||
@ -289,7 +270,7 @@ mod tests {
|
|||||||
fn test_disable_event() {
|
fn test_disable_event() {
|
||||||
let mut event_man = create_basic_man();
|
let mut event_man = create_basic_man();
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let mut sender = EventTmSender { sender: event_tx };
|
let mut sender = MpscTmAsVecSender::new(event_tx);
|
||||||
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
|
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
|
||||||
assert!(res.is_ok());
|
assert!(res.is_ok());
|
||||||
assert!(res.unwrap());
|
assert!(res.unwrap());
|
||||||
@ -312,7 +293,7 @@ mod tests {
|
|||||||
fn test_reenable_event() {
|
fn test_reenable_event() {
|
||||||
let mut event_man = create_basic_man();
|
let mut event_man = create_basic_man();
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let mut sender = EventTmSender { sender: event_tx };
|
let mut sender = MpscTmAsVecSender::new(event_tx);
|
||||||
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
|
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
|
||||||
assert!(res.is_ok());
|
assert!(res.is_ok());
|
||||||
assert!(res.unwrap());
|
assert!(res.unwrap());
|
||||||
|
@ -19,9 +19,12 @@ pub mod verification;
|
|||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
pub use alloc_mod::*;
|
pub use alloc_mod::*;
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
|
pub use std_mod::*;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum EcssTmErrorWithSend<E> {
|
pub enum EcssTmErrorWithSend<E> {
|
||||||
/// Errors related to sending the verification telemetry to a TM recipient
|
/// Errors related to sending the telemetry to a TM recipient
|
||||||
SendError(E),
|
SendError(E),
|
||||||
EcssTmError(EcssTmError),
|
EcssTmError(EcssTmError),
|
||||||
}
|
}
|
||||||
@ -32,7 +35,7 @@ impl<E> From<EcssTmError> for EcssTmErrorWithSend<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generic error type which is also able to wrap a user send error with the user supplied type E.
|
/// Generic error type for PUS TM handling.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum EcssTmError {
|
pub enum EcssTmError {
|
||||||
/// Errors related to the time stamp format of the telemetry
|
/// Errors related to the time stamp format of the telemetry
|
||||||
@ -61,7 +64,7 @@ impl From<ByteConversionError> for EcssTmError {
|
|||||||
pub trait EcssTmSenderCore: Send {
|
pub trait EcssTmSenderCore: Send {
|
||||||
type Error;
|
type Error;
|
||||||
|
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>>;
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
@ -89,6 +92,111 @@ mod alloc_mod {
|
|||||||
impl_downcast!(EcssTmSender assoc Error);
|
impl_downcast!(EcssTmSender assoc Error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
|
pub mod std_mod {
|
||||||
|
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError};
|
||||||
|
use crate::pus::EcssTmSenderCore;
|
||||||
|
use alloc::vec::Vec;
|
||||||
|
use spacepackets::ecss::PusError;
|
||||||
|
use spacepackets::tm::PusTm;
|
||||||
|
use std::sync::mpsc::SendError;
|
||||||
|
use std::sync::{mpsc, RwLockWriteGuard};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum MpscPusInStoreSendError {
|
||||||
|
LockError,
|
||||||
|
PusError(PusError),
|
||||||
|
StoreError(StoreError),
|
||||||
|
SendError(SendError<StoreAddr>),
|
||||||
|
RxDisconnected(StoreAddr),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<PusError> for MpscPusInStoreSendError {
|
||||||
|
fn from(value: PusError) -> Self {
|
||||||
|
MpscPusInStoreSendError::PusError(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl From<SendError<StoreAddr>> for MpscPusInStoreSendError {
|
||||||
|
fn from(value: SendError<StoreAddr>) -> Self {
|
||||||
|
MpscPusInStoreSendError::SendError(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl From<StoreError> for MpscPusInStoreSendError {
|
||||||
|
fn from(value: StoreError) -> Self {
|
||||||
|
MpscPusInStoreSendError::StoreError(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MpscTmInStoreSender {
|
||||||
|
store_helper: SharedPool,
|
||||||
|
sender: mpsc::Sender<StoreAddr>,
|
||||||
|
pub ignore_poison_errors: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EcssTmSenderCore for MpscTmInStoreSender {
|
||||||
|
type Error = MpscPusInStoreSendError;
|
||||||
|
|
||||||
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
|
||||||
|
let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| {
|
||||||
|
let (addr, slice) = store.free_element(tm.len_packed())?;
|
||||||
|
tm.write_to_bytes(slice)?;
|
||||||
|
self.sender.send(addr)?;
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
match self.store_helper.write() {
|
||||||
|
Ok(pool) => operation(pool),
|
||||||
|
Err(e) => {
|
||||||
|
if self.ignore_poison_errors {
|
||||||
|
operation(e.into_inner())
|
||||||
|
} else {
|
||||||
|
Err(MpscPusInStoreSendError::LockError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MpscTmInStoreSender {
|
||||||
|
pub fn new(store_helper: SharedPool, sender: mpsc::Sender<StoreAddr>) -> Self {
|
||||||
|
Self {
|
||||||
|
store_helper,
|
||||||
|
sender,
|
||||||
|
ignore_poison_errors: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum MpscAsVecSenderError {
|
||||||
|
PusError(PusError),
|
||||||
|
SendError(SendError<Vec<u8>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct MpscTmAsVecSender {
|
||||||
|
sender: mpsc::Sender<Vec<u8>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MpscTmAsVecSender {
|
||||||
|
pub fn new(sender: mpsc::Sender<Vec<u8>>) -> Self {
|
||||||
|
Self { sender }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl EcssTmSenderCore for MpscTmAsVecSender {
|
||||||
|
type Error = MpscAsVecSenderError;
|
||||||
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
|
||||||
|
let mut vec = Vec::new();
|
||||||
|
tm.append_to_vec(&mut vec)
|
||||||
|
.map_err(MpscAsVecSenderError::PusError)?;
|
||||||
|
self.sender
|
||||||
|
.send(vec)
|
||||||
|
.map_err(MpscAsVecSenderError::SendError)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
pub enum GenericTcCheckError {
|
pub enum GenericTcCheckError {
|
||||||
NotEnoughAppData,
|
NotEnoughAppData,
|
||||||
|
@ -102,10 +102,7 @@ use crate::seq_count::SequenceCountProviderCore;
|
|||||||
#[cfg(all(feature = "crossbeam", feature = "std"))]
|
#[cfg(all(feature = "crossbeam", feature = "std"))]
|
||||||
pub use stdmod::CrossbeamVerifSender;
|
pub use stdmod::CrossbeamVerifSender;
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub use stdmod::{
|
pub use stdmod::{MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender};
|
||||||
MpscVerifSender, SharedStdVerifReporterWithSender, StdVerifReporterWithSender,
|
|
||||||
StdVerifSenderError,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard.
|
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard.
|
||||||
///
|
///
|
||||||
@ -520,7 +517,12 @@ impl VerificationReporterCore {
|
|||||||
{
|
{
|
||||||
sender
|
sender
|
||||||
.send_tm(sendable.pus_tm.take().unwrap())
|
.send_tm(sendable.pus_tm.take().unwrap())
|
||||||
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?;
|
.map_err(|e| {
|
||||||
|
VerificationOrSendErrorWithToken(
|
||||||
|
EcssTmErrorWithSend::SendError(e),
|
||||||
|
sendable.token.unwrap(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
Ok(sendable.send_success_acceptance_success(Some(seq_counter)))
|
Ok(sendable.send_success_acceptance_success(Some(seq_counter)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -532,7 +534,12 @@ impl VerificationReporterCore {
|
|||||||
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateNone>> {
|
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateNone>> {
|
||||||
sender
|
sender
|
||||||
.send_tm(sendable.pus_tm.take().unwrap())
|
.send_tm(sendable.pus_tm.take().unwrap())
|
||||||
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?;
|
.map_err(|e| {
|
||||||
|
VerificationOrSendErrorWithToken(
|
||||||
|
EcssTmErrorWithSend::SendError(e),
|
||||||
|
sendable.token.unwrap(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
sendable.send_success_verif_failure(Some(seq_counter));
|
sendable.send_success_verif_failure(Some(seq_counter));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -591,7 +598,12 @@ impl VerificationReporterCore {
|
|||||||
> {
|
> {
|
||||||
sender
|
sender
|
||||||
.send_tm(sendable.pus_tm.take().unwrap())
|
.send_tm(sendable.pus_tm.take().unwrap())
|
||||||
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?;
|
.map_err(|e| {
|
||||||
|
VerificationOrSendErrorWithToken(
|
||||||
|
EcssTmErrorWithSend::SendError(e),
|
||||||
|
sendable.token.unwrap(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
Ok(sendable.send_success_start_success(Some(seq_counter)))
|
Ok(sendable.send_success_start_success(Some(seq_counter)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -627,7 +639,12 @@ impl VerificationReporterCore {
|
|||||||
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateAccepted>> {
|
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateAccepted>> {
|
||||||
sender
|
sender
|
||||||
.send_tm(sendable.pus_tm.take().unwrap())
|
.send_tm(sendable.pus_tm.take().unwrap())
|
||||||
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?;
|
.map_err(|e| {
|
||||||
|
VerificationOrSendErrorWithToken(
|
||||||
|
EcssTmErrorWithSend::SendError(e),
|
||||||
|
sendable.token.unwrap(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
sendable.send_success_verif_failure(Some(seq_counter));
|
sendable.send_success_verif_failure(Some(seq_counter));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -738,7 +755,12 @@ impl VerificationReporterCore {
|
|||||||
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> {
|
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> {
|
||||||
sender
|
sender
|
||||||
.send_tm(sendable.pus_tm.take().unwrap())
|
.send_tm(sendable.pus_tm.take().unwrap())
|
||||||
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?;
|
.map_err(|e| {
|
||||||
|
VerificationOrSendErrorWithToken(
|
||||||
|
EcssTmErrorWithSend::SendError(e),
|
||||||
|
sendable.token.unwrap(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
sendable.send_success_step_or_completion_success(Some(seq_counter));
|
sendable.send_success_step_or_completion_success(Some(seq_counter));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -751,7 +773,12 @@ impl VerificationReporterCore {
|
|||||||
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> {
|
) -> Result<(), VerificationOrSendErrorWithToken<E, TcStateStarted>> {
|
||||||
sender
|
sender
|
||||||
.send_tm(sendable.pus_tm.take().unwrap())
|
.send_tm(sendable.pus_tm.take().unwrap())
|
||||||
.map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?;
|
.map_err(|e| {
|
||||||
|
VerificationOrSendErrorWithToken(
|
||||||
|
EcssTmErrorWithSend::SendError(e),
|
||||||
|
sendable.token.unwrap(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
sendable.send_success_verif_failure(Some(seq_counter));
|
sendable.send_success_verif_failure(Some(seq_counter));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -1223,33 +1250,15 @@ mod alloc_mod {
|
|||||||
mod stdmod {
|
mod stdmod {
|
||||||
use super::alloc_mod::VerificationReporterWithSender;
|
use super::alloc_mod::VerificationReporterWithSender;
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError};
|
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr};
|
||||||
|
use crate::pus::MpscPusInStoreSendError;
|
||||||
use delegate::delegate;
|
use delegate::delegate;
|
||||||
use spacepackets::tm::PusTm;
|
use spacepackets::tm::PusTm;
|
||||||
use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard};
|
use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard};
|
||||||
|
|
||||||
pub type StdVerifReporterWithSender = VerificationReporterWithSender<StdVerifSenderError>;
|
pub type StdVerifReporterWithSender = VerificationReporterWithSender<MpscPusInStoreSendError>;
|
||||||
pub type SharedStdVerifReporterWithSender = Arc<Mutex<StdVerifReporterWithSender>>;
|
pub type SharedStdVerifReporterWithSender = Arc<Mutex<StdVerifReporterWithSender>>;
|
||||||
|
|
||||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
|
||||||
pub enum StdVerifSenderError {
|
|
||||||
PoisonError,
|
|
||||||
StoreError(StoreError),
|
|
||||||
RxDisconnected(StoreAddr),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<StoreError> for StdVerifSenderError {
|
|
||||||
fn from(e: StoreError) -> Self {
|
|
||||||
StdVerifSenderError::StoreError(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<StoreError> for EcssTmErrorWithSend<StdVerifSenderError> {
|
|
||||||
fn from(e: StoreError) -> Self {
|
|
||||||
EcssTmErrorWithSend::SendError(e.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trait SendBackend: Send {
|
trait SendBackend: Send {
|
||||||
fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>;
|
fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>;
|
||||||
}
|
}
|
||||||
@ -1297,11 +1306,11 @@ mod stdmod {
|
|||||||
|
|
||||||
//noinspection RsTraitImplementation
|
//noinspection RsTraitImplementation
|
||||||
impl EcssTmSenderCore for MpscVerifSender {
|
impl EcssTmSenderCore for MpscVerifSender {
|
||||||
type Error = StdVerifSenderError;
|
type Error = MpscPusInStoreSendError;
|
||||||
|
|
||||||
delegate!(
|
delegate!(
|
||||||
to self.base {
|
to self.base {
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<StdVerifSenderError>>;
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -1332,25 +1341,26 @@ mod stdmod {
|
|||||||
//noinspection RsTraitImplementation
|
//noinspection RsTraitImplementation
|
||||||
#[cfg(feature = "crossbeam")]
|
#[cfg(feature = "crossbeam")]
|
||||||
impl EcssTmSenderCore for CrossbeamVerifSender {
|
impl EcssTmSenderCore for CrossbeamVerifSender {
|
||||||
type Error = StdVerifSenderError;
|
type Error = MpscPusInStoreSendError;
|
||||||
|
|
||||||
delegate!(
|
delegate!(
|
||||||
to self.base {
|
to self.base {
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<StdVerifSenderError>>;
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error>;
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: SendBackend + Clone + 'static> EcssTmSenderCore for StdSenderBase<S> {
|
impl<S: SendBackend + Clone + 'static> EcssTmSenderCore for StdSenderBase<S> {
|
||||||
type Error = StdVerifSenderError;
|
type Error = MpscPusInStoreSendError;
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
|
||||||
let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| {
|
let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| {
|
||||||
let (addr, buf) = mg.free_element(tm.len_packed())?;
|
let (addr, buf) = mg.free_element(tm.len_packed())?;
|
||||||
tm.write_to_bytes(buf).map_err(EcssTmError::PusError)?;
|
tm.write_to_bytes(buf)
|
||||||
|
.map_err(MpscPusInStoreSendError::PusError)?;
|
||||||
drop(mg);
|
drop(mg);
|
||||||
self.tx.send(addr).map_err(|_| {
|
self.tx
|
||||||
EcssTmErrorWithSend::SendError(StdVerifSenderError::RxDisconnected(addr))
|
.send(addr)
|
||||||
})?;
|
.map_err(|_| MpscPusInStoreSendError::RxDisconnected(addr))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
match self.tm_store.write() {
|
match self.tm_store.write() {
|
||||||
@ -1359,9 +1369,7 @@ mod stdmod {
|
|||||||
if self.ignore_poison_error {
|
if self.ignore_poison_error {
|
||||||
operation(poison_error.into_inner())
|
operation(poison_error.into_inner())
|
||||||
} else {
|
} else {
|
||||||
Err(EcssTmErrorWithSend::SendError(
|
Err(MpscPusInStoreSendError::LockError)
|
||||||
StdVerifSenderError::PoisonError,
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1427,7 +1435,7 @@ mod tests {
|
|||||||
|
|
||||||
impl EcssTmSenderCore for TestSender {
|
impl EcssTmSenderCore for TestSender {
|
||||||
type Error = ();
|
type Error = ();
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
|
||||||
assert_eq!(PusPacket::service(&tm), 1);
|
assert_eq!(PusPacket::service(&tm), 1);
|
||||||
assert!(tm.source_data().is_some());
|
assert!(tm.source_data().is_some());
|
||||||
let mut time_stamp = [0; 7];
|
let mut time_stamp = [0; 7];
|
||||||
@ -1457,8 +1465,8 @@ mod tests {
|
|||||||
|
|
||||||
impl EcssTmSenderCore for FallibleSender {
|
impl EcssTmSenderCore for FallibleSender {
|
||||||
type Error = DummyError;
|
type Error = DummyError;
|
||||||
fn send_tm(&mut self, _: PusTm) -> Result<(), EcssTmErrorWithSend<DummyError>> {
|
fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> {
|
||||||
Err(EcssTmErrorWithSend::SendError(DummyError {}))
|
Err(DummyError {})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,8 +7,8 @@ use satrs_core::params::{Params, ParamsHeapless, WritableToBeBytes};
|
|||||||
use satrs_core::pus::event_man::{
|
use satrs_core::pus::event_man::{
|
||||||
DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher,
|
DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher,
|
||||||
};
|
};
|
||||||
use satrs_core::pus::{EcssTmErrorWithSend, EcssTmSenderCore};
|
use satrs_core::pus::EcssTmSenderCore;
|
||||||
use spacepackets::ecss::PusPacket;
|
use spacepackets::ecss::{PusError, PusPacket};
|
||||||
use spacepackets::tm::PusTm;
|
use spacepackets::tm::PusTm;
|
||||||
use std::sync::mpsc::{channel, SendError, TryRecvError};
|
use std::sync::mpsc::{channel, SendError, TryRecvError};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
@ -18,20 +18,26 @@ const INFO_EVENT: EventU32TypedSev<SeverityInfo> =
|
|||||||
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
|
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
|
||||||
const EMPTY_STAMP: [u8; 7] = [0; 7];
|
const EMPTY_STAMP: [u8; 7] = [0; 7];
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum CustomTmSenderError {
|
||||||
|
SendError(SendError<Vec<u8>>),
|
||||||
|
PusError(PusError),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct EventTmSender {
|
struct EventTmSender {
|
||||||
sender: std::sync::mpsc::Sender<Vec<u8>>,
|
sender: std::sync::mpsc::Sender<Vec<u8>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EcssTmSenderCore for EventTmSender {
|
impl EcssTmSenderCore for EventTmSender {
|
||||||
type Error = SendError<Vec<u8>>;
|
type Error = CustomTmSenderError;
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
|
fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> {
|
||||||
let mut vec = Vec::new();
|
let mut vec = Vec::new();
|
||||||
tm.append_to_vec(&mut vec)
|
tm.append_to_vec(&mut vec)
|
||||||
.map_err(|e| EcssTmErrorWithSend::EcssTmError(e.into()))?;
|
.map_err(|e| CustomTmSenderError::PusError(e))?;
|
||||||
self.sender
|
self.sender
|
||||||
.send(vec)
|
.send(vec)
|
||||||
.map_err(EcssTmErrorWithSend::SendError)?;
|
.map_err(CustomTmSenderError::SendError)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,7 @@ use satrs_core::event_man::{
|
|||||||
};
|
};
|
||||||
use satrs_core::events::EventU32;
|
use satrs_core::events::EventU32;
|
||||||
use satrs_core::hk::HkRequest;
|
use satrs_core::hk::HkRequest;
|
||||||
use satrs_core::pool::{LocalPool, PoolCfg, StoreAddr};
|
use satrs_core::pool::{LocalPool, PoolCfg};
|
||||||
use satrs_core::pus::event_man::{
|
use satrs_core::pus::event_man::{
|
||||||
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
|
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
|
||||||
PusEventDispatcher,
|
PusEventDispatcher,
|
||||||
@ -27,7 +27,7 @@ use satrs_core::pus::hk::Subservice as HkSubservice;
|
|||||||
use satrs_core::pus::verification::{
|
use satrs_core::pus::verification::{
|
||||||
MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender,
|
MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender,
|
||||||
};
|
};
|
||||||
use satrs_core::pus::{EcssTmErrorWithSend, EcssTmSenderCore};
|
use satrs_core::pus::MpscTmInStoreSender;
|
||||||
use satrs_core::seq_count::SeqCountProviderSimple;
|
use satrs_core::seq_count::SeqCountProviderSimple;
|
||||||
use satrs_core::spacepackets::{
|
use satrs_core::spacepackets::{
|
||||||
time::cds::TimeProvider,
|
time::cds::TimeProvider,
|
||||||
@ -39,36 +39,10 @@ use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT};
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::mpsc::{channel, TryRecvError};
|
use std::sync::mpsc::{channel, TryRecvError};
|
||||||
use std::sync::{mpsc, Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct EventTmSender {
|
|
||||||
store_helper: TmStore,
|
|
||||||
sender: mpsc::Sender<StoreAddr>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EventTmSender {
|
|
||||||
fn new(store_helper: TmStore, sender: mpsc::Sender<StoreAddr>) -> Self {
|
|
||||||
Self {
|
|
||||||
store_helper,
|
|
||||||
sender,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EcssTmSenderCore for EventTmSender {
|
|
||||||
type Error = mpsc::SendError<StoreAddr>;
|
|
||||||
|
|
||||||
fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend<Self::Error>> {
|
|
||||||
let addr = self.store_helper.add_pus_tm(&tm);
|
|
||||||
self.sender
|
|
||||||
.send(addr)
|
|
||||||
.map_err(EcssTmErrorWithSend::SendError)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
setup_logger().expect("setting up logging with fern failed");
|
setup_logger().expect("setting up logging with fern failed");
|
||||||
println!("Running OBSW example");
|
println!("Running OBSW example");
|
||||||
@ -199,7 +173,7 @@ fn main() {
|
|||||||
.name("Event".to_string())
|
.name("Event".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut timestamp: [u8; 7] = [0; 7];
|
let mut timestamp: [u8; 7] = [0; 7];
|
||||||
let mut sender = EventTmSender::new(tm_store, tm_funnel_tx);
|
let mut sender = MpscTmInStoreSender::new(tm_store.pool, tm_funnel_tx);
|
||||||
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
|
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
|
||||||
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
|
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
|
||||||
reporter_event_handler
|
reporter_event_handler
|
||||||
|
Loading…
Reference in New Issue
Block a user