getting there..

This commit is contained in:
Robin Müller 2023-07-09 20:05:45 +02:00
parent 180e770392
commit e46b88384f
Signed by: muellerr
GPG Key ID: A649FB78196E3849
12 changed files with 326 additions and 219 deletions

View File

@ -64,7 +64,7 @@ optional = true
# version = "0.6" # version = "0.6"
# path = "../spacepackets" # path = "../spacepackets"
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
rev = "4969d6c33c6" rev = "e3d2d885385"
default-features = false default-features = false
[dev-dependencies] [dev-dependencies]

View File

@ -1,4 +1,4 @@
use crate::pus::{source_buffer_large_enough, EcssTmtcErrorWithSend}; use crate::pus::{source_buffer_large_enough, EcssTmtcError};
use spacepackets::ecss::{EcssEnumeration, PusError}; use spacepackets::ecss::{EcssEnumeration, PusError};
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use spacepackets::tm::PusTmSecondaryHeader; use spacepackets::tm::PusTmSecondaryHeader;
@ -34,7 +34,7 @@ impl EventReporterBase {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.generate_and_send_generic_tm( self.generate_and_send_generic_tm(
buf, buf,
Subservice::TmInfoReport, Subservice::TmInfoReport,
@ -52,7 +52,7 @@ impl EventReporterBase {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.generate_and_send_generic_tm( self.generate_and_send_generic_tm(
buf, buf,
Subservice::TmLowSeverityReport, Subservice::TmLowSeverityReport,
@ -70,7 +70,7 @@ impl EventReporterBase {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.generate_and_send_generic_tm( self.generate_and_send_generic_tm(
buf, buf,
Subservice::TmMediumSeverityReport, Subservice::TmMediumSeverityReport,
@ -88,7 +88,7 @@ impl EventReporterBase {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.generate_and_send_generic_tm( self.generate_and_send_generic_tm(
buf, buf,
Subservice::TmHighSeverityReport, Subservice::TmHighSeverityReport,
@ -107,7 +107,7 @@ impl EventReporterBase {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?; let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?;
sender.send_tm(tm.into())?; sender.send_tm(tm.into())?;
self.msg_count += 1; self.msg_count += 1;
@ -121,7 +121,7 @@ impl EventReporterBase {
time_stamp: &'a [u8], time_stamp: &'a [u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<PusTm, EcssTmtcErrorWithSend> { ) -> Result<PusTm, EcssTmtcError> {
let mut src_data_len = event_id.size(); let mut src_data_len = event_id.size();
if let Some(aux_data) = aux_data { if let Some(aux_data) = aux_data {
src_data_len += aux_data.len(); src_data_len += aux_data.len();
@ -138,7 +138,7 @@ impl EventReporterBase {
let mut current_idx = 0; let mut current_idx = 0;
event_id event_id
.write_to_be_bytes(&mut buf[0..event_id.size()]) .write_to_be_bytes(&mut buf[0..event_id.size()])
.map_err(PusError::ByteConversionError)?; .map_err(PusError::ByteConversion)?;
current_idx += event_id.size(); current_idx += event_id.size();
if let Some(aux_data) = aux_data { if let Some(aux_data) = aux_data {
buf[current_idx..current_idx + aux_data.len()].copy_from_slice(aux_data); buf[current_idx..current_idx + aux_data.len()].copy_from_slice(aux_data);
@ -178,7 +178,7 @@ mod alloc_mod {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.reporter.event_info( self.reporter.event_info(
self.source_data_buf.as_mut_slice(), self.source_data_buf.as_mut_slice(),
sender, sender,
@ -194,7 +194,7 @@ mod alloc_mod {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.reporter.event_low_severity( self.reporter.event_low_severity(
self.source_data_buf.as_mut_slice(), self.source_data_buf.as_mut_slice(),
sender, sender,
@ -210,7 +210,7 @@ mod alloc_mod {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.reporter.event_medium_severity( self.reporter.event_medium_severity(
self.source_data_buf.as_mut_slice(), self.source_data_buf.as_mut_slice(),
sender, sender,
@ -226,7 +226,7 @@ mod alloc_mod {
time_stamp: &[u8], time_stamp: &[u8],
event_id: impl EcssEnumeration, event_id: impl EcssEnumeration,
aux_data: Option<&[u8]>, aux_data: Option<&[u8]>,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.reporter.event_high_severity( self.reporter.event_high_severity(
self.source_data_buf.as_mut_slice(), self.source_data_buf.as_mut_slice(),
sender, sender,
@ -243,7 +243,7 @@ mod tests {
use super::*; use super::*;
use crate::events::{EventU32, Severity}; use crate::events::{EventU32, Severity};
use crate::pus::tests::CommonTmInfo; use crate::pus::tests::CommonTmInfo;
use crate::pus::{EcssSender, PusTmWrapper}; use crate::pus::{EcssChannel, PusTmWrapper};
use crate::SenderId; use crate::SenderId;
use spacepackets::ByteConversionError; use spacepackets::ByteConversionError;
use std::cell::RefCell; use std::cell::RefCell;
@ -268,14 +268,14 @@ mod tests {
pub service_queue: RefCell<VecDeque<TmInfo>>, pub service_queue: RefCell<VecDeque<TmInfo>>,
} }
impl EcssSender for TestSender { impl EcssChannel for TestSender {
fn id(&self) -> SenderId { fn id(&self) -> SenderId {
0 0
} }
} }
impl EcssTmSenderCore for TestSender { impl EcssTmSenderCore for TestSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcErrorWithSend> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { match tm {
PusTmWrapper::InStore(_) => { PusTmWrapper::InStore(_) => {
panic!("TestSender: unexpected call with address"); panic!("TestSender: unexpected call with address");
@ -425,7 +425,7 @@ mod tests {
let err = reporter.event_info(sender, &time_stamp_empty, event, None); let err = reporter.event_info(sender, &time_stamp_empty, event, None);
assert!(err.is_err()); assert!(err.is_err());
let err = err.unwrap_err(); let err = err.unwrap_err();
if let EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcErrorWithSend::ByteConversion( if let EcssTmtcError::EcssTmtcError(EcssTmtcError::ByteConversion(
ByteConversionError::ToSliceTooSmall(missmatch), ByteConversionError::ToSliceTooSmall(missmatch),
)) = err )) = err
{ {

View File

@ -13,7 +13,7 @@ pub use crate::pus::event::EventReporter;
use crate::pus::verification::TcStateToken; use crate::pus::verification::TcStateToken;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
use crate::pus::EcssTmSenderCore; use crate::pus::EcssTmSenderCore;
use crate::pus::EcssTmtcErrorWithSend; use crate::pus::EcssTmtcError;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub use alloc_mod::*; pub use alloc_mod::*;
@ -96,12 +96,12 @@ pub struct EventRequestWithToken<Event: GenericEvent = EventU32> {
#[derive(Debug)] #[derive(Debug)]
pub enum EventManError { pub enum EventManError {
EcssTmtcError(EcssTmtcErrorWithSend), EcssTmtcError(EcssTmtcError),
SeverityMissmatch(Severity, Severity), SeverityMissmatch(Severity, Severity),
} }
impl From<EcssTmtcErrorWithSend> for EventManError { impl From<EcssTmtcError> for EventManError {
fn from(v: EcssTmtcErrorWithSend) -> Self { fn from(v: EcssTmtcError) -> Self {
Self::EcssTmtcError(v) Self::EcssTmtcError(v)
} }
} }

View File

@ -5,8 +5,8 @@ use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken,
}; };
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, AcceptedTc, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult,
PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusPacketHandlingError, PusServiceBase, PusServiceHandler, ReceivedTcWrapper,
}; };
use spacepackets::ecss::event::Subservice; use spacepackets::ecss::event::Subservice;
use spacepackets::ecss::PusPacket; use spacepackets::ecss::PusPacket;
@ -21,15 +21,14 @@ pub struct PusService5EventHandler {
impl PusService5EventHandler { impl PusService5EventHandler {
pub fn new( pub fn new(
receiver: Receiver<AcceptedTc>, tc_receiver: Box<dyn EcssTcReceiver>,
tc_pool: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
event_request_tx: Sender<EventRequestWithToken>, event_request_tx: Sender<EventRequestWithToken>,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(receiver, tc_pool, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler),
event_request_tx, event_request_tx,
} }
} }
@ -45,11 +44,16 @@ impl PusServiceHandler for PusService5EventHandler {
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
addr: StoreAddr, tc_in_store_with_token: ReceivedTcWrapper,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?; let ReceivedTcWrapper {
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); tc,
token,
pool_guard,
} = tc_in_store_with_token;
// TODO: Better token handling..
let token = token.expect("invalid token");
let accepted_token = VerificationToken::<TcStateAccepted>::try_from(token).unwrap();
let subservice = tc.subservice(); let subservice = tc.subservice();
let srv = Subservice::try_from(subservice); let srv = Subservice::try_from(subservice);
if srv.is_err() { if srv.is_err() {
@ -70,7 +74,7 @@ impl PusServiceHandler for PusService5EventHandler {
.psb .psb
.verification_handler .verification_handler
.borrow_mut() .borrow_mut()
.start_success(token, Some(&stamp)) .start_success(accepted_token, Some(&stamp))
.map_err(|_| PartialPusHandlingError::Verification); .map_err(|_| PartialPusHandlingError::Verification);
let partial_error = start_token.clone().err(); let partial_error = start_token.clone().err();
let mut token: TcStateToken = token.into(); let mut token: TcStateToken = token.into();
@ -91,7 +95,7 @@ impl PusServiceHandler for PusService5EventHandler {
self.event_request_tx self.event_request_tx
.send(event_req_with_token) .send(event_req_with_token)
.map_err(|_| { .map_err(|_| {
PusPacketHandlingError::SendError("Forwarding event request failed".into()) PusPacketHandlingError::Other("Forwarding event request failed".into())
})?; })?;
if let Some(partial_error) = partial_error { if let Some(partial_error) = partial_error {
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess( return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(

View File

@ -28,8 +28,8 @@ pub mod verification;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
use crate::pool::{StoreAddr, StoreError}; use crate::pool::{PoolGuard, PoolRwGuard, StoreAddr, StoreError};
use crate::pus::verification::TcStateToken; use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken};
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub use std_mod::*; pub use std_mod::*;
@ -51,6 +51,12 @@ impl<'tm> From<PusTm<'tm>> for PusTmWrapper<'tm> {
} }
} }
pub type TcAddrWithToken = (StoreAddr, TcStateToken);
/// Generic abstraction for a telecommand being sent around after is has been accepted.
/// The actual telecommand is stored inside a pre-allocated pool structure.
pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
/// Generic error type for sending something via a message queue. /// Generic error type for sending something via a message queue.
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum GenericSendError { pub enum GenericSendError {
@ -74,67 +80,100 @@ impl Display for GenericSendError {
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl Error for GenericSendError {} impl Error for GenericSendError {}
/// Generic error type for sending something via a message queue.
#[derive(Debug, Copy, Clone)]
pub enum GenericRecvError {
Empty,
TxDisconnected,
}
impl Display for GenericRecvError {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
Self::TxDisconnected => {
write!(f, "tx side has disconnected")
}
Self::Empty => {
write!(f, "nothing to receive")
}
}
}
}
#[cfg(feature = "std")]
impl Error for GenericRecvError {}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum EcssTmtcErrorWithSend { pub enum EcssTmtcError {
StoreLock, StoreLock,
Store(StoreError), Store(StoreError),
Pus(PusError), Pus(PusError),
CantSendAddr(StoreAddr), CantSendAddr(StoreAddr),
Send(GenericSendError), Send(GenericSendError),
Recv(GenericRecvError),
} }
impl Display for EcssTmtcErrorWithSend { impl Display for EcssTmtcError {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self { match self {
EcssTmtcErrorWithSend::StoreLock => { EcssTmtcError::StoreLock => {
write!(f, "store lock error") write!(f, "store lock error")
} }
EcssTmtcErrorWithSend::Store(store) => { EcssTmtcError::Store(store) => {
write!(f, "store error: {store}") write!(f, "store error: {store}")
} }
EcssTmtcErrorWithSend::Pus(pus_e) => { EcssTmtcError::Pus(pus_e) => {
write!(f, "PUS error: {pus_e}") write!(f, "PUS error: {pus_e}")
} }
EcssTmtcErrorWithSend::CantSendAddr(addr) => { EcssTmtcError::CantSendAddr(addr) => {
write!(f, "can not send address {addr}") write!(f, "can not send address {addr}")
} }
EcssTmtcErrorWithSend::Send(send_e) => { EcssTmtcError::Send(send_e) => {
write!(f, "send error {send_e}") write!(f, "send error {send_e}")
} }
EcssTmtcError::Recv(recv_e) => {
write!(f, "recv error {recv_e}")
}
} }
} }
} }
impl From<StoreError> for EcssTmtcErrorWithSend { impl From<StoreError> for EcssTmtcError {
fn from(value: StoreError) -> Self { fn from(value: StoreError) -> Self {
Self::Store(value) Self::Store(value)
} }
} }
impl From<PusError> for EcssTmtcErrorWithSend { impl From<PusError> for EcssTmtcError {
fn from(value: PusError) -> Self { fn from(value: PusError) -> Self {
Self::Pus(value) Self::Pus(value)
} }
} }
impl From<GenericSendError> for EcssTmtcErrorWithSend { impl From<GenericSendError> for EcssTmtcError {
fn from(value: GenericSendError) -> Self { fn from(value: GenericSendError) -> Self {
Self::Send(value) Self::Send(value)
} }
} }
impl From<GenericRecvError> for EcssTmtcError {
fn from(value: GenericRecvError) -> Self {
Self::Recv(value)
}
}
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl Error for EcssTmtcErrorWithSend { impl Error for EcssTmtcError {
fn source(&self) -> Option<&(dyn Error + 'static)> { fn source(&self) -> Option<&(dyn Error + 'static)> {
match self { match self {
EcssTmtcErrorWithSend::Store(e) => Some(e), EcssTmtcError::Store(e) => Some(e),
EcssTmtcErrorWithSend::Pus(e) => Some(e), EcssTmtcError::Pus(e) => Some(e),
EcssTmtcErrorWithSend::Send(e) => Some(e), EcssTmtcError::Send(e) => Some(e),
_ => None, _ => None,
} }
} }
} }
pub trait EcssSender: Send { pub trait EcssChannel: Send {
/// Each sender can have an ID associated with it /// Each sender can have an ID associated with it
fn id(&self) -> SenderId; fn id(&self) -> SenderId;
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
@ -145,16 +184,35 @@ pub trait EcssSender: Send {
/// Generic trait for a user supplied sender object. /// Generic trait for a user supplied sender object.
/// ///
/// This sender object is responsible for sending PUS telemetry to a TM sink. /// This sender object is responsible for sending PUS telemetry to a TM sink.
pub trait EcssTmSenderCore: EcssSender { pub trait EcssTmSenderCore: EcssChannel {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcErrorWithSend>; fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError>;
} }
/// Generic trait for a user supplied sender object. /// Generic trait for a user supplied sender object.
/// ///
/// This sender object is responsible for sending PUS telecommands to a TC recipient. Each /// This sender object is responsible for sending PUS telecommands to a TC recipient. Each
/// telecommand can optionally have a token which contains its verification state. /// telecommand can optionally have a token which contains its verification state.
pub trait EcssTcSenderCore: EcssSender { pub trait EcssTcSenderCore: EcssChannel {
fn send_tc(&self, tc: PusTc, token: Option<TcStateToken>) -> Result<(), EcssTmtcErrorWithSend>; fn send_tc(&self, tc: PusTc, token: Option<TcStateToken>) -> Result<(), EcssTmtcError>;
}
pub struct ReceivedTcWrapper<'raw_tc> {
pub pool_guard: PoolGuard<'raw_tc>,
pub tc: PusTc<'raw_tc>,
pub token: Option<TcStateToken>,
}
/// Generic trait for a user supplied receiver object.
pub trait EcssTcReceiverCore: EcssChannel {
fn recv_tc<'buf>(&self, buf: &'buf mut [u8]) -> Result<ReceivedTcWrapper<'buf>, EcssTmtcError>;
}
/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is
/// implemented by the [crate::tmtc::pus_distrib::PusDistributor] objects to allow passing PUS TC
/// packets into it.
pub trait ReceivesEcssPusTc {
type Error;
fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTc) -> Result<(), Self::Error>;
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
@ -200,50 +258,66 @@ mod alloc_mod {
dyn_clone::clone_trait_object!(EcssTcSender); dyn_clone::clone_trait_object!(EcssTcSender);
impl_downcast!(EcssTcSender); impl_downcast!(EcssTcSender);
}
/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is /// Extension trait for [EcssTcReceiverCore].
/// implemented by the [crate::tmtc::pus_distrib::PusDistributor] objects to allow passing PUS TC ///
/// packets into it. /// It provides additional functionality, for example by implementing the [Downcast] trait
pub trait ReceivesEcssPusTc { /// and the [DynClone] trait.
type Error; ///
fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTc) -> Result<(), Self::Error>; /// [Downcast] is implemented to allow passing the sender as a boxed trait object and still
/// retrieve the concrete type at a later point.
///
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast + DynClone {}
/// Blanket implementation for all types which implement [EcssTcReceiverCore] and are clonable.
impl<T> EcssTcReceiver for T where T: EcssTcReceiverCore + Clone + 'static {}
dyn_clone::clone_trait_object!(EcssTcReceiver);
impl_downcast!(EcssTcReceiver);
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod std_mod { pub mod std_mod {
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; use crate::pool::{SharedPool, StoreAddr};
use crate::pus::verification::{ use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, VerificationToken, StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken,
}; };
use crate::pus::{ use crate::pus::{
EcssSender, EcssTmSender, EcssTmSenderCore, EcssTmtcErrorWithSend, GenericSendError, AcceptedTc, EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender,
PusTmWrapper, EcssTmSenderCore, EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper,
ReceivedTcWrapper, TcAddrWithToken,
}; };
use crate::tmtc::tm_helper::SharedTmStore;
use crate::SenderId; use crate::SenderId;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::vec::Vec; use alloc::vec::Vec;
use spacepackets::ecss::{PusError, SerializablePusPacket}; use spacepackets::ecss::PusError;
use spacepackets::tc::PusTc;
use spacepackets::time::cds::TimeProvider; use spacepackets::time::cds::TimeProvider;
use spacepackets::time::std_mod::StdTimestampError; use spacepackets::time::std_mod::StdTimestampError;
use spacepackets::time::TimeWriter; use spacepackets::time::TimeWriter;
use spacepackets::tm::PusTm;
use spacepackets::{ByteConversionError, SizeMissmatch};
use std::cell::RefCell; use std::cell::RefCell;
use std::format;
use std::string::String; use std::string::String;
use std::sync::mpsc::SendError; use std::sync::mpsc;
use std::sync::{mpsc, RwLockWriteGuard}; use std::sync::mpsc::{SendError, TryRecvError};
use thiserror::Error; use thiserror::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct MpscTmInStoreSender { pub struct MpscTmInStoreSender {
id: SenderId, id: SenderId,
name: &'static str, name: &'static str,
store_helper: SharedPool, shared_tm_store: SharedTmStore,
sender: mpsc::Sender<StoreAddr>, sender: mpsc::Sender<StoreAddr>,
pub ignore_poison_errors: bool, pub ignore_poison_errors: bool,
} }
impl EcssSender for MpscTmInStoreSender { impl EcssChannel for MpscTmInStoreSender {
fn id(&self) -> SenderId { fn id(&self) -> SenderId {
self.id self.id
} }
@ -253,39 +327,22 @@ pub mod std_mod {
} }
} }
impl From<SendError<StoreAddr>> for EcssTmtcErrorWithSend { impl From<SendError<StoreAddr>> for EcssTmtcError {
fn from(_: SendError<StoreAddr>) -> Self { fn from(_: SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected) Self::Send(GenericSendError::RxDisconnected)
} }
} }
impl MpscTmInStoreSender { impl MpscTmInStoreSender {
pub fn send_direct_tm( pub fn send_direct_tm(&self, tm: PusTm) -> Result<(), EcssTmtcError> {
&self, let addr = self.shared_tm_store.add_pus_tm(&tm)?;
tmtc: impl SerializablePusPacket, self.sender
) -> Result<(), EcssTmtcErrorWithSend> { .send(addr)
let operation = |mut store: RwLockWriteGuard<ShareablePoolProvider>| { .map_err(|_| EcssTmtcError::Send(GenericSendError::RxDisconnected))
let (addr, slice) = store.free_element(tmtc.len_packed())?;
tmtc.write_to_bytes(slice)?;
self.sender.send(addr)?;
Ok(())
};
match self.store_helper.write() {
Ok(pool) => operation(pool),
Err(e) => {
if self.ignore_poison_errors {
operation(e.into_inner())
} else {
Err(EcssTmtcErrorWithSend::Send(
GenericSendError::RxDisconnected,
))
}
}
}
} }
} }
impl EcssTmSenderCore for MpscTmInStoreSender { impl EcssTmSenderCore for MpscTmInStoreSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcErrorWithSend> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { match tm {
PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()), PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()),
PusTmWrapper::Direct(tm) => self.send_direct_tm(tm), PusTmWrapper::Direct(tm) => self.send_direct_tm(tm),
@ -297,19 +354,66 @@ pub mod std_mod {
pub fn new( pub fn new(
id: SenderId, id: SenderId,
name: &'static str, name: &'static str,
store_helper: SharedPool, shared_tm_store: SharedTmStore,
sender: mpsc::Sender<StoreAddr>, sender: mpsc::Sender<StoreAddr>,
) -> Self { ) -> Self {
Self { Self {
id, id,
name, name,
store_helper, shared_tm_store,
sender, sender,
ignore_poison_errors: false, ignore_poison_errors: false,
} }
} }
} }
pub struct MpscTcInStoreReceiver {
id: SenderId,
name: &'static str,
shared_tc_pool: SharedPool,
receiver: mpsc::Receiver<TcAddrWithToken>,
pub ignore_poison_errors: bool,
}
impl EcssChannel for MpscTcInStoreReceiver {
fn id(&self) -> SenderId {
self.id
}
fn name(&self) -> &'static str {
self.name
}
}
impl EcssTcReceiverCore for MpscTcInStoreReceiver {
fn recv_tc<'buf>(
&self,
buf: &'buf mut [u8],
) -> Result<ReceivedTcWrapper<'buf>, EcssTmtcError> {
let (addr, token) = self.receiver.try_recv().map_err(|e| match e {
TryRecvError::Empty => GenericRecvError::Empty,
TryRecvError::Disconnected => GenericRecvError::TxDisconnected,
})?;
let shared_tc_pool = self
.shared_tc_pool
.read()
.map_err(|_| EcssTmtcError::StoreLock)?;
let tc_raw = shared_tc_pool.read(&addr)?;
if buf.len() < tc_raw.len() {
return Err(
PusError::ByteConversion(ByteConversionError::ToSliceTooSmall(SizeMissmatch {
found: buf.len(),
expected: tc_raw.len(),
}))
.into(),
);
}
buf[..tc_raw.len()].copy_from_slice(tc_raw);
let (tc, _) = PusTc::from_bytes(buf)?;
Ok((tc, token))
}
}
/// This class can be used if frequent heap allocations during run-time are not an issue. /// This class can be used if frequent heap allocations during run-time are not an issue.
/// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation /// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation
/// of this class can not deal with store addresses, so it is assumed that is is always /// of this class can not deal with store addresses, so it is assumed that is is always
@ -321,7 +425,7 @@ pub mod std_mod {
name: &'static str, name: &'static str,
} }
impl From<SendError<Vec<u8>>> for EcssTmtcErrorWithSend { impl From<SendError<Vec<u8>>> for EcssTmtcError {
fn from(_: SendError<Vec<u8>>) -> Self { fn from(_: SendError<Vec<u8>>) -> Self {
Self::Send(GenericSendError::RxDisconnected) Self::Send(GenericSendError::RxDisconnected)
} }
@ -333,7 +437,7 @@ pub mod std_mod {
} }
} }
impl EcssSender for MpscTmAsVecSender { impl EcssChannel for MpscTmAsVecSender {
fn id(&self) -> SenderId { fn id(&self) -> SenderId {
self.id self.id
} }
@ -343,13 +447,12 @@ pub mod std_mod {
} }
impl EcssTmSenderCore for MpscTmAsVecSender { impl EcssTmSenderCore for MpscTmAsVecSender {
fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcErrorWithSend> { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> {
match tm { match tm {
PusTmWrapper::InStore(addr) => Err(EcssTmtcErrorWithSend::CantSendAddr(addr)), PusTmWrapper::InStore(addr) => Err(EcssTmtcError::CantSendAddr(addr)),
PusTmWrapper::Direct(tm) => { PusTmWrapper::Direct(tm) => {
let mut vec = Vec::new(); let mut vec = Vec::new();
tm.append_to_vec(&mut vec) tm.append_to_vec(&mut vec).map_err(EcssTmtcError::Pus)?;
.map_err(EcssTmtcErrorWithSend::Pus)?;
self.sender.send(vec)?; self.sender.send(vec)?;
Ok(()) Ok(())
} }
@ -359,36 +462,32 @@ pub mod std_mod {
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
pub enum PusPacketHandlingError { pub enum PusPacketHandlingError {
#[error("Generic PUS error: {0}")] #[error("generic PUS error: {0}")]
PusError(#[from] PusError), Pus(#[from] PusError),
#[error("Wrong service number {0} for packet handler")] #[error("wrong service number {0} for packet handler")]
WrongService(u8), WrongService(u8),
#[error("Invalid subservice {0}")] #[error("invalid subservice {0}")]
InvalidSubservice(u8), InvalidSubservice(u8),
#[error("Not enough application data available: {0}")] #[error("not enough application data available: {0}")]
NotEnoughAppData(String), NotEnoughAppData(String),
#[error("Invalid application data")] #[error("invalid application data")]
InvalidAppData(String), InvalidAppData(String),
#[error("Generic store error: {0}")] #[error("generic ECSS tmtc error: {0}")]
StoreError(#[from] StoreError), EcssTmtc(#[from] EcssTmtcError),
#[error("Error with the pool RwGuard: {0}")] #[error("other error {0}")]
RwGuardError(String), Other(String),
#[error("MQ send error: {0}")]
SendError(String),
#[error("TX message queue side has disconnected")]
QueueDisconnected,
#[error("Other error {0}")]
OtherError(String),
} }
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
pub enum PartialPusHandlingError { pub enum PartialPusHandlingError {
#[error("Generic timestamp generation error")] #[error("generic timestamp generation error")]
Time(#[from] StdTimestampError), Time(#[from] StdTimestampError),
#[error("Error sending telemetry: {0}")] #[error("error sending telemetry: {0}")]
TmSend(#[from] EcssTmtcErrorWithSend), TmSend(#[from] EcssTmtcError),
#[error("Error sending verification message")] #[error("error sending verification message")]
Verification, Verification,
#[error("invalid verification token")]
NoVerificationToken,
} }
/// Generic result type for handlers which can process PUS packets. /// Generic result type for handlers which can process PUS packets.
@ -396,8 +495,8 @@ pub mod std_mod {
pub enum PusPacketHandlerResult { pub enum PusPacketHandlerResult {
RequestHandled, RequestHandled,
RequestHandledPartialSuccess(PartialPusHandlingError), RequestHandledPartialSuccess(PartialPusHandlingError),
SubserviceNotImplemented(u8, VerificationToken<TcStateAccepted>), SubserviceNotImplemented(u8, TcStateToken),
CustomSubservice(u8, VerificationToken<TcStateAccepted>), CustomSubservice(u8, TcStateToken),
Empty, Empty,
} }
@ -407,16 +506,11 @@ pub mod std_mod {
} }
} }
/// Generic abstraction for a telecommand being sent around after is has been accepted.
/// The actual telecommand is stored inside a pre-allocated pool structure.
pub type AcceptedTc = (StoreAddr, VerificationToken<TcStateAccepted>);
/// Base class for handlers which can handle PUS TC packets. Right now, the message queue /// Base class for handlers which can handle PUS TC packets. Right now, the message queue
/// backend is constrained to [mpsc::channel]s and the verification reporter /// backend is constrained to [mpsc::channel]s and the verification reporter
/// is constrained to the [StdVerifReporterWithSender]. /// is constrained to the [StdVerifReporterWithSender].
pub struct PusServiceBase { pub struct PusServiceBase {
pub tc_rx: mpsc::Receiver<AcceptedTc>, pub tc_receiver: Box<dyn EcssTcReceiver>,
pub tc_store: SharedPool,
pub tm_sender: Box<dyn EcssTmSender>, pub tm_sender: Box<dyn EcssTmSender>,
pub tm_apid: u16, pub tm_apid: u16,
/// The verification handler is wrapped in a [RefCell] to allow the interior mutability /// The verification handler is wrapped in a [RefCell] to allow the interior mutability
@ -428,15 +522,13 @@ pub mod std_mod {
impl PusServiceBase { impl PusServiceBase {
pub fn new( pub fn new(
receiver: mpsc::Receiver<AcceptedTc>, tc_receiver: Box<dyn EcssTcReceiver>,
tc_pool: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
) -> Self { ) -> Self {
Self { Self {
tc_rx: receiver, tc_receiver,
tc_store: tc_pool,
tm_apid, tm_apid,
tm_sender, tm_sender,
verification_handler: RefCell::new(verification_handler), verification_handler: RefCell::new(verification_handler),
@ -472,8 +564,7 @@ pub mod std_mod {
fn psb(&self) -> &PusServiceBase; fn psb(&self) -> &PusServiceBase;
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
addr: StoreAddr, tc_in_store_with_token: ReceivedTcWrapper,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError>; ) -> Result<PusPacketHandlerResult, PusPacketHandlingError>;
fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> { fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> {
@ -482,7 +573,7 @@ pub mod std_mod {
let mut tc_pool = psb_mut let mut tc_pool = psb_mut
.tc_store .tc_store
.write() .write()
.map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?;
let tc_guard = tc_pool.read_with_guard(addr); let tc_guard = tc_pool.read_with_guard(addr);
let tc_raw = tc_guard.read().unwrap(); let tc_raw = tc_guard.read().unwrap();
psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
@ -490,26 +581,23 @@ pub mod std_mod {
} }
fn handle_next_packet(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { fn handle_next_packet(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
return match self.psb().tc_rx.try_recv() { match self.psb().tc_receiver.recv_tc()? {
Ok((addr, token)) => self.handle_one_tc(addr, token), Ok((addr, token)) => self.handle_one_tc(addr, token),
Err(e) => match e { Err(e) => match e {
mpsc::TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty),
mpsc::TryRecvError::Disconnected => { TryRecvError::Disconnected => Err(PusPacketHandlingError::EcssTmtc(
Err(PusPacketHandlingError::QueueDisconnected) EcssTmtcError::Recv(GenericRecvError::TxDisconnected),
} )),
}, },
}; }
} }
} }
} }
pub(crate) fn source_buffer_large_enough( pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmtcError> {
cap: usize,
len: usize,
) -> Result<(), EcssTmtcErrorWithSend> {
if len > cap { if len > cap {
return Err( return Err(
PusError::ByteConversionError(ByteConversionError::ToSliceTooSmall(SizeMissmatch { PusError::ByteConversion(ByteConversionError::ToSliceTooSmall(SizeMissmatch {
found: cap, found: cap,
expected: len, expected: len,
})) }))

View File

@ -2,8 +2,8 @@ use crate::pool::{SharedPool, StoreAddr};
use crate::pus::scheduler::PusScheduler; use crate::pus::scheduler::PusScheduler;
use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken};
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, AcceptedTc, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError,
PusServiceHandler, PusServiceBase, PusServiceHandler, ReceivedTcWrapper,
}; };
use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::ecss::{scheduling, PusPacket};
use spacepackets::tc::PusTc; use spacepackets::tc::PusTc;
@ -26,15 +26,14 @@ pub struct PusService11SchedHandler {
impl PusService11SchedHandler { impl PusService11SchedHandler {
pub fn new( pub fn new(
receiver: Receiver<AcceptedTc>, tc_receiver: Box<dyn EcssTcReceiver>,
tc_pool: SharedPool,
tm_sender: Box<dyn EcssTmSender>, tm_sender: Box<dyn EcssTmSender>,
tm_apid: u16, tm_apid: u16,
verification_handler: StdVerifReporterWithSender, verification_handler: StdVerifReporterWithSender,
scheduler: PusScheduler, scheduler: PusScheduler,
) -> Self { ) -> Self {
Self { Self {
psb: PusServiceBase::new(receiver, tc_pool, tm_sender, tm_apid, verification_handler), psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler),
scheduler, scheduler,
} }
} }
@ -58,11 +57,13 @@ impl PusServiceHandler for PusService11SchedHandler {
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
addr: StoreAddr, tc_in_store_with_token: ReceivedTcWrapper,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?; let ReceivedTcWrapper {
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); tc,
pool_guard,
token,
} = tc_in_store_with_token;
let std_service = scheduling::Subservice::try_from(tc.subservice()); let std_service = scheduling::Subservice::try_from(tc.subservice());
if std_service.is_err() { if std_service.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(PusPacketHandlerResult::CustomSubservice(

View File

@ -2,7 +2,7 @@ use crate::pool::{SharedPool, StoreAddr};
use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken};
use crate::pus::{ use crate::pus::{
AcceptedTc, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, AcceptedTc, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult,
PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, ReceivedTcWrapper,
}; };
use spacepackets::ecss::PusPacket; use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc; use spacepackets::tc::PusTc;
@ -41,11 +41,13 @@ impl PusServiceHandler for PusService17TestHandler {
fn handle_one_tc( fn handle_one_tc(
&mut self, &mut self,
addr: StoreAddr, tc_in_store_with_token: ReceivedTcWrapper,
token: VerificationToken<TcStateAccepted>,
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
self.copy_tc_to_buf(addr)?; let ReceivedTcWrapper {
let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; tc,
pool_guard,
token,
} = tc_in_store_with_token;
if tc.service() != 17 { if tc.service() != 17 {
return Err(PusPacketHandlingError::WrongService(tc.service())); return Err(PusPacketHandlingError::WrongService(tc.service()));
} }

View File

@ -73,7 +73,7 @@
//! The [integration test](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-core/tests/verification_test.rs) //! The [integration test](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-core/tests/verification_test.rs)
//! for the verification module contains examples how this module could be used in a more complex //! for the verification module contains examples how this module could be used in a more complex
//! context involving multiple threads //! context involving multiple threads
use crate::pus::{source_buffer_large_enough, EcssTmSenderCore, EcssTmtcErrorWithSend}; use crate::pus::{source_buffer_large_enough, EcssTmSenderCore, EcssTmtcError};
use core::fmt::{Debug, Display, Formatter}; use core::fmt::{Debug, Display, Formatter};
use core::hash::{Hash, Hasher}; use core::hash::{Hash, Hasher};
use core::marker::PhantomData; use core::marker::PhantomData;
@ -172,10 +172,10 @@ impl RequestId {
/// If a verification operation fails, the passed token will be returned as well. This allows /// If a verification operation fails, the passed token will be returned as well. This allows
/// re-trying the operation at a later point. /// re-trying the operation at a later point.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct VerificationOrSendErrorWithToken<T>(pub EcssTmtcErrorWithSend, pub VerificationToken<T>); pub struct VerificationOrSendErrorWithToken<T>(pub EcssTmtcError, pub VerificationToken<T>);
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct VerificationErrorWithToken<T>(pub EcssTmtcErrorWithSend, pub VerificationToken<T>); pub struct VerificationErrorWithToken<T>(pub EcssTmtcError, pub VerificationToken<T>);
impl<T> From<VerificationErrorWithToken<T>> for VerificationOrSendErrorWithToken<T> { impl<T> From<VerificationErrorWithToken<T>> for VerificationOrSendErrorWithToken<T> {
fn from(value: VerificationErrorWithToken<T>) -> Self { fn from(value: VerificationErrorWithToken<T>) -> Self {
@ -205,7 +205,7 @@ impl WasAtLeastAccepted for TcStateAccepted {}
impl WasAtLeastAccepted for TcStateStarted {} impl WasAtLeastAccepted for TcStateStarted {}
impl WasAtLeastAccepted for TcStateCompleted {} impl WasAtLeastAccepted for TcStateCompleted {}
#[derive(Debug, Eq, PartialEq)] #[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TcStateToken { pub enum TcStateToken {
None(VerificationToken<TcStateNone>), None(VerificationToken<TcStateNone>),
Accepted(VerificationToken<TcStateAccepted>), Accepted(VerificationToken<TcStateAccepted>),
@ -230,6 +230,19 @@ impl TryFrom<TcStateToken> for VerificationToken<TcStateAccepted> {
} }
} }
} }
impl TryFrom<TcStateToken> for VerificationToken<TcStateStarted> {
type Error = ();
fn try_from(value: TcStateToken) -> Result<Self, Self::Error> {
if let TcStateToken::Started(token) = value {
Ok(token)
} else {
Err(())
}
}
}
impl From<VerificationToken<TcStateAccepted>> for TcStateToken { impl From<VerificationToken<TcStateAccepted>> for TcStateToken {
fn from(t: VerificationToken<TcStateAccepted>) -> Self { fn from(t: VerificationToken<TcStateAccepted>) -> Self {
TcStateToken::Accepted(t) TcStateToken::Accepted(t)
@ -641,8 +654,7 @@ impl VerificationReporterCore {
msg_count: u16, msg_count: u16,
time_stamp: Option<&'src_data [u8]>, time_stamp: Option<&'src_data [u8]>,
step: impl EcssEnumeration, step: impl EcssEnumeration,
) -> Result<VerificationSendable<'src_data, TcStateStarted, VerifSuccess>, EcssTmtcErrorWithSend> ) -> Result<VerificationSendable<'src_data, TcStateStarted, VerifSuccess>, EcssTmtcError> {
{
Ok(VerificationSendable::new_no_token( Ok(VerificationSendable::new_no_token(
self.create_pus_verif_success_tm( self.create_pus_verif_success_tm(
src_data_buf, src_data_buf,
@ -772,7 +784,7 @@ impl VerificationReporterCore {
req_id: &RequestId, req_id: &RequestId,
time_stamp: Option<&'src_data [u8]>, time_stamp: Option<&'src_data [u8]>,
step: Option<&(impl EcssEnumeration + ?Sized)>, step: Option<&(impl EcssEnumeration + ?Sized)>,
) -> Result<PusTm<'src_data>, EcssTmtcErrorWithSend> { ) -> Result<PusTm<'src_data>, EcssTmtcError> {
let mut source_data_len = size_of::<u32>(); let mut source_data_len = size_of::<u32>();
if let Some(step) = step { if let Some(step) = step {
source_data_len += step.size(); source_data_len += step.size();
@ -808,7 +820,7 @@ impl VerificationReporterCore {
req_id: &RequestId, req_id: &RequestId,
step: Option<&(impl EcssEnumeration + ?Sized)>, step: Option<&(impl EcssEnumeration + ?Sized)>,
params: &FailParams<'src_data, '_>, params: &FailParams<'src_data, '_>,
) -> Result<PusTm<'src_data>, EcssTmtcErrorWithSend> { ) -> Result<PusTm<'src_data>, EcssTmtcError> {
let mut idx = 0; let mut idx = 0;
let mut source_data_len = RequestId::SIZE_AS_BYTES + params.failure_code.size(); let mut source_data_len = RequestId::SIZE_AS_BYTES + params.failure_code.size();
if let Some(step) = step { if let Some(step) = step {
@ -829,7 +841,7 @@ impl VerificationReporterCore {
params params
.failure_code .failure_code
.write_to_be_bytes(&mut src_data_buf[idx..idx + params.failure_code.size()]) .write_to_be_bytes(&mut src_data_buf[idx..idx + params.failure_code.size()])
.map_err(PusError::ByteConversionError)?; .map_err(PusError::ByteConversion)?;
idx += params.failure_code.size(); idx += params.failure_code.size();
if let Some(failure_data) = params.failure_data { if let Some(failure_data) = params.failure_data {
src_data_buf[idx..idx + failure_data.len()].copy_from_slice(failure_data); src_data_buf[idx..idx + failure_data.len()].copy_from_slice(failure_data);
@ -1063,7 +1075,7 @@ mod alloc_mod {
sender: &mut (impl EcssTmSenderCore + ?Sized), sender: &mut (impl EcssTmSenderCore + ?Sized),
time_stamp: Option<&[u8]>, time_stamp: Option<&[u8]>,
step: impl EcssEnumeration, step: impl EcssEnumeration,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
let seq_count = self let seq_count = self
.seq_count_provider .seq_count_provider
.as_ref() .as_ref()
@ -1250,7 +1262,7 @@ mod alloc_mod {
token: &VerificationToken<TcStateStarted>, token: &VerificationToken<TcStateStarted>,
time_stamp: Option<&[u8]>, time_stamp: Option<&[u8]>,
step: impl EcssEnumeration, step: impl EcssEnumeration,
) -> Result<(), EcssTmtcErrorWithSend> { ) -> Result<(), EcssTmtcError> {
self.reporter self.reporter
.step_success(token, self.sender.as_mut(), time_stamp, step) .step_success(token, self.sender.as_mut(), time_stamp, step)
} }
@ -1482,11 +1494,11 @@ mod tests {
use crate::pool::{LocalPool, PoolCfg, SharedPool}; use crate::pool::{LocalPool, PoolCfg, SharedPool};
use crate::pus::tests::CommonTmInfo; use crate::pus::tests::CommonTmInfo;
use crate::pus::verification::{ use crate::pus::verification::{
EcssTmSenderCore, EcssTmtcErrorWithSend, FailParams, FailParamsWithStep, RequestId, EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone,
TcStateNone, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender,
VerificationToken, VerificationToken,
}; };
use crate::pus::{EcssSender, EcssTmtcErrorWithSend, MpscTmInStoreSender, PusTmWrapper}; use crate::pus::{EcssChannel, EcssTmtcError, MpscTmInStoreSender, PusTmWrapper};
use crate::SenderId; use crate::SenderId;
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::format; use alloc::format;
@ -1521,7 +1533,7 @@ mod tests {
pub service_queue: RefCell<VecDeque<TmInfo>>, pub service_queue: RefCell<VecDeque<TmInfo>>,
} }
impl EcssSender for TestSender { impl EcssChannel for TestSender {
fn id(&self) -> SenderId { fn id(&self) -> SenderId {
0 0
} }
@ -1569,7 +1581,7 @@ mod tests {
#[derive(Default, Clone)] #[derive(Default, Clone)]
struct FallibleSender {} struct FallibleSender {}
impl EcssSender for FallibleSender { impl EcssChannel for FallibleSender {
fn id(&self) -> SenderId { fn id(&self) -> SenderId {
0 0
} }
@ -1705,7 +1717,7 @@ mod tests {
let err = res.unwrap_err(); let err = res.unwrap_err();
assert_eq!(err.1, tok); assert_eq!(err.1, tok);
match err.0 { match err.0 {
EcssTmtcErrorWithSend::SendError(e) => { EcssTmtcError::SendError(e) => {
assert_eq!(e, DummyError {}) assert_eq!(e, DummyError {})
} }
_ => panic!("{}", format!("Unexpected error {:?}", err.0)), _ => panic!("{}", format!("Unexpected error {:?}", err.0)),
@ -1776,8 +1788,7 @@ mod tests {
let err_with_token = res.unwrap_err(); let err_with_token = res.unwrap_err();
assert_eq!(err_with_token.1, tok); assert_eq!(err_with_token.1, tok);
match err_with_token.0 { match err_with_token.0 {
EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcErrorWithSend::ByteConversion(e)) => { EcssTmtcError::EcssTmtcError(EcssTmtcError::ByteConversion(e)) => match e {
match e {
ByteConversionError::ToSliceTooSmall(missmatch) => { ByteConversionError::ToSliceTooSmall(missmatch) => {
assert_eq!( assert_eq!(
missmatch.expected, missmatch.expected,
@ -1788,8 +1799,7 @@ mod tests {
_ => { _ => {
panic!("{}", format!("Unexpected error {:?}", e)) panic!("{}", format!("Unexpected error {:?}", e))
} }
} },
}
_ => { _ => {
panic!("{}", format!("Unexpected error {:?}", err_with_token.0)) panic!("{}", format!("Unexpected error {:?}", err_with_token.0))
} }

View File

@ -133,7 +133,7 @@ impl<E: 'static> ReceivesTcCore for PusDistributor<E> {
fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> { fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> {
// Convert to ccsds and call pass_ccsds // Convert to ccsds and call pass_ccsds
let (sp_header, _) = SpHeader::from_be_bytes(tm_raw) let (sp_header, _) = SpHeader::from_be_bytes(tm_raw)
.map_err(|e| PusDistribError::PusError(PusError::ByteConversionError(e)))?; .map_err(|e| PusDistribError::PusError(PusError::ByteConversion(e)))?;
self.pass_ccsds(&sp_header, tm_raw) self.pass_ccsds(&sp_header, tm_raw)
} }
} }

View File

@ -8,9 +8,11 @@ pub use std_mod::*;
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod std_mod { pub mod std_mod {
use crate::pool::{SharedPool, StoreAddr}; use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr};
use crate::pus::EcssTmtcError;
use spacepackets::ecss::SerializablePusPacket; use spacepackets::ecss::SerializablePusPacket;
use spacepackets::tm::PusTm; use spacepackets::tm::PusTm;
use std::sync::{Arc, RwLock};
#[derive(Clone)] #[derive(Clone)]
pub struct SharedTmStore { pub struct SharedTmStore {
@ -18,21 +20,23 @@ pub mod std_mod {
} }
impl SharedTmStore { impl SharedTmStore {
pub fn new(backing_pool: SharedPool) -> Self { pub fn new(backing_pool: ShareablePoolProvider) -> Self {
Self { pool: backing_pool } Self {
pool: Arc::new(RwLock::new(backing_pool)),
}
} }
pub fn backing_pool(&self) -> SharedPool { pub fn backing_pool(&self) -> SharedPool {
self.pool.clone() self.pool.clone()
} }
pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { pub fn add_pus_tm(&self, pus_tm: &PusTm) -> Result<StoreAddr, EcssTmtcError> {
let mut pg = self.pool.write().expect("error locking TM store"); let mut pg = self.pool.write().map_err(|_| EcssTmtcError::StoreLock)?;
let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); let (addr, buf) = pg.free_element(pus_tm.len_packed())?;
pus_tm pus_tm
.write_to_bytes(buf) .write_to_bytes(buf)
.expect("writing PUS TM to store failed"); .expect("writing PUS TM to store failed");
addr Ok(addr)
} }
} }
} }

View File

@ -64,8 +64,8 @@ fn main() {
(15, 1024), (15, 1024),
(15, 2048), (15, 2048),
])); ]));
let tm_store = SharedTmStore::new(Arc::new(RwLock::new(Box::new(tm_pool)))); let shared_tm_store = SharedTmStore::new(Box::new(tm_pool));
let tm_store_event = tm_store.clone(); let tm_store_event = shared_tm_store.clone();
let tc_pool = LocalPool::new(PoolCfg::new(vec![ let tc_pool = LocalPool::new(PoolCfg::new(vec![
(30, 32), (30, 32),
(15, 64), (15, 64),
@ -87,7 +87,7 @@ fn main() {
let verif_sender = MpscTmInStoreSender::new( let verif_sender = MpscTmInStoreSender::new(
TmSenderId::PusVerification as SenderId, TmSenderId::PusVerification as SenderId,
"verif_sender", "verif_sender",
tm_store.backing_pool(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
@ -135,13 +135,13 @@ fn main() {
tc_receiver: tc_source_rx, tc_receiver: tc_source_rx,
}; };
let tm_args = TmArgs { let tm_args = TmArgs {
tm_store: tm_store.clone(), tm_store: shared_tm_store.clone(),
tm_sink_sender: tm_funnel_tx.clone(), tm_sink_sender: tm_funnel_tx.clone(),
tm_server_rx, tm_server_rx,
}; };
let aocs_tm_funnel = tm_funnel_tx.clone(); let aocs_tm_funnel = tm_funnel_tx.clone();
let mut aocs_tm_store = tm_store.clone(); let aocs_tm_store = shared_tm_store.clone();
let (pus_test_tx, pus_test_rx) = channel(); let (pus_test_tx, pus_test_rx) = channel();
let (pus_event_tx, pus_event_rx) = channel(); let (pus_event_tx, pus_event_rx) = channel();
@ -158,7 +158,7 @@ fn main() {
let test_srv_tm_sender = MpscTmInStoreSender::new( let test_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusTest as SenderId, TmSenderId::PusTest as SenderId,
"PUS_17_TM_SENDER", "PUS_17_TM_SENDER",
tm_store.backing_pool().clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus17_handler = PusService17TestHandler::new( let pus17_handler = PusService17TestHandler::new(
@ -176,7 +176,7 @@ fn main() {
let sched_srv_tm_sender = MpscTmInStoreSender::new( let sched_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusSched as SenderId, TmSenderId::PusSched as SenderId,
"PUS_11_TM_SENDER", "PUS_11_TM_SENDER",
tm_store.backing_pool().clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
@ -197,7 +197,7 @@ fn main() {
let event_srv_tm_sender = MpscTmInStoreSender::new( let event_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusEvent as SenderId, TmSenderId::PusEvent as SenderId,
"PUS_5_TM_SENDER", "PUS_5_TM_SENDER",
tm_store.backing_pool().clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus_5_handler = PusService5EventHandler::new( let pus_5_handler = PusService5EventHandler::new(
@ -213,7 +213,7 @@ fn main() {
let action_srv_tm_sender = MpscTmInStoreSender::new( let action_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusAction as SenderId, TmSenderId::PusAction as SenderId,
"PUS_8_TM_SENDER", "PUS_8_TM_SENDER",
tm_store.backing_pool().clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus_8_handler = PusService8ActionHandler::new( let pus_8_handler = PusService8ActionHandler::new(
@ -229,7 +229,7 @@ fn main() {
let hk_srv_tm_sender = MpscTmInStoreSender::new( let hk_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusHk as SenderId, TmSenderId::PusHk as SenderId,
"PUS_3_TM_SENDER", "PUS_3_TM_SENDER",
tm_store.backing_pool().clone(), shared_tm_store.clone(),
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
); );
let pus_3_handler = PusService3HkHandler::new( let pus_3_handler = PusService3HkHandler::new(
@ -262,7 +262,7 @@ fn main() {
if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update // Read the TM, set sequence counter and message counter, and finally update
// the CRC. // the CRC.
let shared_pool = tm_store.backing_pool(); let shared_pool = shared_tm_store.backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard let tm_raw = pool_guard
.modify(&addr) .modify(&addr)
@ -297,12 +297,8 @@ fn main() {
.name("Event".to_string()) .name("Event".to_string())
.spawn(move || { .spawn(move || {
let mut timestamp: [u8; 7] = [0; 7]; let mut timestamp: [u8; 7] = [0; 7];
let mut sender = MpscTmInStoreSender::new( let mut sender =
1, MpscTmInStoreSender::new(1, "event_sender", tm_store_event.clone(), tm_funnel_tx);
"event_sender",
tm_store_event.backing_pool(),
tm_funnel_tx,
);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
reporter_event_handler reporter_event_handler
@ -393,7 +389,9 @@ fn main() {
Some(&buf), Some(&buf),
true, true,
); );
let addr = aocs_tm_store.add_pus_tm(&pus_tm); let addr = aocs_tm_store
.add_pus_tm(&pus_tm)
.expect("Adding PUS TM failed");
aocs_tm_funnel.send(addr).expect("Sending HK TM failed"); aocs_tm_funnel.send(addr).expect("Sending HK TM failed");
} }
} }

View File

@ -86,7 +86,7 @@ impl PusService8ActionHandler {
), ),
) )
.expect("Sending start failure failed"); .expect("Sending start failure failed");
return Err(PusPacketHandlingError::OtherError(format!( return Err(PusPacketHandlingError::Other(format!(
"Unknown target ID {target_id}" "Unknown target ID {target_id}"
))); )));
} }