diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 895be4f..22e9d5b 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -61,10 +61,10 @@ default-features = false optional = true [dependencies.spacepackets] -version = "0.6" +# version = "0.6" # path = "../spacepackets" -# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" -# rev = "4485ed26699d32" +git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" +rev = "784564a20ed" default-features = false [dev-dependencies] diff --git a/satrs-core/src/event_man.rs b/satrs-core/src/event_man.rs index c556e99..5c56b85 100644 --- a/satrs-core/src/event_man.rs +++ b/satrs-core/src/event_man.rs @@ -21,7 +21,7 @@ doc = ::embed_doc_image::embed_image!("event_man_arch", "images/event_man_arch.p //! ![Event flow][event_man_arch] //! //! The event manager has a listener table abstracted by the [ListenerTable], which maps -//! listener groups identified by [ListenerKey]s to a [sender ID][SenderId]. +//! listener groups identified by [ListenerKey]s to a [sender ID][ChannelId]. //! It also contains a sender table abstracted by the [SenderTable] which maps these sender IDs //! to a concrete [SendEventProvider]s. A simple approach would be to use one send event provider //! for each OBSW thread and then subscribe for all interesting events for a particular thread @@ -66,7 +66,7 @@ use core::slice::Iter; #[cfg(feature = "alloc")] use hashbrown::HashMap; -use crate::SenderId; +use crate::ChannelId; #[cfg(feature = "std")] pub use stdmod::*; @@ -88,15 +88,11 @@ pub type EventU16WithAuxData = EventWithAuxData; pub trait SendEventProvider { type Error; - fn id(&self) -> SenderId; - fn send_no_data(&mut self, event: Provider) -> Result<(), Self::Error> { + fn id(&self) -> ChannelId; + fn send_no_data(&self, event: Provider) -> Result<(), Self::Error> { self.send(event, None) } - fn send( - &mut self, - event: Provider, - aux_data: Option, - ) -> Result<(), Self::Error>; + fn send(&self, event: Provider, aux_data: Option) -> Result<(), Self::Error>; } /// Generic abstraction for an event receiver. @@ -107,22 +103,22 @@ pub trait EventReceiver { /// To allow returning arbitrary additional auxiliary data, a mutable slice is passed to the /// [Self::receive] call as well. Receivers can write data to this slice, but care must be taken /// to avoid panics due to size missmatches or out of bound writes. - fn receive(&mut self) -> Option<(Event, Option)>; + fn receive(&self) -> Option<(Event, Option)>; } pub trait ListenerTable { fn get_listeners(&self) -> Vec; fn contains_listener(&self, key: &ListenerKey) -> bool; - fn get_listener_ids(&self, key: &ListenerKey) -> Option>; - fn add_listener(&mut self, key: ListenerKey, sender_id: SenderId) -> bool; + fn get_listener_ids(&self, key: &ListenerKey) -> Option>; + fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool; fn remove_duplicates(&mut self, key: &ListenerKey); } pub trait SenderTable { - fn contains_send_event_provider(&self, id: &SenderId) -> bool; + fn contains_send_event_provider(&self, id: &ChannelId) -> bool; fn get_send_event_provider( &mut self, - id: &SenderId, + id: &ChannelId, ) -> Option<&mut Box>>; fn add_send_event_provider( &mut self, @@ -175,7 +171,7 @@ pub enum EventRoutingResult { pub enum EventRoutingError { SendError(E), NoSendersForKey(ListenerKey), - NoSenderForId(SenderId), + NoSenderForId(ChannelId), } #[derive(Debug)] @@ -190,12 +186,12 @@ impl EventManager { } /// Subscribe for a unique event. - pub fn subscribe_single(&mut self, event: &Event, sender_id: SenderId) { + pub fn subscribe_single(&mut self, event: &Event, sender_id: ChannelId) { self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id); } /// Subscribe for an event group. - pub fn subscribe_group(&mut self, group_id: LargestGroupIdRaw, sender_id: SenderId) { + pub fn subscribe_group(&mut self, group_id: LargestGroupIdRaw, sender_id: ChannelId) { self.update_listeners(ListenerKey::Group(group_id), sender_id); } @@ -203,7 +199,7 @@ impl EventManager { /// /// For example, this can be useful for a handler component which sends every event as /// a telemetry packet. - pub fn subscribe_all(&mut self, sender_id: SenderId) { + pub fn subscribe_all(&mut self, sender_id: ChannelId) { self.update_listeners(ListenerKey::All, sender_id); } } @@ -249,7 +245,7 @@ impl } } - fn update_listeners(&mut self, key: ListenerKey, sender_id: SenderId) { + fn update_listeners(&mut self, key: ListenerKey, sender_id: ChannelId) { self.listener_table.add_listener(key, sender_id); } @@ -315,7 +311,7 @@ impl #[derive(Default)] pub struct DefaultListenerTableProvider { - listeners: HashMap>, + listeners: HashMap>, } pub struct DefaultSenderTableProvider< @@ -324,7 +320,7 @@ pub struct DefaultSenderTableProvider< AuxDataProvider = Params, > { senders: HashMap< - SenderId, + ChannelId, Box>, >, } @@ -352,11 +348,11 @@ impl ListenerTable for DefaultListenerTableProvider { self.listeners.contains_key(key) } - fn get_listener_ids(&self, key: &ListenerKey) -> Option> { + fn get_listener_ids(&self, key: &ListenerKey) -> Option> { self.listeners.get(key).map(|vec| vec.iter()) } - fn add_listener(&mut self, key: ListenerKey, sender_id: SenderId) -> bool { + fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool { if let Some(existing_list) = self.listeners.get_mut(&key) { existing_list.push(sender_id); } else { @@ -378,13 +374,13 @@ impl SenderTable for DefaultSenderTableProvider { - fn contains_send_event_provider(&self, id: &SenderId) -> bool { + fn contains_send_event_provider(&self, id: &ChannelId) -> bool { self.senders.contains_key(id) } fn get_send_event_provider( &mut self, - id: &SenderId, + id: &ChannelId, ) -> Option<&mut Box>> { self.senders.get_mut(id).filter(|sender| sender.id() == *id) @@ -424,7 +420,7 @@ pub mod stdmod { } } impl EventReceiver for MpscEventReceiver { - fn receive(&mut self) -> Option> { + fn receive(&self) -> Option> { if let Ok(event_and_data) = self.mpsc_receiver.try_recv() { return Some(event_and_data); } @@ -453,7 +449,7 @@ pub mod stdmod { fn id(&self) -> u32 { self.id } - fn send(&mut self, event: Event, aux_data: Option) -> Result<(), Self::Error> { + fn send(&self, event: Event, aux_data: Option) -> Result<(), Self::Error> { self.sender.send((event, aux_data)) } } @@ -490,7 +486,7 @@ mod tests { fn id(&self) -> u32 { self.id } - fn send(&mut self, event: EventU32, aux_data: Option) -> Result<(), Self::Error> { + fn send(&self, event: EventU32, aux_data: Option) -> Result<(), Self::Error> { self.mpsc_sender.send((event, aux_data)) } } diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 4fed5c6..940300e 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -45,5 +45,5 @@ pub mod tmtc; pub use spacepackets; -// Generic sender ID type. -pub type SenderId = u32; +// Generic channel ID type. +pub type ChannelId = u32; diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index 57f378a..c1dfc5d 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -1,12 +1,12 @@ -use crate::pus::{source_buffer_large_enough, EcssTmtcError, EcssTmtcErrorWithSend}; -use spacepackets::ecss::EcssEnumeration; +use crate::pus::{source_buffer_large_enough, EcssTmtcError}; +use spacepackets::ecss::{EcssEnumeration, PusError}; use spacepackets::tm::PusTm; use spacepackets::tm::PusTmSecondaryHeader; use spacepackets::{SpHeader, MAX_APID}; use crate::pus::EcssTmSenderCore; #[cfg(feature = "alloc")] -pub use allocvec::EventReporter; +pub use alloc_mod::EventReporter; pub use spacepackets::ecss::event::*; pub struct EventReporterBase { @@ -27,14 +27,14 @@ impl EventReporterBase { }) } - pub fn event_info( + pub fn event_info( &mut self, buf: &mut [u8], - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.generate_and_send_generic_tm( buf, Subservice::TmInfoReport, @@ -45,14 +45,14 @@ impl EventReporterBase { ) } - pub fn event_low_severity( + pub fn event_low_severity( &mut self, buf: &mut [u8], - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.generate_and_send_generic_tm( buf, Subservice::TmLowSeverityReport, @@ -63,14 +63,14 @@ impl EventReporterBase { ) } - pub fn event_medium_severity( + pub fn event_medium_severity( &mut self, buf: &mut [u8], - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.generate_and_send_generic_tm( buf, Subservice::TmMediumSeverityReport, @@ -81,14 +81,14 @@ impl EventReporterBase { ) } - pub fn event_high_severity( + pub fn event_high_severity( &mut self, buf: &mut [u8], - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.generate_and_send_generic_tm( buf, Subservice::TmHighSeverityReport, @@ -99,19 +99,17 @@ impl EventReporterBase { ) } - fn generate_and_send_generic_tm( + fn generate_and_send_generic_tm( &mut self, buf: &mut [u8], subservice: Subservice, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { let tm = self.generate_generic_event_tm(buf, subservice, time_stamp, event_id, aux_data)?; - sender - .send_tm(tm.into()) - .map_err(|e| EcssTmtcErrorWithSend::SendError(e))?; + sender.send_tm(tm.into())?; self.msg_count += 1; Ok(()) } @@ -138,7 +136,9 @@ impl EventReporterBase { Some(time_stamp), ); let mut current_idx = 0; - event_id.write_to_be_bytes(&mut buf[0..event_id.size()])?; + event_id + .write_to_be_bytes(&mut buf[0..event_id.size()]) + .map_err(PusError::ByteConversion)?; current_idx += event_id.size(); if let Some(aux_data) = aux_data { buf[current_idx..current_idx + aux_data.len()].copy_from_slice(aux_data); @@ -154,7 +154,7 @@ impl EventReporterBase { } #[cfg(feature = "alloc")] -mod allocvec { +mod alloc_mod { use super::*; use alloc::vec; use alloc::vec::Vec; @@ -172,13 +172,13 @@ mod allocvec { reporter, }) } - pub fn event_info( + pub fn event_info( &mut self, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.reporter.event_info( self.source_data_buf.as_mut_slice(), sender, @@ -188,13 +188,13 @@ mod allocvec { ) } - pub fn event_low_severity( + pub fn event_low_severity( &mut self, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.reporter.event_low_severity( self.source_data_buf.as_mut_slice(), sender, @@ -204,13 +204,13 @@ mod allocvec { ) } - pub fn event_medium_severity( + pub fn event_medium_severity( &mut self, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.reporter.event_medium_severity( self.source_data_buf.as_mut_slice(), sender, @@ -220,13 +220,13 @@ mod allocvec { ) } - pub fn event_high_severity( + pub fn event_high_severity( &mut self, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event_id: impl EcssEnumeration, aux_data: Option<&[u8]>, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.reporter.event_high_severity( self.source_data_buf.as_mut_slice(), sender, @@ -243,8 +243,8 @@ mod tests { use super::*; use crate::events::{EventU32, Severity}; use crate::pus::tests::CommonTmInfo; - use crate::pus::{EcssSender, PusTmWrapper}; - use crate::SenderId; + use crate::pus::{EcssChannel, PusTmWrapper}; + use crate::ChannelId; use spacepackets::ByteConversionError; use std::cell::RefCell; use std::collections::VecDeque; @@ -268,16 +268,14 @@ mod tests { pub service_queue: RefCell>, } - impl EcssSender for TestSender { - fn id(&self) -> SenderId { + impl EcssChannel for TestSender { + fn id(&self) -> ChannelId { 0 } } impl EcssTmSenderCore for TestSender { - type Error = (); - - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { match tm { PusTmWrapper::InStore(_) => { panic!("TestSender: unexpected call with address"); @@ -427,9 +425,9 @@ mod tests { let err = reporter.event_info(sender, &time_stamp_empty, event, None); assert!(err.is_err()); let err = err.unwrap_err(); - if let EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversion( - ByteConversionError::ToSliceTooSmall(missmatch), - )) = err + if let EcssTmtcError::Pus(PusError::ByteConversion(ByteConversionError::ToSliceTooSmall( + missmatch, + ))) = err { assert_eq!(missmatch.expected, 4); assert_eq!(missmatch.found, expected_found_len); diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index 0c701e6..132e24c 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -13,7 +13,7 @@ pub use crate::pus::event::EventReporter; use crate::pus::verification::TcStateToken; #[cfg(feature = "alloc")] use crate::pus::EcssTmSenderCore; -use crate::pus::EcssTmtcErrorWithSend; +use crate::pus::EcssTmtcError; #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub use alloc_mod::*; @@ -95,13 +95,13 @@ pub struct EventRequestWithToken { } #[derive(Debug)] -pub enum EventManError { - EcssTmtcError(EcssTmtcErrorWithSend), +pub enum EventManError { + EcssTmtcError(EcssTmtcError), SeverityMissmatch(Severity, Severity), } -impl From> for EventManError { - fn from(v: EcssTmtcErrorWithSend) -> Self { +impl From for EventManError { + fn from(v: EcssTmtcError) -> Self { Self::EcssTmtcError(v) } } @@ -173,13 +173,13 @@ pub mod alloc_mod { self.backend.disable_event_reporting(event) } - pub fn generate_pus_event_tm_generic( + pub fn generate_pus_event_tm_generic( &mut self, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event: Event, aux_data: Option<&[u8]>, - ) -> Result> { + ) -> Result { if !self.backend.event_enabled(&event) { return Ok(false); } @@ -223,13 +223,13 @@ pub mod alloc_mod { self.backend.disable_event_reporting(event.as_ref()) } - pub fn generate_pus_event_tm( + pub fn generate_pus_event_tm( &mut self, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], event: EventU32TypedSev, aux_data: Option<&[u8]>, - ) -> Result> { + ) -> Result { self.generate_pus_event_tm_generic(sender, time_stamp, event.into(), aux_data) } } diff --git a/satrs-core/src/pus/event_srv.rs b/satrs-core/src/pus/event_srv.rs index 0ff81ee..8f3abb4 100644 --- a/satrs-core/src/pus/event_srv.rs +++ b/satrs-core/src/pus/event_srv.rs @@ -5,14 +5,14 @@ use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, }; use crate::pus::{ - AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, - PusServiceBase, PusServiceHandler, + EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, + PusPacketHandlingError, PusServiceBase, PusServiceHandler, }; -use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::event::Subservice; use spacepackets::ecss::PusPacket; use spacepackets::tc::PusTc; -use std::sync::mpsc::{Receiver, Sender}; +use std::boxed::Box; +use std::sync::mpsc::Sender; pub struct PusService5EventHandler { psb: PusServiceBase, @@ -21,20 +21,18 @@ pub struct PusService5EventHandler { impl PusService5EventHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, + tc_receiver: Box, + shared_tc_store: SharedPool, + tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, event_request_tx: Sender, ) -> Self { Self { psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, + tc_receiver, + shared_tc_store, + tm_sender, tm_apid, verification_handler, ), @@ -57,7 +55,7 @@ impl PusServiceHandler for PusService5EventHandler { token: VerificationToken, ) -> Result { self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; let subservice = tc.subservice(); let srv = Subservice::try_from(subservice); if srv.is_err() { @@ -99,7 +97,7 @@ impl PusServiceHandler for PusService5EventHandler { self.event_request_tx .send(event_req_with_token) .map_err(|_| { - PusPacketHandlingError::SendError("Forwarding event request failed".into()) + PusPacketHandlingError::Other("Forwarding event request failed".into()) })?; if let Some(partial_error) = partial_error { return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess( diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index cc5f9ce..7325b43 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -2,17 +2,17 @@ //! //! This module contains structures to make working with the PUS C standard easier. //! The satrs-example application contains various usage examples of these components. -use crate::pus::verification::TcStateToken; -use crate::SenderId; +use crate::ChannelId; +use core::fmt::{Display, Formatter}; #[cfg(feature = "alloc")] use downcast_rs::{impl_downcast, Downcast}; #[cfg(feature = "alloc")] use dyn_clone::DynClone; use spacepackets::ecss::PusError; use spacepackets::tc::PusTc; -use spacepackets::time::TimestampError; use spacepackets::tm::PusTm; -use spacepackets::{ByteConversionError, SizeMissmatch}; +use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader}; +use std::error::Error; pub mod event; pub mod event_man; @@ -28,7 +28,8 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; -use crate::pool::StoreAddr; +use crate::pool::{StoreAddr, StoreError}; +use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken}; #[cfg(feature = "std")] pub use std_mod::*; @@ -50,66 +51,191 @@ impl<'tm> From> for PusTmWrapper<'tm> { } } -#[derive(Debug, Clone)] -pub enum EcssTmtcErrorWithSend { - /// Errors related to sending the telemetry to a TMTC recipient - SendError(E), - EcssTmtcError(EcssTmtcError), +pub type TcAddrWithToken = (StoreAddr, TcStateToken); + +/// Generic abstraction for a telecommand being sent around after is has been accepted. +/// The actual telecommand is stored inside a pre-allocated pool structure. +pub type AcceptedTc = (StoreAddr, VerificationToken); + +/// Generic error type for sending something via a message queue. +#[derive(Debug, Copy, Clone)] +pub enum GenericSendError { + RxDisconnected, + QueueFull(u32), } -impl From for EcssTmtcErrorWithSend { - fn from(value: EcssTmtcError) -> Self { - Self::EcssTmtcError(value) +impl Display for GenericSendError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + GenericSendError::RxDisconnected => { + write!(f, "rx side has disconnected") + } + GenericSendError::QueueFull(max_cap) => { + write!(f, "queue with max capacity of {max_cap} is full") + } + } } } -/// Generic error type for PUS TM handling. +#[cfg(feature = "std")] +impl Error for GenericSendError {} + +/// Generic error type for sending something via a message queue. +#[derive(Debug, Copy, Clone)] +pub enum GenericRecvError { + Empty, + TxDisconnected, +} + +impl Display for GenericRecvError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + Self::TxDisconnected => { + write!(f, "tx side has disconnected") + } + Self::Empty => { + write!(f, "nothing to receive") + } + } + } +} + +#[cfg(feature = "std")] +impl Error for GenericRecvError {} + #[derive(Debug, Clone)] pub enum EcssTmtcError { - /// Errors related to the time stamp format of the telemetry - Timestamp(TimestampError), - /// Errors related to byte conversion, for example insufficient buffer size for given data - ByteConversion(ByteConversionError), - /// Errors related to PUS packet format + StoreLock, + Store(StoreError), Pus(PusError), + CantSendAddr(StoreAddr), + Send(GenericSendError), + Recv(GenericRecvError), +} + +impl Display for EcssTmtcError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + EcssTmtcError::StoreLock => { + write!(f, "store lock error") + } + EcssTmtcError::Store(store) => { + write!(f, "store error: {store}") + } + EcssTmtcError::Pus(pus_e) => { + write!(f, "PUS error: {pus_e}") + } + EcssTmtcError::CantSendAddr(addr) => { + write!(f, "can not send address {addr}") + } + EcssTmtcError::Send(send_e) => { + write!(f, "send error {send_e}") + } + EcssTmtcError::Recv(recv_e) => { + write!(f, "recv error {recv_e}") + } + } + } +} + +impl From for EcssTmtcError { + fn from(value: StoreError) -> Self { + Self::Store(value) + } } impl From for EcssTmtcError { - fn from(e: PusError) -> Self { - EcssTmtcError::Pus(e) + fn from(value: PusError) -> Self { + Self::Pus(value) } } -impl From for EcssTmtcError { - fn from(e: ByteConversionError) -> Self { - EcssTmtcError::ByteConversion(e) +impl From for EcssTmtcError { + fn from(value: GenericSendError) -> Self { + Self::Send(value) } } -pub trait EcssSender: Send { +impl From for EcssTmtcError { + fn from(value: GenericRecvError) -> Self { + Self::Recv(value) + } +} + +#[cfg(feature = "std")] +impl Error for EcssTmtcError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + EcssTmtcError::Store(e) => Some(e), + EcssTmtcError::Pus(e) => Some(e), + EcssTmtcError::Send(e) => Some(e), + _ => None, + } + } +} +pub trait EcssChannel: Send { /// Each sender can have an ID associated with it - fn id(&self) -> SenderId; + fn id(&self) -> ChannelId; fn name(&self) -> &'static str { "unset" } } + /// Generic trait for a user supplied sender object. /// /// This sender object is responsible for sending PUS telemetry to a TM sink. -pub trait EcssTmSenderCore: EcssSender { - type Error; - - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error>; +pub trait EcssTmSenderCore: EcssChannel { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError>; } /// Generic trait for a user supplied sender object. /// /// This sender object is responsible for sending PUS telecommands to a TC recipient. Each /// telecommand can optionally have a token which contains its verification state. -pub trait EcssTcSenderCore: EcssSender { - type Error; +pub trait EcssTcSenderCore: EcssChannel { + fn send_tc(&self, tc: PusTc, token: Option) -> Result<(), EcssTmtcError>; +} - fn send_tc(&self, tc: PusTc, token: Option) -> Result<(), Self::Error>; +pub struct ReceivedTcWrapper { + pub store_addr: StoreAddr, + pub token: Option, +} + +#[derive(Debug, Clone)] +pub enum TryRecvTmtcError { + Error(EcssTmtcError), + Empty, +} + +impl From for TryRecvTmtcError { + fn from(value: EcssTmtcError) -> Self { + Self::Error(value) + } +} + +impl From for TryRecvTmtcError { + fn from(value: PusError) -> Self { + Self::Error(value.into()) + } +} + +impl From for TryRecvTmtcError { + fn from(value: StoreError) -> Self { + Self::Error(value.into()) + } +} + +/// Generic trait for a user supplied receiver object. +pub trait EcssTcReceiverCore: EcssChannel { + fn recv_tc(&self) -> Result; +} + +/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is +/// implemented by the [crate::tmtc::pus_distrib::PusDistributor] objects to allow passing PUS TC +/// packets into it. +pub trait ReceivesEcssPusTc { + type Error; + fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTc) -> Result<(), Self::Error>; } #[cfg(feature = "alloc")] @@ -133,8 +259,8 @@ mod alloc_mod { /// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable. impl EcssTmSender for T where T: EcssTmSenderCore + Clone + 'static {} - dyn_clone::clone_trait_object!( EcssTmSender); - impl_downcast!(EcssTmSender assoc Error); + dyn_clone::clone_trait_object!(EcssTmSender); + impl_downcast!(EcssTmSender); /// Extension trait for [EcssTcSenderCore]. /// @@ -153,54 +279,66 @@ mod alloc_mod { /// Blanket implementation for all types which implement [EcssTcSenderCore] and are clonable. impl EcssTcSender for T where T: EcssTcSenderCore + Clone + 'static {} - dyn_clone::clone_trait_object!( EcssTcSender); - impl_downcast!(EcssTcSender assoc Error); + dyn_clone::clone_trait_object!(EcssTcSender); + impl_downcast!(EcssTcSender); + + /// Extension trait for [EcssTcReceiverCore]. + /// + /// It provides additional functionality, for example by implementing the [Downcast] trait + /// and the [DynClone] trait. + /// + /// [Downcast] is implemented to allow passing the sender as a boxed trait object and still + /// retrieve the concrete type at a later point. + /// + /// [DynClone] allows cloning the trait object as long as the boxed object implements + /// [Clone]. + #[cfg(feature = "alloc")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] + pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast {} + + /// Blanket implementation for all types which implement [EcssTcReceiverCore] and are clonable. + impl EcssTcReceiver for T where T: EcssTcReceiverCore + 'static {} + + impl_downcast!(EcssTcReceiver); } #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; + use crate::pool::{SharedPool, StoreAddr}; use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; - use crate::pus::{EcssSender, EcssTmSenderCore, PusTmWrapper}; + use crate::pus::{ + EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, EcssTmSenderCore, + EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, ReceivedTcWrapper, + TcAddrWithToken, TryRecvTmtcError, + }; use crate::tmtc::tm_helper::SharedTmStore; - use crate::SenderId; + use crate::ChannelId; + use alloc::boxed::Box; use alloc::vec::Vec; - use spacepackets::ecss::{PusError, SerializablePusPacket}; + use spacepackets::ecss::PusError; use spacepackets::time::cds::TimeProvider; - use spacepackets::time::{StdTimestampError, TimeWriter}; + use spacepackets::time::StdTimestampError; + use spacepackets::time::TimeWriter; + use spacepackets::tm::PusTm; use std::cell::RefCell; - use std::format; use std::string::String; - use std::sync::{mpsc, RwLockWriteGuard}; + use std::sync::mpsc; + use std::sync::mpsc::{SendError, TryRecvError}; use thiserror::Error; - #[derive(Debug, Clone, Error)] - pub enum MpscTmInStoreSenderError { - #[error("RwGuard lock error")] - StoreLock, - #[error("Generic PUS error: {0}")] - Pus(#[from] PusError), - #[error("Generic store error: {0}")] - Store(#[from] StoreError), - #[error("MPSC channel send error: {0}")] - Send(#[from] mpsc::SendError), - #[error("RX handle has disconnected")] - RxDisconnected, - } - #[derive(Clone)] pub struct MpscTmInStoreSender { - id: SenderId, + id: ChannelId, name: &'static str, - store_helper: SharedPool, + shared_tm_store: SharedTmStore, sender: mpsc::Sender, pub ignore_poison_errors: bool, } - impl EcssSender for MpscTmInStoreSender { - fn id(&self) -> SenderId { + impl EcssChannel for MpscTmInStoreSender { + fn id(&self) -> ChannelId { self.id } @@ -209,39 +347,24 @@ pub mod std_mod { } } + impl From> for EcssTmtcError { + fn from(_: SendError) -> Self { + Self::Send(GenericSendError::RxDisconnected) + } + } impl MpscTmInStoreSender { - pub fn send_direct_tm( - &self, - tmtc: impl SerializablePusPacket, - ) -> Result<(), MpscTmInStoreSenderError> { - let operation = |mut store: RwLockWriteGuard| { - let (addr, slice) = store.free_element(tmtc.len_packed())?; - tmtc.write_to_bytes(slice)?; - self.sender.send(addr)?; - Ok(()) - }; - match self.store_helper.write() { - Ok(pool) => operation(pool), - Err(e) => { - if self.ignore_poison_errors { - operation(e.into_inner()) - } else { - Err(MpscTmInStoreSenderError::StoreLock) - } - } - } + pub fn send_direct_tm(&self, tm: PusTm) -> Result<(), EcssTmtcError> { + let addr = self.shared_tm_store.add_pus_tm(&tm)?; + self.sender + .send(addr) + .map_err(|_| EcssTmtcError::Send(GenericSendError::RxDisconnected)) } } impl EcssTmSenderCore for MpscTmInStoreSender { - type Error = MpscTmInStoreSenderError; - - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { match tm { - PusTmWrapper::InStore(addr) => self - .sender - .send(addr) - .map_err(MpscTmInStoreSenderError::Send), + PusTmWrapper::InStore(addr) => self.sender.send(addr).map_err(|e| e.into()), PusTmWrapper::Direct(tm) => self.send_direct_tm(tm), } } @@ -249,31 +372,66 @@ pub mod std_mod { impl MpscTmInStoreSender { pub fn new( - id: SenderId, + id: ChannelId, name: &'static str, - store_helper: SharedPool, + shared_tm_store: SharedTmStore, sender: mpsc::Sender, ) -> Self { Self { id, name, - store_helper, + shared_tm_store, sender, ignore_poison_errors: false, } } } - #[derive(Debug, Clone, Error)] - pub enum MpscTmAsVecSenderError { - #[error("Generic PUS error: {0}")] - Pus(#[from] PusError), - #[error("MPSC channel send error: {0}")] - Send(#[from] mpsc::SendError>), - #[error("can not handle store addresses")] - CantSendAddr(StoreAddr), - #[error("RX handle has disconnected")] - RxDisconnected, + pub struct MpscTcInStoreReceiver { + id: ChannelId, + name: &'static str, + receiver: mpsc::Receiver, + pub ignore_poison_errors: bool, + } + + impl EcssChannel for MpscTcInStoreReceiver { + fn id(&self) -> ChannelId { + self.id + } + + fn name(&self) -> &'static str { + self.name + } + } + + impl EcssTcReceiverCore for MpscTcInStoreReceiver { + fn recv_tc(&self) -> Result { + let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e { + TryRecvError::Empty => TryRecvTmtcError::Empty, + TryRecvError::Disconnected => { + TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) + } + })?; + Ok(ReceivedTcWrapper { + store_addr, + token: Some(token), + }) + } + } + + impl MpscTcInStoreReceiver { + pub fn new( + id: ChannelId, + name: &'static str, + receiver: mpsc::Receiver, + ) -> Self { + Self { + id, + name, + receiver, + ignore_poison_errors: false, + } + } } /// This class can be used if frequent heap allocations during run-time are not an issue. @@ -282,19 +440,25 @@ pub mod std_mod { /// going to be called with direct packets. #[derive(Clone)] pub struct MpscTmAsVecSender { - id: SenderId, + id: ChannelId, sender: mpsc::Sender>, name: &'static str, } + impl From>> for EcssTmtcError { + fn from(_: SendError>) -> Self { + Self::Send(GenericSendError::RxDisconnected) + } + } + impl MpscTmAsVecSender { pub fn new(id: u32, name: &'static str, sender: mpsc::Sender>) -> Self { Self { id, sender, name } } } - impl EcssSender for MpscTmAsVecSender { - fn id(&self) -> SenderId { + impl EcssChannel for MpscTmAsVecSender { + fn id(&self) -> ChannelId { self.id } fn name(&self) -> &'static str { @@ -303,18 +467,13 @@ pub mod std_mod { } impl EcssTmSenderCore for MpscTmAsVecSender { - type Error = MpscTmAsVecSenderError; - - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { match tm { - PusTmWrapper::InStore(addr) => Err(MpscTmAsVecSenderError::CantSendAddr(addr)), + PusTmWrapper::InStore(addr) => Err(EcssTmtcError::CantSendAddr(addr)), PusTmWrapper::Direct(tm) => { let mut vec = Vec::new(); - tm.append_to_vec(&mut vec) - .map_err(MpscTmAsVecSenderError::Pus)?; - self.sender - .send(vec) - .map_err(MpscTmAsVecSenderError::Send)?; + tm.append_to_vec(&mut vec).map_err(EcssTmtcError::Pus)?; + self.sender.send(vec)?; Ok(()) } } @@ -323,36 +482,34 @@ pub mod std_mod { #[derive(Debug, Clone, Error)] pub enum PusPacketHandlingError { - #[error("Generic PUS error: {0}")] - PusError(#[from] PusError), - #[error("Wrong service number {0} for packet handler")] + #[error("generic PUS error: {0}")] + Pus(#[from] PusError), + #[error("wrong service number {0} for packet handler")] WrongService(u8), - #[error("Invalid subservice {0}")] + #[error("invalid subservice {0}")] InvalidSubservice(u8), - #[error("Not enough application data available: {0}")] + #[error("not enough application data available: {0}")] NotEnoughAppData(String), - #[error("Invalid application data")] + #[error("invalid application data")] InvalidAppData(String), - #[error("Generic store error: {0}")] - StoreError(#[from] StoreError), - #[error("Error with the pool RwGuard: {0}")] - RwGuardError(String), - #[error("MQ send error: {0}")] - SendError(String), - #[error("TX message queue side has disconnected")] - QueueDisconnected, - #[error("Other error {0}")] - OtherError(String), + #[error("generic ECSS tmtc error: {0}")] + EcssTmtc(#[from] EcssTmtcError), + #[error("invalid verification token")] + InvalidVerificationToken, + #[error("other error {0}")] + Other(String), } #[derive(Debug, Clone, Error)] pub enum PartialPusHandlingError { - #[error("Generic timestamp generation error")] - Time(StdTimestampError), - #[error("Error sending telemetry: {0}")] - TmSend(String), - #[error("Error sending verification message")] + #[error("generic timestamp generation error")] + Time(#[from] StdTimestampError), + #[error("error sending telemetry: {0}")] + TmSend(#[from] EcssTmtcError), + #[error("error sending verification message")] Verification, + #[error("invalid verification token")] + NoVerificationToken, } /// Generic result type for handlers which can process PUS packets. @@ -371,18 +528,13 @@ pub mod std_mod { } } - /// Generic abstraction for a telecommand being sent around after is has been accepted. - /// The actual telecommand is stored inside a pre-allocated pool structure. - pub type AcceptedTc = (StoreAddr, VerificationToken); - - /// Base class for handlers which can handle PUS TC packets. Right now, the message queue - /// backend is constrained to [mpsc::channel]s and the verification reporter - /// is constrained to the [StdVerifReporterWithSender]. + /// Base class for handlers which can handle PUS TC packets. Right now, the verification + /// reporter is constrained to the [StdVerifReporterWithSender] and the service handler + /// relies on TMTC packets being exchanged via a [SharedPool]. pub struct PusServiceBase { - pub tc_rx: mpsc::Receiver, - pub tc_store: SharedPool, - pub tm_tx: mpsc::Sender, - pub tm_store: SharedTmStore, + pub tc_receiver: Box, + pub shared_tc_store: SharedPool, + pub tm_sender: Box, pub tm_apid: u16, /// The verification handler is wrapped in a [RefCell] to allow the interior mutability /// pattern. This makes writing methods which are not mutable a lot easier. @@ -393,19 +545,17 @@ pub mod std_mod { impl PusServiceBase { pub fn new( - receiver: mpsc::Receiver, - tc_pool: SharedPool, - tm_tx: mpsc::Sender, - tm_store: SharedTmStore, + tc_receiver: Box, + shared_tc_store: SharedPool, + tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, ) -> Self { Self { - tc_rx: receiver, - tc_store: tc_pool, + tc_receiver, + shared_tc_store, tm_apid, - tm_tx, - tm_store, + tm_sender, verification_handler: RefCell::new(verification_handler), pus_buf: [0; 2048], pus_size: 0, @@ -447,9 +597,9 @@ pub mod std_mod { // Keep locked section as short as possible. let psb_mut = self.psb_mut(); let mut tc_pool = psb_mut - .tc_store + .shared_tc_store .write() - .map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; + .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?; let tc_guard = tc_pool.read_with_guard(addr); let tc_raw = tc_guard.read().unwrap(); psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); @@ -457,27 +607,34 @@ pub mod std_mod { } fn handle_next_packet(&mut self) -> Result { - return match self.psb().tc_rx.try_recv() { - Ok((addr, token)) => self.handle_one_tc(addr, token), - Err(e) => match e { - mpsc::TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), - mpsc::TryRecvError::Disconnected => { - Err(PusPacketHandlingError::QueueDisconnected) + match self.psb().tc_receiver.recv_tc() { + Ok(ReceivedTcWrapper { store_addr, token }) => { + if token.is_none() { + return Err(PusPacketHandlingError::InvalidVerificationToken); } + let token = token.unwrap(); + let accepted_token = VerificationToken::::try_from(token) + .map_err(|_| PusPacketHandlingError::InvalidVerificationToken)?; + self.handle_one_tc(store_addr, accepted_token) + } + Err(e) => match e { + TryRecvTmtcError::Error(e) => Err(PusPacketHandlingError::EcssTmtc(e)), + TryRecvTmtcError::Empty => Ok(PusPacketHandlerResult::Empty), }, - }; + } } } } pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmtcError> { if len > cap { - return Err(EcssTmtcError::ByteConversion( - ByteConversionError::ToSliceTooSmall(SizeMissmatch { + return Err( + PusError::ByteConversion(ByteConversionError::ToSliceTooSmall(SizeMissmatch { found: cap, expected: len, - }), - )); + })) + .into(), + ); } Ok(()) } diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 87f0830..52fc33a 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -2,13 +2,13 @@ use crate::pool::{SharedPool, StoreAddr}; use crate::pus::scheduler::PusScheduler; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::{ - AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, + EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, + PusServiceHandler, }; -use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::tc::PusTc; use spacepackets::time::cds::TimeProvider; -use std::sync::mpsc::{Receiver, Sender}; +use std::boxed::Box; /// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service) /// packets. This handler is constrained to using the [PusScheduler], but is able to process @@ -25,20 +25,18 @@ pub struct PusService11SchedHandler { impl PusService11SchedHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, + tc_receiver: Box, + shared_tc_store: SharedPool, + tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, scheduler: PusScheduler, ) -> Self { Self { psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, + tc_receiver, + shared_tc_store, + tm_sender, tm_apid, verification_handler, ), @@ -69,8 +67,9 @@ impl PusServiceHandler for PusService11SchedHandler { token: VerificationToken, ) -> Result { self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); - let std_service = scheduling::Subservice::try_from(tc.subservice()); + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; + let subservice = tc.subservice(); + let std_service = scheduling::Subservice::try_from(subservice); if std_service.is_err() { return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), @@ -126,7 +125,11 @@ impl PusServiceHandler for PusService11SchedHandler { .start_success(token, Some(&time_stamp)) .expect("Error sending start success"); - let mut pool = self.psb.tc_store.write().expect("Locking pool failed"); + let mut pool = self + .psb + .shared_tc_store + .write() + .expect("Locking pool failed"); self.scheduler .reset(pool.as_mut()) @@ -146,7 +149,11 @@ impl PusServiceHandler for PusService11SchedHandler { .start_success(token, Some(&time_stamp)) .expect("error sending start success"); - let mut pool = self.psb.tc_store.write().expect("locking pool failed"); + let mut pool = self + .psb + .shared_tc_store + .write() + .expect("locking pool failed"); self.scheduler .insert_wrapped_tc::(&tc, pool.as_mut()) .expect("insertion of activity into pool failed"); diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs index 0e596fa..178493b 100644 --- a/satrs-core/src/pus/test.rs +++ b/satrs-core/src/pus/test.rs @@ -1,16 +1,14 @@ use crate::pool::{SharedPool, StoreAddr}; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::{ - AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, - PusServiceBase, PusServiceHandler, + EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, + PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, }; -use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::PusPacket; use spacepackets::tc::PusTc; use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::SpHeader; -use std::format; -use std::sync::mpsc::{Receiver, Sender}; +use std::boxed::Box; /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This handler only processes ping requests and generates a ping reply for them accordingly. @@ -20,19 +18,17 @@ pub struct PusService17TestHandler { impl PusService17TestHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, + tc_receiver: Box, + shared_tc_store: SharedPool, + tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, ) -> Self { Self { psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, + tc_receiver, + shared_tc_store, + tm_sender, tm_apid, verification_handler, ), @@ -77,15 +73,15 @@ impl PusServiceHandler for PusService17TestHandler { let mut reply_header = SpHeader::tm_unseg(self.psb.tm_apid, 0, 0).unwrap(); let tc_header = PusTmSecondaryHeader::new_simple(17, 2, &time_stamp); let ping_reply = PusTm::new(&mut reply_header, tc_header, None, true); - let addr = self.psb.tm_store.add_pus_tm(&ping_reply); - if let Err(e) = self + let result = self .psb - .tm_tx - .send(addr) - .map_err(|e| PartialPusHandlingError::TmSend(format!("{e}"))) - { - partial_error = Some(e); + .tm_sender + .send_tm(PusTmWrapper::Direct(ping_reply)) + .map_err(PartialPusHandlingError::TmSend); + if let Err(err) = result { + partial_error = Some(err); } + if let Some(start_token) = start_token { if self .psb @@ -118,7 +114,7 @@ mod tests { use crate::pus::verification::{ RequestId, StdVerifReporterWithSender, VerificationReporterCfg, }; - use crate::pus::{MpscTmInStoreSender, PusServiceHandler}; + use crate::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender, PusServiceHandler}; use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::{PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -137,24 +133,21 @@ mod tests { let tc_pool = LocalPool::new(pool_cfg.clone()); let tm_pool = LocalPool::new(pool_cfg); let tc_pool_shared = SharedPool::new(RwLock::new(Box::new(tc_pool))); - let tm_pool_shared = SharedPool::new(RwLock::new(Box::new(tm_pool))); - let shared_tm_store = SharedTmStore::new(tm_pool_shared.clone()); - let (test_srv_tx, test_srv_rx) = mpsc::channel(); + let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); + let tm_pool_shared = shared_tm_store.clone_backing_pool(); + let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel(); - let verif_sender = MpscTmInStoreSender::new( - 0, - "verif_sender", - shared_tm_store.backing_pool(), - tm_tx.clone(), - ); + let verif_sender = + MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tm_tx.clone()); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let mut verification_handler = StdVerifReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + let test_srv_tm_sender = MpscTmInStoreSender::new(0, "TEST_SENDER", shared_tm_store, tm_tx); + let test_srv_tc_receiver = MpscTcInStoreReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); let mut pus_17_handler = PusService17TestHandler::new( - test_srv_rx, + Box::new(test_srv_tc_receiver), tc_pool_shared.clone(), - tm_tx, - shared_tm_store, + Box::new(test_srv_tm_sender), TEST_APID, verification_handler.clone(), ); @@ -171,7 +164,7 @@ mod tests { let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap(); drop(tc_pool); // Send accepted TC to test service handler. - test_srv_tx.send((addr, token)).unwrap(); + test_srv_tc_tx.send((addr, token.into())).unwrap(); let result = pus_17_handler.handle_next_packet(); assert!(result.is_ok()); // We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 445f10b..1f25c90 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -19,6 +19,7 @@ //! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; //! use satrs_core::seq_count::SeqCountProviderSimple; //! use satrs_core::pus::MpscTmInStoreSender; +//! use satrs_core::tmtc::tm_helper::SharedTmStore; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; //! use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -28,9 +29,11 @@ //! const TEST_APID: u16 = 0x02; //! //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); -//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); +//! let tm_pool = LocalPool::new(pool_cfg.clone()); +//! let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); +//! let tm_store = shared_tm_store.clone_backing_pool(); //! let (verif_tx, verif_rx) = mpsc::channel(); -//! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_pool.clone(), verif_tx); +//! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_store, verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! @@ -51,7 +54,7 @@ //! let addr = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap(); //! let tm_len; //! { -//! let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); +//! let mut rg = tm_store.write().expect("Error locking shared pool"); //! let store_guard = rg.read_with_guard(addr); //! let slice = store_guard.read().expect("Error reading TM slice"); //! tm_len = slice.len(); @@ -73,9 +76,7 @@ //! The [integration test](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-core/tests/verification_test.rs) //! for the verification module contains examples how this module could be used in a more complex //! context involving multiple threads -use crate::pus::{ - source_buffer_large_enough, EcssTmSenderCore, EcssTmtcError, EcssTmtcErrorWithSend, -}; +use crate::pus::{source_buffer_large_enough, EcssTmSenderCore, EcssTmtcError}; use core::fmt::{Debug, Display, Formatter}; use core::hash::{Hash, Hasher}; use core::marker::PhantomData; @@ -84,7 +85,7 @@ use core::mem::size_of; use delegate::delegate; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use spacepackets::ecss::{EcssEnumeration, SerializablePusPacket}; +use spacepackets::ecss::{EcssEnumeration, PusError, SerializablePusPacket}; use spacepackets::tc::PusTc; use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; @@ -174,17 +175,14 @@ impl RequestId { /// If a verification operation fails, the passed token will be returned as well. This allows /// re-trying the operation at a later point. #[derive(Debug, Clone)] -pub struct VerificationOrSendErrorWithToken( - pub EcssTmtcErrorWithSend, - pub VerificationToken, -); +pub struct VerificationOrSendErrorWithToken(pub EcssTmtcError, pub VerificationToken); #[derive(Debug, Clone)] pub struct VerificationErrorWithToken(pub EcssTmtcError, pub VerificationToken); -impl From> for VerificationOrSendErrorWithToken { +impl From> for VerificationOrSendErrorWithToken { fn from(value: VerificationErrorWithToken) -> Self { - VerificationOrSendErrorWithToken(value.0.into(), value.1) + VerificationOrSendErrorWithToken(value.0, value.1) } } /// Support token to allow type-state programming. This prevents calling the verification @@ -210,7 +208,7 @@ impl WasAtLeastAccepted for TcStateAccepted {} impl WasAtLeastAccepted for TcStateStarted {} impl WasAtLeastAccepted for TcStateCompleted {} -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TcStateToken { None(VerificationToken), Accepted(VerificationToken), @@ -235,6 +233,19 @@ impl TryFrom for VerificationToken { } } } + +impl TryFrom for VerificationToken { + type Error = (); + + fn try_from(value: TcStateToken) -> Result { + if let TcStateToken::Started(token) = value { + Ok(token) + } else { + Err(()) + } + } +} + impl From> for TcStateToken { fn from(t: VerificationToken) -> Self { TcStateToken::Accepted(t) @@ -514,36 +525,26 @@ impl VerificationReporterCore { ) } - pub fn send_acceptance_success( + pub fn send_acceptance_success( &self, mut sendable: VerificationSendable<'_, TcStateNone, VerifSuccess>, - sender: &mut (impl EcssTmSenderCore + ?Sized), - ) -> Result, VerificationOrSendErrorWithToken> + sender: &mut (impl EcssTmSenderCore + ?Sized), + ) -> Result, VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap().into()) - .map_err(|e| { - VerificationOrSendErrorWithToken( - EcssTmtcErrorWithSend::SendError(e), - sendable.token.unwrap(), - ) - })?; + .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; Ok(sendable.send_success_acceptance_success()) } - pub fn send_acceptance_failure( + pub fn send_acceptance_failure( &self, mut sendable: VerificationSendable<'_, TcStateNone, VerifFailure>, - sender: &mut (impl EcssTmSenderCore + ?Sized), - ) -> Result<(), VerificationOrSendErrorWithToken> { + sender: &mut (impl EcssTmSenderCore + ?Sized), + ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap().into()) - .map_err(|e| { - VerificationOrSendErrorWithToken( - EcssTmtcErrorWithSend::SendError(e), - sendable.token.unwrap(), - ) - })?; + .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; sendable.send_success_verif_failure(); Ok(()) } @@ -595,22 +596,15 @@ impl VerificationReporterCore { ) } - pub fn send_start_success( + pub fn send_start_success( &self, mut sendable: VerificationSendable<'_, TcStateAccepted, VerifSuccess>, - sender: &mut (impl EcssTmSenderCore + ?Sized), - ) -> Result< - VerificationToken, - VerificationOrSendErrorWithToken, - > { + sender: &mut (impl EcssTmSenderCore + ?Sized), + ) -> Result, VerificationOrSendErrorWithToken> + { sender .send_tm(sendable.pus_tm.take().unwrap().into()) - .map_err(|e| { - VerificationOrSendErrorWithToken( - EcssTmtcErrorWithSend::SendError(e), - sendable.token.unwrap(), - ) - })?; + .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; Ok(sendable.send_success_start_success()) } @@ -640,19 +634,14 @@ impl VerificationReporterCore { ) } - pub fn send_start_failure( + pub fn send_start_failure( &self, mut sendable: VerificationSendable<'_, TcStateAccepted, VerifFailure>, - sender: &mut (impl EcssTmSenderCore + ?Sized), - ) -> Result<(), VerificationOrSendErrorWithToken> { + sender: &mut (impl EcssTmSenderCore + ?Sized), + ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap().into()) - .map_err(|e| { - VerificationOrSendErrorWithToken( - EcssTmtcErrorWithSend::SendError(e), - sendable.token.unwrap(), - ) - })?; + .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; sendable.send_success_verif_failure(); Ok(()) } @@ -763,36 +752,26 @@ impl VerificationReporterCore { ) } - pub fn send_step_or_completion_success( + pub fn send_step_or_completion_success( &self, mut sendable: VerificationSendable<'_, TcState, VerifSuccess>, - sender: &mut (impl EcssTmSenderCore + ?Sized), - ) -> Result<(), VerificationOrSendErrorWithToken> { + sender: &mut (impl EcssTmSenderCore + ?Sized), + ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap().into()) - .map_err(|e| { - VerificationOrSendErrorWithToken( - EcssTmtcErrorWithSend::SendError(e), - sendable.token.unwrap(), - ) - })?; + .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; sendable.send_success_step_or_completion_success(); Ok(()) } - pub fn send_step_or_completion_failure( + pub fn send_step_or_completion_failure( &self, mut sendable: VerificationSendable<'_, TcState, VerifFailure>, - sender: &mut (impl EcssTmSenderCore + ?Sized), - ) -> Result<(), VerificationOrSendErrorWithToken> { + sender: &mut (impl EcssTmSenderCore + ?Sized), + ) -> Result<(), VerificationOrSendErrorWithToken> { sender .send_tm(sendable.pus_tm.take().unwrap().into()) - .map_err(|e| { - VerificationOrSendErrorWithToken( - EcssTmtcErrorWithSend::SendError(e), - sendable.token.unwrap(), - ) - })?; + .map_err(|e| VerificationOrSendErrorWithToken(e, sendable.token.unwrap()))?; sendable.send_success_verif_failure(); Ok(()) } @@ -864,7 +843,8 @@ impl VerificationReporterCore { } params .failure_code - .write_to_be_bytes(&mut src_data_buf[idx..idx + params.failure_code.size()])?; + .write_to_be_bytes(&mut src_data_buf[idx..idx + params.failure_code.size()]) + .map_err(PusError::ByteConversion)?; idx += params.failure_code.size(); if let Some(failure_data) = params.failure_data { src_data_buf[idx..idx + failure_data.len()].copy_from_slice(failure_data); @@ -981,15 +961,13 @@ mod alloc_mod { } /// Package and send a PUS TM\[1, 1\] packet, see 8.1.2.1 of the PUS standard - pub fn acceptance_success( + pub fn acceptance_success( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: Option<&[u8]>, - ) -> Result< - VerificationToken, - VerificationOrSendErrorWithToken, - > { + ) -> Result, VerificationOrSendErrorWithToken> + { let seq_count = self .seq_count_provider .as_ref() @@ -1009,12 +987,12 @@ mod alloc_mod { } /// Package and send a PUS TM\[1, 2\] packet, see 8.1.2.2 of the PUS standard - pub fn acceptance_failure( + pub fn acceptance_failure( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), params: FailParams, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { let seq_count = self .seq_count_provider .as_ref() @@ -1036,14 +1014,14 @@ mod alloc_mod { /// Package and send a PUS TM\[1, 3\] packet, see 8.1.2.3 of the PUS standard. /// /// Requires a token previously acquired by calling [Self::acceptance_success]. - pub fn start_success( + pub fn start_success( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: Option<&[u8]>, ) -> Result< VerificationToken, - VerificationOrSendErrorWithToken, + VerificationOrSendErrorWithToken, > { let seq_count = self .seq_count_provider @@ -1067,12 +1045,12 @@ mod alloc_mod { /// /// Requires a token previously acquired by calling [Self::acceptance_success]. It consumes /// the token because verification handling is done. - pub fn start_failure( + pub fn start_failure( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), params: FailParams, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { let seq_count = self .seq_count_provider .as_ref() @@ -1094,13 +1072,13 @@ mod alloc_mod { /// Package and send a PUS TM\[1, 5\] packet, see 8.1.2.5 of the PUS standard. /// /// Requires a token previously acquired by calling [Self::start_success]. - pub fn step_success( + pub fn step_success( &mut self, token: &VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: Option<&[u8]>, step: impl EcssEnumeration, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { let seq_count = self .seq_count_provider .as_ref() @@ -1126,12 +1104,12 @@ mod alloc_mod { /// /// Requires a token previously acquired by calling [Self::start_success]. It consumes the /// token because verification handling is done. - pub fn step_failure( + pub fn step_failure( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), params: FailParamsWithStep, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { let seq_count = self .seq_count_provider .as_ref() @@ -1155,12 +1133,12 @@ mod alloc_mod { /// /// Requires a token previously acquired by calling [Self::start_success]. It consumes the /// token because verification handling is done. - pub fn completion_success( + pub fn completion_success( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: Option<&[u8]>, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { let seq_count = self .seq_count_provider .as_ref() @@ -1184,12 +1162,12 @@ mod alloc_mod { /// /// Requires a token previously acquired by calling [Self::start_success]. It consumes the /// token because verification handling is done. - pub fn completion_failure( + pub fn completion_failure( &mut self, token: VerificationToken, - sender: &mut (impl EcssTmSenderCore + ?Sized), + sender: &mut (impl EcssTmSenderCore + ?Sized), params: FailParams, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { let seq_count = self .seq_count_provider .as_ref() @@ -1213,23 +1191,20 @@ mod alloc_mod { /// Helper object which caches the sender passed as a trait object. Provides the same /// API as [VerificationReporter] but without the explicit sender arguments. #[derive(Clone)] - pub struct VerificationReporterWithSender { + pub struct VerificationReporterWithSender { pub reporter: VerificationReporter, - pub sender: Box>, + pub sender: Box, } - impl VerificationReporterWithSender { - pub fn new( - cfg: &VerificationReporterCfg, - sender: Box>, - ) -> Self { + impl VerificationReporterWithSender { + pub fn new(cfg: &VerificationReporterCfg, sender: Box) -> Self { let reporter = VerificationReporter::new(cfg); Self::new_from_reporter(reporter, sender) } pub fn new_from_reporter( reporter: VerificationReporter, - sender: Box>, + sender: Box, ) -> Self { Self { reporter, sender } } @@ -1249,10 +1224,8 @@ mod alloc_mod { &mut self, token: VerificationToken, time_stamp: Option<&[u8]>, - ) -> Result< - VerificationToken, - VerificationOrSendErrorWithToken, - > { + ) -> Result, VerificationOrSendErrorWithToken> + { self.reporter .acceptance_success(token, self.sender.as_mut(), time_stamp) } @@ -1261,7 +1234,7 @@ mod alloc_mod { &mut self, token: VerificationToken, params: FailParams, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter .acceptance_failure(token, self.sender.as_mut(), params) } @@ -1272,7 +1245,7 @@ mod alloc_mod { time_stamp: Option<&[u8]>, ) -> Result< VerificationToken, - VerificationOrSendErrorWithToken, + VerificationOrSendErrorWithToken, > { self.reporter .start_success(token, self.sender.as_mut(), time_stamp) @@ -1282,7 +1255,7 @@ mod alloc_mod { &mut self, token: VerificationToken, params: FailParams, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter .start_failure(token, self.sender.as_mut(), params) } @@ -1292,7 +1265,7 @@ mod alloc_mod { token: &VerificationToken, time_stamp: Option<&[u8]>, step: impl EcssEnumeration, - ) -> Result<(), EcssTmtcErrorWithSend> { + ) -> Result<(), EcssTmtcError> { self.reporter .step_success(token, self.sender.as_mut(), time_stamp, step) } @@ -1301,7 +1274,7 @@ mod alloc_mod { &mut self, token: VerificationToken, params: FailParamsWithStep, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter .step_failure(token, self.sender.as_mut(), params) } @@ -1310,7 +1283,7 @@ mod alloc_mod { &mut self, token: VerificationToken, time_stamp: Option<&[u8]>, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter .completion_success(token, self.sender.as_mut(), time_stamp) } @@ -1319,7 +1292,7 @@ mod alloc_mod { &mut self, token: VerificationToken, params: FailParams, - ) -> Result<(), VerificationOrSendErrorWithToken> { + ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter .completion_failure(token, self.sender.as_mut(), params) } @@ -1329,7 +1302,6 @@ mod alloc_mod { #[cfg(feature = "std")] mod std_mod { use crate::pus::verification::VerificationReporterWithSender; - use crate::pus::MpscTmInStoreSenderError; use std::sync::{Arc, Mutex}; // use super::alloc_mod::VerificationReporterWithSender; @@ -1341,7 +1313,7 @@ mod std_mod { // use spacepackets::ecss::SerializablePusPacket; // use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; // - pub type StdVerifReporterWithSender = VerificationReporterWithSender; + pub type StdVerifReporterWithSender = VerificationReporterWithSender; pub type SharedStdVerifReporterWithSender = Arc>; // // trait SendBackend: Send { @@ -1522,25 +1494,26 @@ mod std_mod { #[cfg(test)] mod tests { - use crate::pool::{LocalPool, PoolCfg, SharedPool}; + use crate::pool::{LocalPool, PoolCfg}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; - use crate::pus::{EcssSender, EcssTmtcErrorWithSend, MpscTmInStoreSender, PusTmWrapper}; - use crate::SenderId; + use crate::pus::{EcssChannel, MpscTmInStoreSender, PusTmWrapper}; + use crate::tmtc::tm_helper::SharedTmStore; + use crate::ChannelId; use alloc::boxed::Box; use alloc::format; - use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, PusPacket}; + use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, PusError, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::util::UnsignedEnum; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; use std::cell::RefCell; use std::collections::VecDeque; - use std::sync::{mpsc, Arc, RwLock}; + use std::sync::mpsc; use std::time::Duration; use std::vec; use std::vec::Vec; @@ -1564,8 +1537,8 @@ mod tests { pub service_queue: RefCell>, } - impl EcssSender for TestSender { - fn id(&self) -> SenderId { + impl EcssChannel for TestSender { + fn id(&self) -> ChannelId { 0 } fn name(&self) -> &'static str { @@ -1574,9 +1547,7 @@ mod tests { } impl EcssTmSenderCore for TestSender { - type Error = (); - - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { match tm { PusTmWrapper::InStore(_) => { panic!("TestSender: Can not deal with addresses"); @@ -1607,23 +1578,6 @@ mod tests { } } - #[derive(Debug, Copy, Clone, Eq, PartialEq)] - struct DummyError {} - #[derive(Default, Clone)] - struct FallibleSender {} - - impl EcssSender for FallibleSender { - fn id(&self) -> SenderId { - 0 - } - } - impl EcssTmSenderCore for FallibleSender { - type Error = DummyError; - fn send_tm(&self, _: PusTmWrapper) -> Result<(), Self::Error> { - Err(DummyError {}) - } - } - struct TestBase<'a> { vr: VerificationReporter, #[allow(dead_code)] @@ -1635,13 +1589,13 @@ mod tests { &mut self.vr } } - struct TestBaseWithHelper<'a, E> { - helper: VerificationReporterWithSender, + struct TestBaseWithHelper<'a> { + helper: VerificationReporterWithSender, #[allow(dead_code)] tc: PusTc<'a>, } - impl<'a, E> TestBaseWithHelper<'a, E> { + impl<'a> TestBaseWithHelper<'a> { fn rep(&mut self) -> &mut VerificationReporter { &mut self.helper.reporter } @@ -1672,10 +1626,7 @@ mod tests { (TestBase { vr: reporter, tc }, init_tok) } - fn base_with_helper_init() -> ( - TestBaseWithHelper<'static, ()>, - VerificationToken, - ) { + fn base_with_helper_init() -> (TestBaseWithHelper<'static>, VerificationToken) { let mut reporter = base_reporter(); let (tc, _) = base_tc_init(None); let init_tok = reporter.add_tc(&tc); @@ -1705,9 +1656,10 @@ mod tests { #[test] fn test_mpsc_verif_send_sync() { let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); - let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool))); + let tm_store = Box::new(pool); + let shared_tm_store = SharedTmStore::new(tm_store); let (tx, _) = mpsc::channel(); - let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_pool, tx); + let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store, tx); is_send(&mpsc_verif_sender); } @@ -1738,23 +1690,6 @@ mod tests { acceptance_check(sender, &tok.req_id); } - #[test] - fn test_acceptance_send_fails() { - let (mut b, tok) = base_init(false); - let mut faulty_sender = FallibleSender::default(); - let res = - b.vr.acceptance_success(tok, &mut faulty_sender, Some(&EMPTY_STAMP)); - assert!(res.is_err()); - let err = res.unwrap_err(); - assert_eq!(err.1, tok); - match err.0 { - EcssTmtcErrorWithSend::SendError(e) => { - assert_eq!(e, DummyError {}) - } - _ => panic!("{}", format!("Unexpected error {:?}", err.0)), - } - } - fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) { let cmp_info = TmInfo { common: CommonTmInfo { @@ -1819,7 +1754,7 @@ mod tests { let err_with_token = res.unwrap_err(); assert_eq!(err_with_token.1, tok); match err_with_token.0 { - EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversion(e)) => match e { + EcssTmtcError::Pus(PusError::ByteConversion(e)) => match e { ByteConversionError::ToSliceTooSmall(missmatch) => { assert_eq!( missmatch.expected, @@ -2388,11 +2323,11 @@ mod tests { // TODO: maybe a bit more extensive testing, all I have time for right now fn test_seq_count_increment() { let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_pool: SharedPool = - Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let tm_pool = Box::new(LocalPool::new(pool_cfg.clone())); + let shared_tm_store = SharedTmStore::new(tm_pool); + let shared_tm_pool = shared_tm_store.clone_backing_pool(); let (verif_tx, verif_rx) = mpsc::channel(); - let sender = - MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_pool.clone(), verif_tx); + let sender = MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_store, verif_tx); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); diff --git a/satrs-core/src/tmtc/mod.rs b/satrs-core/src/tmtc/mod.rs index c2a0aa6..5d5f306 100644 --- a/satrs-core/src/tmtc/mod.rs +++ b/satrs-core/src/tmtc/mod.rs @@ -9,7 +9,6 @@ use downcast_rs::{impl_downcast, Downcast}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use spacepackets::tc::PusTc; use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader}; #[cfg(feature = "alloc")] @@ -93,11 +92,3 @@ pub trait ReceivesCcsdsTc { type Error; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>; } - -/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is -/// implemented by the [crate::tmtc::pus_distrib::PusDistributor] objects to allow passing PUS TC -/// packets into it. -pub trait ReceivesEcssPusTc { - type Error; - fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTc) -> Result<(), Self::Error>; -} diff --git a/satrs-core/src/tmtc/pus_distrib.rs b/satrs-core/src/tmtc/pus_distrib.rs index bf591da..b6a5eab 100644 --- a/satrs-core/src/tmtc/pus_distrib.rs +++ b/satrs-core/src/tmtc/pus_distrib.rs @@ -61,7 +61,8 @@ //! .expect("Casting back to concrete type failed"); //! assert_eq!(concrete_handler_ref.handler_call_count, 1); //! ``` -use crate::tmtc::{ReceivesCcsdsTc, ReceivesEcssPusTc, ReceivesTcCore}; +use crate::pus::ReceivesEcssPusTc; +use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; use alloc::boxed::Box; use core::fmt::{Display, Formatter}; use downcast_rs::Downcast; @@ -132,7 +133,7 @@ impl ReceivesTcCore for PusDistributor { fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> { // Convert to ccsds and call pass_ccsds let (sp_header, _) = SpHeader::from_be_bytes(tm_raw) - .map_err(|e| PusDistribError::PusError(PusError::ByteConversionError(e)))?; + .map_err(|e| PusDistribError::PusError(PusError::ByteConversion(e)))?; self.pass_ccsds(&sp_header, tm_raw) } } diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index 2ef099c..da2bc1e 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -8,9 +8,11 @@ pub use std_mod::*; #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{SharedPool, StoreAddr}; + use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; + use crate::pus::EcssTmtcError; use spacepackets::ecss::SerializablePusPacket; use spacepackets::tm::PusTm; + use std::sync::{Arc, RwLock}; #[derive(Clone)] pub struct SharedTmStore { @@ -18,21 +20,23 @@ pub mod std_mod { } impl SharedTmStore { - pub fn new(backing_pool: SharedPool) -> Self { - Self { pool: backing_pool } + pub fn new(backing_pool: ShareablePoolProvider) -> Self { + Self { + pool: Arc::new(RwLock::new(backing_pool)), + } } - pub fn backing_pool(&self) -> SharedPool { + pub fn clone_backing_pool(&self) -> SharedPool { self.pool.clone() } - pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { - let mut pg = self.pool.write().expect("error locking TM store"); - let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); + pub fn add_pus_tm(&self, pus_tm: &PusTm) -> Result { + let mut pg = self.pool.write().map_err(|_| EcssTmtcError::StoreLock)?; + let (addr, buf) = pg.free_element(pus_tm.len_packed())?; pus_tm .write_to_bytes(buf) .expect("writing PUS TM to store failed"); - addr + Ok(addr) } } } diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 313bc84..a533dbc 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -3,11 +3,12 @@ #[cfg(feature = "crossbeam")] pub mod crossbeam_test { use hashbrown::HashMap; - use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; + use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider}; use satrs_core::pus::verification::{ FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, }; use satrs_core::pus::MpscTmInStoreSender; + use satrs_core::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; @@ -35,13 +36,12 @@ pub mod crossbeam_test { let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); // Shared pool object to store the verification PUS telemetry let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_pool: SharedPool = - Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone()))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = mpsc::channel(); let sender = - MpscTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); + MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone()); let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); @@ -146,8 +146,9 @@ pub mod crossbeam_test { .recv_timeout(Duration::from_millis(50)) .expect("Packet reception timeout"); let tm_len; + let shared_tm_store = shared_tm_store.clone_backing_pool(); { - let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); + let mut rg = shared_tm_store.write().expect("Error locking shared pool"); let store_guard = rg.read_with_guard(verif_addr); let slice = store_guard.read().expect("Error reading TM slice"); tm_len = slice.len(); diff --git a/satrs-example/src/lib.rs b/satrs-example/src/lib.rs index 754be9b..31fb63d 100644 --- a/satrs-example/src/lib.rs +++ b/satrs-example/src/lib.rs @@ -73,3 +73,24 @@ pub mod hk_err { #[resultcode] pub const COLLECTION_INTERVAL_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 3); } + +#[allow(clippy::enum_variant_names)] +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TmSenderId { + PusVerification = 0, + PusTest = 1, + PusEvent = 2, + PusHk = 3, + PusAction = 4, + PusSched = 5, + AllEvents = 6, +} + +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TcReceiverId { + PusTest = 1, + PusEvent = 2, + PusHk = 3, + PusAction = 4, + PusSched = 5, +} diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 85346c3..e6a4241 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -32,8 +32,10 @@ use satrs_core::pus::hk::Subservice as HkSubservice; use satrs_core::pus::scheduler::PusScheduler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::test::PusService17TestHandler; -use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; -use satrs_core::pus::MpscTmInStoreSender; +use satrs_core::pus::verification::{ + TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, +}; +use satrs_core::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender}; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::spacepackets::tm::PusTmZeroCopyWriter; use satrs_core::spacepackets::{ @@ -44,7 +46,8 @@ use satrs_core::spacepackets::{ }; use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{AddressableId, TargetId}; -use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; +use satrs_core::ChannelId; +use satrs_example::{RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, SERVER_PORT}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::{channel, TryRecvError}; @@ -63,8 +66,8 @@ fn main() { (15, 1024), (15, 2048), ])); - let tm_store = SharedTmStore::new(Arc::new(RwLock::new(Box::new(tm_pool)))); - let tm_store_event = tm_store.clone(); + let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); + let tm_store_event = shared_tm_store.clone(); let tc_pool = LocalPool::new(PoolCfg::new(vec![ (30, 32), (15, 64), @@ -84,9 +87,9 @@ fn main() { let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); let verif_sender = MpscTmInStoreSender::new( - 0, + TmSenderId::PusVerification as ChannelId, "verif_sender", - tm_store.backing_pool(), + shared_tm_store.clone(), tm_funnel_tx.clone(), ); let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); @@ -134,13 +137,13 @@ fn main() { tc_receiver: tc_source_rx, }; let tm_args = TmArgs { - tm_store: tm_store.clone(), + tm_store: shared_tm_store.clone(), tm_sink_sender: tm_funnel_tx.clone(), tm_server_rx, }; let aocs_tm_funnel = tm_funnel_tx.clone(); - let mut aocs_tm_store = tm_store.clone(); + let aocs_tm_store = shared_tm_store.clone(); let (pus_test_tx, pus_test_rx) = channel(); let (pus_event_tx, pus_event_rx) = channel(); @@ -154,11 +157,21 @@ fn main() { hk_service_receiver: pus_hk_tx, action_service_receiver: pus_action_tx, }; - let pus17_handler = PusService17TestHandler::new( - pus_test_rx, - tc_store.pool.clone(), + let test_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusTest as ChannelId, + "PUS_17_TM_SENDER", + shared_tm_store.clone(), tm_funnel_tx.clone(), - tm_store.clone(), + ); + let test_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusTest as ChannelId, + "PUS_17_TC_RECV", + pus_test_rx, + ); + let pus17_handler = PusService17TestHandler::new( + Box::new(test_srv_receiver), + tc_store.pool.clone(), + Box::new(test_srv_tm_sender), PUS_APID, verif_reporter.clone(), ); @@ -166,13 +179,24 @@ fn main() { pus17_handler, test_srv_event_sender, }; + + let sched_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusSched as ChannelId, + "PUS_11_TM_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ); + let sched_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusSched as ChannelId, + "PUS_11_TC_RECV", + pus_sched_rx, + ); let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusService11SchedHandler::new( - pus_sched_rx, + Box::new(sched_srv_receiver), tc_store.pool.clone(), - tm_funnel_tx.clone(), - tm_store.clone(), + Box::new(sched_srv_tm_sender), PUS_APID, verif_reporter.clone(), scheduler, @@ -181,33 +205,61 @@ fn main() { pus_11_handler, tc_source_wrapper, }; - let pus_5_handler = PusService5EventHandler::new( - pus_event_rx, - tc_store.pool.clone(), + + let event_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusEvent as ChannelId, + "PUS_5_TM_SENDER", + shared_tm_store.clone(), tm_funnel_tx.clone(), - tm_store.clone(), + ); + let event_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusEvent as ChannelId, + "PUS_5_TC_RECV", + pus_event_rx, + ); + let pus_5_handler = PusService5EventHandler::new( + Box::new(event_srv_receiver), + tc_store.pool.clone(), + Box::new(event_srv_tm_sender), PUS_APID, verif_reporter.clone(), event_request_tx, ); let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler }; - let pus_8_handler = PusService8ActionHandler::new( - pus_action_rx, - tc_store.pool.clone(), + let action_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusAction as ChannelId, + "PUS_8_TM_SENDER", + shared_tm_store.clone(), tm_funnel_tx.clone(), - tm_store.clone(), + ); + let action_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusAction as ChannelId, + "PUS_8_TC_RECV", + pus_action_rx, + ); + let pus_8_handler = PusService8ActionHandler::new( + Box::new(action_srv_receiver), + tc_store.pool.clone(), + Box::new(action_srv_tm_sender), PUS_APID, verif_reporter.clone(), request_map.clone(), ); let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; - let pus_3_handler = PusService3HkHandler::new( - pus_hk_rx, - tc_store.pool.clone(), + let hk_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusHk as ChannelId, + "PUS_3_TM_SENDER", + shared_tm_store.clone(), tm_funnel_tx.clone(), - tm_store.clone(), + ); + let hk_srv_receiver = + MpscTcInStoreReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); + let pus_3_handler = PusService3HkHandler::new( + Box::new(hk_srv_receiver), + tc_store.pool.clone(), + Box::new(hk_srv_tm_sender), PUS_APID, verif_reporter.clone(), request_map, @@ -234,7 +286,7 @@ fn main() { if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { // Read the TM, set sequence counter and message counter, and finally update // the CRC. - let shared_pool = tm_store.backing_pool(); + let shared_pool = shared_tm_store.clone_backing_pool(); let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); let tm_raw = pool_guard .modify(&addr) @@ -270,15 +322,19 @@ fn main() { .spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; let mut sender = MpscTmInStoreSender::new( - 1, - "event_sender", - tm_store_event.backing_pool(), + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + tm_store_event.clone(), tm_funnel_tx, ); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { + let started_token: VerificationToken = event_req + .token + .try_into() + .expect("expected start verification token"); reporter_event_handler - .completion_success(event_req.token.try_into().unwrap(), Some(timestamp)) + .completion_success(started_token, Some(timestamp)) .expect("Sending completion success failed"); }; loop { @@ -365,7 +421,9 @@ fn main() { Some(&buf), true, ); - let addr = aocs_tm_store.add_pus_tm(&pus_tm); + let addr = aocs_tm_store + .add_pus_tm(&pus_tm) + .expect("Adding PUS TM failed"); aocs_tm_funnel.send(addr).expect("Sending HK TM failed"); } } diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 0975c79..4dd3e55 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -5,15 +5,15 @@ use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::pus::{ - AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, + EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, + PusServiceHandler, }; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::tc::PusTc; -use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::TargetId; use satrs_example::tmtc_err; use std::collections::HashMap; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; pub struct PusService8ActionHandler { psb: PusServiceBase, @@ -22,20 +22,18 @@ pub struct PusService8ActionHandler { impl PusService8ActionHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, + tc_receiver: Box, + shared_tc_pool: SharedPool, + tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, request_handlers: HashMap>, ) -> Self { Self { psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, + tc_receiver, + shared_tc_pool, + tm_sender, tm_apid, verification_handler, ), @@ -94,7 +92,7 @@ impl PusService8ActionHandler { ), ) .expect("Sending start failure failed"); - return Err(PusPacketHandlingError::OtherError(format!( + return Err(PusPacketHandlingError::Other(format!( "Unknown target ID {target_id}" ))); } diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index 56e0b4b..b7e70c2 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -6,15 +6,15 @@ use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::pus::{ - AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, + EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, + PusServiceHandler, }; use satrs_core::spacepackets::ecss::{hk, PusPacket}; use satrs_core::spacepackets::tc::PusTc; -use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{AddressableId, TargetId}; use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; pub struct PusService3HkHandler { psb: PusServiceBase, @@ -23,20 +23,18 @@ pub struct PusService3HkHandler { impl PusService3HkHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, + tc_receiver: Box, + shared_tc_pool: SharedPool, + tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, request_handlers: HashMap>, ) -> Self { Self { psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, + tc_receiver, + shared_tc_pool, + tm_sender, tm_apid, verification_handler, ), diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 0a879f8..d8b1c5b 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -2,7 +2,7 @@ use crate::tmtc::MpscStoreAndSendError; use log::warn; use satrs_core::pool::StoreAddr; use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; -use satrs_core::pus::{AcceptedTc, PusPacketHandlerResult}; +use satrs_core::pus::{PusPacketHandlerResult, TcAddrWithToken}; use satrs_core::spacepackets::ecss::PusServiceId; use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::time::cds::TimeProvider; @@ -17,11 +17,11 @@ pub mod scheduler; pub mod test; pub struct PusTcMpscRouter { - pub test_service_receiver: Sender, - pub event_service_receiver: Sender, - pub sched_service_receiver: Sender, - pub hk_service_receiver: Sender, - pub action_service_receiver: Sender, + pub test_service_receiver: Sender, + pub event_service_receiver: Sender, + pub sched_service_receiver: Sender, + pub hk_service_receiver: Sender, + pub action_service_receiver: Sender, } pub struct PusReceiver { @@ -86,20 +86,20 @@ impl PusReceiver { PusServiceId::Test => { self.pus_router .test_service_receiver - .send((store_addr, accepted_token))?; + .send((store_addr, accepted_token.into()))?; } PusServiceId::Housekeeping => self .pus_router .hk_service_receiver - .send((store_addr, accepted_token))?, + .send((store_addr, accepted_token.into()))?, PusServiceId::Event => self .pus_router .event_service_receiver - .send((store_addr, accepted_token))?, + .send((store_addr, accepted_token.into()))?, PusServiceId::Scheduling => self .pus_router .sched_service_receiver - .send((store_addr, accepted_token))?, + .send((store_addr, accepted_token.into()))?, _ => { let result = self.verif_reporter.start_failure( accepted_token, diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 864b40b..2a18dd3 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -10,12 +10,12 @@ use crate::ccsds::CcsdsReceiver; use crate::pus::{PusReceiver, PusTcMpscRouter}; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::pus::AcceptedTc; +use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken}; use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket}; use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::SpHeader; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, ReceivesEcssPusTc}; +use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc}; pub const PUS_APID: u16 = 0x02; @@ -42,7 +42,7 @@ pub enum MpscStoreAndSendError { #[error("Store error: {0}")] Store(#[from] StoreError), #[error("TC send error: {0}")] - TcSend(#[from] SendError), + TcSend(#[from] SendError), #[error("TMTC send error: {0}")] TmTcSend(#[from] SendError), } @@ -123,7 +123,7 @@ pub fn core_tmtc_task( let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_rx: tm_args.tm_server_rx, - tm_store: tm_args.tm_store.backing_pool(), + tm_store: tm_args.tm_store.clone_backing_pool(), }; let mut tc_buf: [u8; 4096] = [0; 4096];