diff --git a/satrs-book/src/events.md b/satrs-book/src/events.md index 95c5df2..01dbb59 100644 --- a/satrs-book/src/events.md +++ b/satrs-book/src/events.md @@ -13,4 +13,4 @@ event components recommended by this framework do not really need this service. The following images shows how the flow of events could look like in a system where components can generate events, and where other system components might be interested in those events: -![Event flow](../../images/events/event_man_arch.png) +![Event flow](images/events/event_man_arch.png) diff --git a/satrs-book/src/images/events/event_man_arch.png b/satrs-book/src/images/events/event_man_arch.png new file mode 100644 index 0000000..6920a18 Binary files /dev/null and b/satrs-book/src/images/events/event_man_arch.png differ diff --git a/satrs-core/src/pus/event_srv.rs b/satrs-core/src/pus/event_srv.rs index 5f423f0..1d6bbd1 100644 --- a/satrs-core/src/pus/event_srv.rs +++ b/satrs-core/src/pus/event_srv.rs @@ -1,67 +1,60 @@ use crate::events::EventU32; -use crate::pool::{SharedPool, StoreAddr}; use crate::pus::event_man::{EventRequest, EventRequestWithToken}; -use crate::pus::verification::{ - StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, -}; +use crate::pus::verification::TcStateToken; use crate::pus::{ EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, - PusPacketHandlingError, PusServiceBase, PusServiceHandler, + PusPacketHandlingError, }; use alloc::boxed::Box; use spacepackets::ecss::event::Subservice; -use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::PusPacket; use std::sync::mpsc::Sender; -pub struct PusService5EventHandler { - psb: PusServiceBase, +use super::verification::VerificationReporterWithSender; +use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHandler}; + +pub struct PusService5EventHandler { + pub psb: PusServiceHandler, event_request_tx: Sender, } -impl PusService5EventHandler { +impl PusService5EventHandler { pub fn new( tc_receiver: Box, - shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, - verification_handler: StdVerifReporterWithSender, + verification_handler: VerificationReporterWithSender, + tc_in_mem_converter: TcInMemConverter, event_request_tx: Sender, ) -> Self { Self { - psb: PusServiceBase::new( + psb: PusServiceHandler::new( tc_receiver, - shared_tc_store, tm_sender, tm_apid, verification_handler, + tc_in_mem_converter, ), event_request_tx, } } -} -impl PusServiceHandler for PusService5EventHandler { - fn psb_mut(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - fn psb(&self) -> &PusServiceBase { - &self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTcReader::new(&self.psb.pus_buf)?; + pub fn handle_one_tc(&mut self) -> Result { + let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + if possible_packet.is_none() { + return Ok(PusPacketHandlerResult::Empty); + } + let ecss_tc_and_token = possible_packet.unwrap(); + let tc = self + .psb + .tc_in_mem_converter + .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; let subservice = tc.subservice(); let srv = Subservice::try_from(subservice); if srv.is_err() { return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token, + ecss_tc_and_token.token, )); } let handle_enable_disable_request = |enable: bool, stamp: [u8; 7]| { @@ -74,12 +67,13 @@ impl PusServiceHandler for PusService5EventHandler { let event_u32 = EventU32::from(u32::from_be_bytes(user_data[0..4].try_into().unwrap())); let start_token = self .psb + .common .verification_handler .borrow_mut() - .start_success(token, Some(&stamp)) + .start_success(ecss_tc_and_token.token, Some(&stamp)) .map_err(|_| PartialPusHandlingError::Verification); let partial_error = start_token.clone().err(); - let mut token: TcStateToken = token.into(); + let mut token: TcStateToken = ecss_tc_and_token.token.into(); if let Ok(start_token) = start_token { token = start_token.into(); } @@ -107,7 +101,7 @@ impl PusServiceHandler for PusService5EventHandler { Ok(PusPacketHandlerResult::RequestHandled) }; let mut partial_error = None; - let time_stamp = self.psb().get_current_timestamp(&mut partial_error); + let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); match srv.unwrap() { Subservice::TmInfoReport | Subservice::TmLowSeverityReport @@ -123,7 +117,8 @@ impl PusServiceHandler for PusService5EventHandler { } Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => { return Ok(PusPacketHandlerResult::SubserviceNotImplemented( - subservice, token, + subservice, + ecss_tc_and_token.token, )); } } diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index cc05a92..cb28562 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -55,12 +55,6 @@ impl<'tm> From> for PusTmWrapper<'tm> { } } -pub type TcAddrWithToken = (StoreAddr, TcStateToken); - -/// Generic abstraction for a telecommand being sent around after is has been accepted. -/// The actual telecommand is stored inside a pre-allocated pool structure. -pub type AcceptedTc = (StoreAddr, VerificationToken); - /// Generic error type for sending something via a message queue. #[derive(Debug, Copy, Clone)] pub enum GenericSendError { @@ -200,11 +194,71 @@ pub trait EcssTcSenderCore: EcssChannel { fn send_tc(&self, tc: PusTcCreator, token: Option) -> Result<(), EcssTmtcError>; } -pub struct ReceivedTcWrapper { - pub store_addr: StoreAddr, +#[non_exhaustive] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TcInMemory { + StoreAddr(StoreAddr), + #[cfg(feature = "alloc")] + Vec(alloc::vec::Vec), +} + +impl From for TcInMemory { + fn from(value: StoreAddr) -> Self { + Self::StoreAddr(value) + } +} + +#[cfg(feature = "alloc")] +impl From> for TcInMemory { + fn from(value: alloc::vec::Vec) -> Self { + Self::Vec(value) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EcssTcAndToken { + pub tc_in_memory: TcInMemory, pub token: Option, } +impl EcssTcAndToken { + pub fn new(tc_in_memory: impl Into, token: impl Into) -> Self { + Self { + tc_in_memory: tc_in_memory.into(), + token: Some(token.into()), + } + } +} + +/// Generic abstraction for a telecommand being sent around after is has been accepted. +pub struct AcceptedEcssTcAndToken { + pub tc_in_memory: TcInMemory, + pub token: VerificationToken, +} + +impl From for EcssTcAndToken { + fn from(value: AcceptedEcssTcAndToken) -> Self { + EcssTcAndToken { + tc_in_memory: value.tc_in_memory, + token: Some(value.token.into()), + } + } +} + +impl TryFrom for AcceptedEcssTcAndToken { + type Error = (); + + fn try_from(value: EcssTcAndToken) -> Result { + if let Some(TcStateToken::Accepted(token)) = value.token { + return Ok(AcceptedEcssTcAndToken { + tc_in_memory: value.tc_in_memory, + token, + }); + } + Err(()) + } +} + #[derive(Debug, Clone)] pub enum TryRecvTmtcError { Error(EcssTmtcError), @@ -231,7 +285,7 @@ impl From for TryRecvTmtcError { /// Generic trait for a user supplied receiver object. pub trait EcssTcReceiverCore: EcssChannel { - fn recv_tc(&self) -> Result; + fn recv_tc(&self) -> Result; } /// Generic trait for objects which can receive ECSS PUS telecommands. This trait is @@ -315,15 +369,16 @@ pub mod std_mod { StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use crate::pus::{ - EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, EcssTmSenderCore, - EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, ReceivedTcWrapper, - TcAddrWithToken, TryRecvTmtcError, + EcssChannel, EcssTcAndToken, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, + EcssTmSenderCore, EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, + TryRecvTmtcError, }; use crate::tmtc::tm_helper::SharedTmStore; use crate::ChannelId; use alloc::boxed::Box; use alloc::vec::Vec; use crossbeam_channel as cb; + use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::PusError; use spacepackets::time::cds::TimeProvider; @@ -335,6 +390,9 @@ pub mod std_mod { use std::sync::mpsc::TryRecvError; use thiserror::Error; + use super::verification::VerificationReporterWithSender; + use super::{AcceptedEcssTcAndToken, TcInMemory}; + impl From> for EcssTmtcError { fn from(_: mpsc::SendError) -> Self { Self::Send(GenericSendError::RxDisconnected) @@ -414,7 +472,7 @@ pub mod std_mod { pub struct MpscTcInStoreReceiver { id: ChannelId, name: &'static str, - receiver: mpsc::Receiver, + receiver: mpsc::Receiver, } impl EcssChannel for MpscTcInStoreReceiver { @@ -428,16 +486,12 @@ pub mod std_mod { } impl EcssTcReceiverCore for MpscTcInStoreReceiver { - fn recv_tc(&self) -> Result { - let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e { + fn recv_tc(&self) -> Result { + self.receiver.try_recv().map_err(|e| match e { TryRecvError::Empty => TryRecvTmtcError::Empty, TryRecvError::Disconnected => { TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) } - })?; - Ok(ReceivedTcWrapper { - store_addr, - token: Some(token), }) } } @@ -446,7 +500,7 @@ pub mod std_mod { pub fn new( id: ChannelId, name: &'static str, - receiver: mpsc::Receiver, + receiver: mpsc::Receiver, ) -> Self { Self { id, name, receiver } } @@ -548,14 +602,14 @@ pub mod std_mod { pub struct CrossbeamTcInStoreReceiver { id: ChannelId, name: &'static str, - receiver: cb::Receiver, + receiver: cb::Receiver, } impl CrossbeamTcInStoreReceiver { pub fn new( id: ChannelId, name: &'static str, - receiver: cb::Receiver, + receiver: cb::Receiver, ) -> Self { Self { id, name, receiver } } @@ -572,16 +626,12 @@ pub mod std_mod { } impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver { - fn recv_tc(&self) -> Result { - let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e { + fn recv_tc(&self) -> Result { + self.receiver.try_recv().map_err(|e| match e { cb::TryRecvError::Empty => TryRecvTmtcError::Empty, cb::TryRecvError::Disconnected => { TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) } - })?; - Ok(ReceivedTcWrapper { - store_addr, - token: Some(token), }) } } @@ -596,8 +646,12 @@ pub mod std_mod { InvalidSubservice(u8), #[error("not enough application data available: {0}")] NotEnoughAppData(String), + #[error("PUS packet too large, does not fit in buffer: {0}")] + PusPacketTooLarge(usize), #[error("invalid application data")] InvalidAppData(String), + #[error("invalid format of TC in memory: {0:?}")] + InvalidTcInMemoryFormat(TcInMemory), #[error("generic ECSS tmtc error: {0}")] EcssTmtc(#[from] EcssTmtcError), #[error("invalid verification token")] @@ -634,42 +688,118 @@ pub mod std_mod { } } - /// Base class for handlers which can handle PUS TC packets. Right now, the verification - /// reporter is constrained to the [StdVerifReporterWithSender] and the service handler - /// relies on TMTC packets being exchanged via a [SharedPool]. + pub trait EcssTcInMemConverter { + fn cache_ecss_tc_in_memory<'a>( + &'a mut self, + possible_packet: &'a AcceptedEcssTcAndToken, + ) -> Result<(), PusPacketHandlingError>; + + fn tc_slice_raw(&self) -> &[u8]; + + fn convert_ecss_tc_in_memory_to_reader<'a>( + &'a mut self, + possible_packet: &'a AcceptedEcssTcAndToken, + ) -> Result, PusPacketHandlingError> { + self.cache_ecss_tc_in_memory(possible_packet)?; + Ok(PusTcReader::new(self.tc_slice_raw())?.0) + } + } + + pub struct EcssTcInVecConverter { + pub pus_tc_raw: Option>, + } + + impl EcssTcInMemConverter for EcssTcInVecConverter { + fn cache_ecss_tc_in_memory<'a>( + &'a mut self, + possible_packet: &'a AcceptedEcssTcAndToken, + ) -> Result<(), PusPacketHandlingError> { + self.pus_tc_raw = None; + match &possible_packet.tc_in_memory { + super::TcInMemory::StoreAddr(_) => { + return Err(PusPacketHandlingError::InvalidTcInMemoryFormat( + possible_packet.tc_in_memory.clone(), + )); + } + super::TcInMemory::Vec(vec) => { + self.pus_tc_raw = Some(vec.clone()); + } + }; + Ok(()) + } + + fn tc_slice_raw(&self) -> &[u8] { + if self.pus_tc_raw.is_none() { + return &[]; + } + self.pus_tc_raw.as_ref().unwrap() + } + } + + pub struct EcssTcInStoreConverter { + pub shared_tc_store: SharedPool, + pub pus_buf: Vec, + } + + impl EcssTcInStoreConverter { + pub fn new(shared_tc_store: SharedPool, max_expected_tc_size: usize) -> Self { + Self { + shared_tc_store, + pus_buf: alloc::vec![0; max_expected_tc_size], + } + } + + pub fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> { + // Keep locked section as short as possible. + let mut tc_pool = self + .shared_tc_store + .write() + .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?; + let tc_guard = tc_pool.read_with_guard(addr); + let tc_raw = tc_guard.read().unwrap(); + if tc_raw.len() > self.pus_buf.len() { + return Err(PusPacketHandlingError::PusPacketTooLarge(tc_raw.len())); + } + self.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); + Ok(()) + } + } + + impl EcssTcInMemConverter for EcssTcInStoreConverter { + fn cache_ecss_tc_in_memory<'a>( + &'a mut self, + possible_packet: &'a AcceptedEcssTcAndToken, + ) -> Result<(), PusPacketHandlingError> { + match &possible_packet.tc_in_memory { + super::TcInMemory::StoreAddr(addr) => { + self.copy_tc_to_buf(*addr)?; + } + super::TcInMemory::Vec(_) => { + return Err(PusPacketHandlingError::InvalidTcInMemoryFormat( + possible_packet.tc_in_memory.clone(), + )); + } + }; + Ok(()) + } + + fn tc_slice_raw(&self) -> &[u8] { + self.pus_buf.as_ref() + } + } + pub struct PusServiceBase { pub tc_receiver: Box, - pub shared_tc_store: SharedPool, pub tm_sender: Box, pub tm_apid: u16, /// The verification handler is wrapped in a [RefCell] to allow the interior mutability /// pattern. This makes writing methods which are not mutable a lot easier. pub verification_handler: RefCell, - pub pus_buf: [u8; 2048], - pub pus_size: usize, } impl PusServiceBase { - pub fn new( - tc_receiver: Box, - shared_tc_store: SharedPool, - tm_sender: Box, - tm_apid: u16, - verification_handler: StdVerifReporterWithSender, - ) -> Self { - Self { - tc_receiver, - shared_tc_store, - tm_apid, - tm_sender, - verification_handler: RefCell::new(verification_handler), - pus_buf: [0; 2048], - pus_size: 0, - } - } - + #[cfg(feature = "std")] pub fn get_current_timestamp( - &self, partial_error: &mut Option, ) -> [u8; 7] { let mut time_stamp: [u8; 7] = [0; 7]; @@ -684,48 +814,65 @@ pub mod std_mod { time_stamp } - pub fn get_current_timestamp_ignore_error(&self) -> [u8; 7] { + #[cfg(feature = "std")] + pub fn get_current_timestamp_ignore_error() -> [u8; 7] { let mut dummy = None; - self.get_current_timestamp(&mut dummy) + Self::get_current_timestamp(&mut dummy) } } - pub trait PusServiceHandler { - fn psb_mut(&mut self) -> &mut PusServiceBase; - fn psb(&self) -> &PusServiceBase; - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result; + /// Base class for handlers which can handle PUS TC packets. Right now, the verification + /// reporter is constrained to the [StdVerifReporterWithSender] and the service handler + /// relies on TMTC packets being exchanged via a [SharedPool]. Please note that this variant + /// of the PUS service base is not optimized for handling packets sent as a `Vec` and + /// might perform additional copies to the internal buffer as well. The class should + /// still behave correctly. + pub struct PusServiceHandler { + pub common: PusServiceBase, + pub tc_in_mem_converter: TcInMemConverter, + } - fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> { - // Keep locked section as short as possible. - let psb_mut = self.psb_mut(); - let mut tc_pool = psb_mut - .shared_tc_store - .write() - .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?; - let tc_guard = tc_pool.read_with_guard(addr); - let tc_raw = tc_guard.read().unwrap(); - psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); - Ok(()) + impl PusServiceHandler { + pub fn new( + tc_receiver: Box, + tm_sender: Box, + tm_apid: u16, + verification_handler: VerificationReporterWithSender, + tc_in_mem_converter: TcInMemConverter, + ) -> Self { + Self { + common: PusServiceBase { + tc_receiver, + tm_sender, + tm_apid, + verification_handler: RefCell::new(verification_handler), + }, + tc_in_mem_converter, + } } - fn handle_next_packet(&mut self) -> Result { - match self.psb().tc_receiver.recv_tc() { - Ok(ReceivedTcWrapper { store_addr, token }) => { + pub fn retrieve_and_accept_next_packet( + &mut self, + ) -> Result, PusPacketHandlingError> { + match self.common.tc_receiver.recv_tc() { + Ok(EcssTcAndToken { + tc_in_memory, + token, + }) => { if token.is_none() { return Err(PusPacketHandlingError::InvalidVerificationToken); } let token = token.unwrap(); let accepted_token = VerificationToken::::try_from(token) .map_err(|_| PusPacketHandlingError::InvalidVerificationToken)?; - self.handle_one_tc(store_addr, accepted_token) + Ok(Some(AcceptedEcssTcAndToken { + tc_in_memory, + token: accepted_token, + })) } Err(e) => match e { TryRecvTmtcError::Error(e) => Err(PusPacketHandlingError::EcssTmtc(e)), - TryRecvTmtcError::Empty => Ok(PusPacketHandlerResult::Empty), + TryRecvTmtcError::Empty => Ok(None), }, } } diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 46a2bcd..9f391b1 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -1,15 +1,13 @@ -use crate::pool::{SharedPool, StoreAddr}; +use crate::pool::SharedPool; use crate::pus::scheduler::PusScheduler; -use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; -use crate::pus::{ - EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, - PusServiceHandler, -}; -use spacepackets::ecss::tc::PusTcReader; +use crate::pus::{EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError}; use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::time::cds::TimeProvider; use std::boxed::Box; +use super::verification::VerificationReporterWithSender; +use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHandler}; + /// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service) /// packets. This handler is constrained to using the [PusScheduler], but is able to process /// the most important PUS requests for a scheduling service. @@ -18,28 +16,31 @@ use std::boxed::Box; /// telecommands inside the scheduler. The user can retrieve the wrapped scheduler via the /// [Self::scheduler] and [Self::scheduler_mut] function and then use the scheduler API to release /// telecommands when applicable. -pub struct PusService11SchedHandler { - psb: PusServiceBase, +pub struct PusService11SchedHandler { + pub psb: PusServiceHandler, + shared_tc_store: SharedPool, scheduler: PusScheduler, } -impl PusService11SchedHandler { +impl PusService11SchedHandler { pub fn new( tc_receiver: Box, - shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, - verification_handler: StdVerifReporterWithSender, + verification_handler: VerificationReporterWithSender, + tc_in_mem_converter: TcInMemConverter, + shared_tc_store: SharedPool, scheduler: PusScheduler, ) -> Self { Self { - psb: PusServiceBase::new( + psb: PusServiceHandler::new( tc_receiver, - shared_tc_store, tm_sender, tm_apid, verification_handler, + tc_in_mem_converter, ), + shared_tc_store, scheduler, } } @@ -51,45 +52,41 @@ impl PusService11SchedHandler { pub fn scheduler(&self) -> &PusScheduler { &self.scheduler } -} -impl PusServiceHandler for PusService11SchedHandler { - fn psb_mut(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - fn psb(&self) -> &PusServiceBase { - &self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTcReader::new(&self.psb.pus_buf)?; + pub fn handle_one_tc(&mut self) -> Result { + let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + if possible_packet.is_none() { + return Ok(PusPacketHandlerResult::Empty); + } + let ecss_tc_and_token = possible_packet.unwrap(); + let tc = self + .psb + .tc_in_mem_converter + .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; let subservice = tc.subservice(); let std_service = scheduling::Subservice::try_from(subservice); if std_service.is_err() { return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token, + ecss_tc_and_token.token, )); } let mut partial_error = None; - let time_stamp = self.psb().get_current_timestamp(&mut partial_error); + let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); match std_service.unwrap() { scheduling::Subservice::TcEnableScheduling => { let start_token = self .psb + .common .verification_handler .get_mut() - .start_success(token, Some(&time_stamp)) + .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("Error sending start success"); self.scheduler.enable(); if self.scheduler.is_enabled() { self.psb + .common .verification_handler .get_mut() .completion_success(start_token, Some(&time_stamp)) @@ -101,14 +98,16 @@ impl PusServiceHandler for PusService11SchedHandler { scheduling::Subservice::TcDisableScheduling => { let start_token = self .psb + .common .verification_handler .get_mut() - .start_success(token, Some(&time_stamp)) + .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("Error sending start success"); self.scheduler.disable(); if !self.scheduler.is_enabled() { self.psb + .common .verification_handler .get_mut() .completion_success(start_token, Some(&time_stamp)) @@ -120,22 +119,20 @@ impl PusServiceHandler for PusService11SchedHandler { scheduling::Subservice::TcResetScheduling => { let start_token = self .psb + .common .verification_handler .get_mut() - .start_success(token, Some(&time_stamp)) + .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("Error sending start success"); - let mut pool = self - .psb - .shared_tc_store - .write() - .expect("Locking pool failed"); + let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); self.scheduler .reset(pool.as_mut()) .expect("Error resetting TC Pool"); self.psb + .common .verification_handler .get_mut() .completion_success(start_token, Some(&time_stamp)) @@ -144,30 +141,29 @@ impl PusServiceHandler for PusService11SchedHandler { scheduling::Subservice::TcInsertActivity => { let start_token = self .psb + .common .verification_handler .get_mut() - .start_success(token, Some(&time_stamp)) + .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("error sending start success"); - let mut pool = self - .psb - .shared_tc_store - .write() - .expect("locking pool failed"); + let mut pool = self.shared_tc_store.write().expect("locking pool failed"); self.scheduler .insert_wrapped_tc::(&tc, pool.as_mut()) .expect("insertion of activity into pool failed"); self.psb + .common .verification_handler .get_mut() .completion_success(start_token, Some(&time_stamp)) .expect("sending completion success failed"); } _ => { + // Treat unhandled standard subservices as custom subservices for now. return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token, + ecss_tc_and_token.token, )); } } @@ -176,9 +172,6 @@ impl PusServiceHandler for PusService11SchedHandler { partial_error, )); } - Ok(PusPacketHandlerResult::CustomSubservice( - tc.subservice(), - token, - )) + Ok(PusPacketHandlerResult::RequestHandled) } } diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs index c205178..5d59ab3 100644 --- a/satrs-core/src/pus/test.rs +++ b/satrs-core/src/pus/test.rs @@ -1,67 +1,62 @@ -use crate::pool::{SharedPool, StoreAddr}; -use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::{ EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, - PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, + PusPacketHandlingError, PusTmWrapper, }; -use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; use spacepackets::ecss::PusPacket; use spacepackets::SpHeader; use std::boxed::Box; +use super::verification::VerificationReporterWithSender; +use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHandler}; + /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This handler only processes ping requests and generates a ping reply for them accordingly. -pub struct PusService17TestHandler { - psb: PusServiceBase, +pub struct PusService17TestHandler { + pub psb: PusServiceHandler, } -impl PusService17TestHandler { +impl PusService17TestHandler { pub fn new( tc_receiver: Box, - shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, - verification_handler: StdVerifReporterWithSender, + verification_handler: VerificationReporterWithSender, + tc_in_mem_converter: TcInMemConverter, ) -> Self { Self { - psb: PusServiceBase::new( + psb: PusServiceHandler::new( tc_receiver, - shared_tc_store, tm_sender, tm_apid, verification_handler, + tc_in_mem_converter, ), } } -} -impl PusServiceHandler for PusService17TestHandler { - fn psb_mut(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - fn psb(&self) -> &PusServiceBase { - &self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTcReader::new(&self.psb.pus_buf)?; + pub fn handle_one_tc(&mut self) -> Result { + let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + if possible_packet.is_none() { + return Ok(PusPacketHandlerResult::Empty); + } + let ecss_tc_and_token = possible_packet.unwrap(); + let tc = self + .psb + .tc_in_mem_converter + .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; if tc.service() != 17 { return Err(PusPacketHandlingError::WrongService(tc.service())); } if tc.subservice() == 1 { let mut partial_error = None; - let time_stamp = self.psb().get_current_timestamp(&mut partial_error); + let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); let result = self .psb + .common .verification_handler .get_mut() - .start_success(token, Some(&time_stamp)) + .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .map_err(|_| PartialPusHandlingError::Verification); let start_token = if let Ok(result) = result { Some(result) @@ -70,11 +65,12 @@ impl PusServiceHandler for PusService17TestHandler { None }; // Sequence count will be handled centrally in TM funnel. - let mut reply_header = SpHeader::tm_unseg(self.psb.tm_apid, 0, 0).unwrap(); + let mut reply_header = SpHeader::tm_unseg(self.psb.common.tm_apid, 0, 0).unwrap(); let tc_header = PusTmSecondaryHeader::new_simple(17, 2, &time_stamp); let ping_reply = PusTmCreator::new(&mut reply_header, tc_header, &[], true); let result = self .psb + .common .tm_sender .send_tm(PusTmWrapper::Direct(ping_reply)) .map_err(PartialPusHandlingError::TmSend); @@ -85,6 +81,7 @@ impl PusServiceHandler for PusService17TestHandler { if let Some(start_token) = start_token { if self .psb + .common .verification_handler .get_mut() .completion_success(start_token, Some(&time_stamp)) @@ -98,12 +95,13 @@ impl PusServiceHandler for PusService17TestHandler { partial_error, )); }; - return Ok(PusPacketHandlerResult::RequestHandled); + } else { + return Ok(PusPacketHandlerResult::CustomSubservice( + tc.subservice(), + ecss_tc_and_token.token, + )); } - Ok(PusPacketHandlerResult::CustomSubservice( - tc.subservice(), - token, - )) + Ok(PusPacketHandlerResult::RequestHandled) } } @@ -112,9 +110,11 @@ mod tests { use crate::pool::{LocalPool, PoolCfg, SharedPool}; use crate::pus::test::PusService17TestHandler; use crate::pus::verification::{ - RequestId, StdVerifReporterWithSender, VerificationReporterCfg, + RequestId, VerificationReporterCfg, VerificationReporterWithSender, + }; + use crate::pus::{ + EcssTcAndToken, EcssTcInStoreConverter, MpscTcInStoreReceiver, MpscTmInStoreSender, }; - use crate::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender, PusServiceHandler}; use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::tc::{PusTcCreator, PusTcSecondaryHeader}; use spacepackets::ecss::tm::PusTmReader; @@ -141,15 +141,16 @@ mod tests { MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tm_tx.clone()); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let mut verification_handler = - StdVerifReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); let test_srv_tm_sender = MpscTmInStoreSender::new(0, "TEST_SENDER", shared_tm_store, tm_tx); let test_srv_tc_receiver = MpscTcInStoreReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); + let in_store_converter = EcssTcInStoreConverter::new(tc_pool_shared.clone(), 2048); let mut pus_17_handler = PusService17TestHandler::new( Box::new(test_srv_tc_receiver), - tc_pool_shared.clone(), Box::new(test_srv_tm_sender), TEST_APID, verification_handler.clone(), + in_store_converter, ); // Create a ping TC, verify acceptance. let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); @@ -164,8 +165,10 @@ mod tests { let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap(); drop(tc_pool); // Send accepted TC to test service handler. - test_srv_tc_tx.send((addr, token.into())).unwrap(); - let result = pus_17_handler.handle_next_packet(); + test_srv_tc_tx + .send(EcssTcAndToken::new(addr, token)) + .unwrap(); + let result = pus_17_handler.handle_one_tc(); assert!(result.is_ok()); // We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and // Completion TM @@ -174,7 +177,7 @@ mod tests { let mut tm_addr = next_msg.unwrap(); let tm_pool = tm_pool_shared.read().unwrap(); let tm_raw = tm_pool.read(&tm_addr).unwrap(); - let (tm, _) = PusTmReader::new(&tm_raw, 0).unwrap(); + let (tm, _) = PusTmReader::new(tm_raw, 0).unwrap(); assert_eq!(tm.service(), 1); assert_eq!(tm.subservice(), 1); let req_id = RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); @@ -186,7 +189,7 @@ mod tests { tm_addr = next_msg.unwrap(); let tm_raw = tm_pool.read(&tm_addr).unwrap(); // Is generated with CDS short timestamp. - let (tm, _) = PusTmReader::new(&tm_raw, 7).unwrap(); + let (tm, _) = PusTmReader::new(tm_raw, 7).unwrap(); assert_eq!(tm.service(), 1); assert_eq!(tm.subservice(), 3); let req_id = RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); @@ -198,7 +201,7 @@ mod tests { tm_addr = next_msg.unwrap(); let tm_raw = tm_pool.read(&tm_addr).unwrap(); // Is generated with CDS short timestamp. - let (tm, _) = PusTmReader::new(&tm_raw, 7).unwrap(); + let (tm, _) = PusTmReader::new(tm_raw, 7).unwrap(); assert_eq!(tm.service(), 17); assert_eq!(tm.subservice(), 2); assert!(tm.user_data().is_empty()); @@ -209,7 +212,7 @@ mod tests { tm_addr = next_msg.unwrap(); let tm_raw = tm_pool.read(&tm_addr).unwrap(); // Is generated with CDS short timestamp. - let (tm, _) = PusTmReader::new(&tm_raw, 7).unwrap(); + let (tm, _) = PusTmReader::new(tm_raw, 7).unwrap(); assert_eq!(tm.service(), 1); assert_eq!(tm.subservice(), 7); let req_id = RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); diff --git a/satrs-example/pyclient/.gitignore b/satrs-example/pyclient/.gitignore index 894ba5d..d994678 100644 --- a/satrs-example/pyclient/.gitignore +++ b/satrs-example/pyclient/.gitignore @@ -6,3 +6,4 @@ __pycache__ !/.idea/runConfigurations /seqcnt.txt +/.tmtc-history.txt diff --git a/satrs-example/pyclient/common.py b/satrs-example/pyclient/common.py index c2d0777..8f57e54 100644 --- a/satrs-example/pyclient/common.py +++ b/satrs-example/pyclient/common.py @@ -44,10 +44,6 @@ class AcsHkIds(enum.IntEnum): MGM_SET = 1 -class HkOpCodes: - GENERATE_ONE_SHOT = ["0", "oneshot"] - - def make_addressable_id(target_id: int, unique_id: int) -> bytes: byte_string = bytearray(struct.pack("!I", target_id)) byte_string.extend(struct.pack("!I", unique_id)) diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index c749e26..66a41e4 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -4,6 +4,8 @@ import logging import sys import time from typing import Optional +from prompt_toolkit.history import History +from prompt_toolkit.history import FileHistory import tmtccmd from spacepackets.ecss import PusTelemetry, PusVerificator @@ -11,16 +13,16 @@ from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm from spacepackets.ccsds.time import CdsShortTimestamp -from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper +from tmtccmd import TcHandlerBase, ProcedureParamsWrapper from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tmtc import CcsdsTmHandler, SpecificApidHandlerBase from tmtccmd.com import ComInterface from tmtccmd.config import ( + CmdTreeNode, default_json_path, SetupParams, HookBase, - TmtcDefinitionWrapper, params_to_procedure_conversion, ) from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper @@ -39,12 +41,11 @@ from tmtccmd.tmtc import ( DefaultPusQueueHelper, QueueWrapper, ) -from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider +from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc -import tc_definitions from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32 _LOGGER = logging.getLogger() @@ -54,25 +55,29 @@ class SatRsConfigHook(HookBase): def __init__(self, json_cfg_path: str): super().__init__(json_cfg_path=json_cfg_path) - def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) + assert self.cfg_path is not None cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, space_packet_ids=TM_PACKET_IDS, ) + assert cfg is not None return create_com_interface_default(cfg) - def get_tmtc_definitions(self) -> TmtcDefinitionWrapper: - return tc_definitions.tc_definitions() + def get_command_definitions(self) -> CmdTreeNode: + """This function should return the root node of the command definition tree.""" + return pus_tc.create_cmd_definition_tree() - def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): - _LOGGER.info("Mode operation hook was called") - pass + def get_cmd_history(self) -> Optional[History]: + """Optionlly return a history class for the past command paths which will be used + when prompting a command path from the user in CLI mode.""" + return FileHistory(".tmtc-history.txt") def get_object_ids(self) -> ObjectIdDictT: from tmtccmd.config.objects import get_core_object_ids @@ -94,15 +99,12 @@ class PusHandler(SpecificApidHandlerBase): def handle_tm(self, packet: bytes, _user_args: any): try: - tm_packet = PusTelemetry.unpack( - packet, time_reader=CdsShortTimestamp.empty() - ) + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) except ValueError as e: _LOGGER.warning("Could not generate PUS TM object from raw data") _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") raise e - service = tm_packet.service - dedicated_handler = False + service = pus_tm.service if service == 1: tm_packet = Service1Tm.unpack( data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) @@ -119,8 +121,7 @@ class PusHandler(SpecificApidHandlerBase): else: self.verif_wrapper.log_to_console(tm_packet, res) self.verif_wrapper.log_to_file(tm_packet, res) - dedicated_handler = True - if service == 3: + elif service == 3: _LOGGER.info("No handling for HK packets implemented") _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) @@ -129,8 +130,7 @@ class PusHandler(SpecificApidHandlerBase): raise ValueError("No addressable ID in HK packet") json_str = pus_tm.source_data[8:] _LOGGER.info(json_str) - dedicated_handler = True - if service == 5: + elif service == 5: tm_packet = PusTelemetry.unpack( packet, time_reader=CdsShortTimestamp.empty() ) @@ -139,11 +139,10 @@ class PusHandler(SpecificApidHandlerBase): _LOGGER.info(f"Received event packet. Event: {event_u32}") if event_u32.group_id == 0 and event_u32.unique_id == 0: _LOGGER.info("Received test event") - if service == 17: + elif service == 17: tm_packet = Service17Tm.unpack( packet, time_reader=CdsShortTimestamp.empty() ) - dedicated_handler = True if tm_packet.subservice == 2: self.file_logger.info("Received Ping Reply TM[17,2]") _LOGGER.info("Received Ping Reply TM[17,2]") @@ -154,17 +153,14 @@ class PusHandler(SpecificApidHandlerBase): _LOGGER.info( f"Received Test Packet with unknown subservice {tm_packet.subservice}" ) - if tm_packet is None: + else: _LOGGER.info( f"The service {service} is not implemented in Telemetry Factory" ) tm_packet = PusTelemetry.unpack( packet, time_reader=CdsShortTimestamp.empty() ) - self.raw_logger.log_tm(tm_packet) - if not dedicated_handler and tm_packet is not None: - pass - # self.printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) + self.raw_logger.log_tm(pus_tm) class TcHandler(TcHandlerBase): @@ -196,22 +192,18 @@ class TcHandler(TcHandlerBase): log_entry = entry_helper.to_log_entry() _LOGGER.info(log_entry.log_str) - def queue_finished_cb(self, helper: ProcedureWrapper): - if helper.proc_type == TcProcedureType.DEFAULT: - def_proc = helper.to_def_procedure() - _LOGGER.info( - f"Queue handling finished for service {def_proc.service} and " - f"op code {def_proc.op_code}" - ) + def queue_finished_cb(self, info: ProcedureWrapper): + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") - def feed_cb(self, helper: ProcedureWrapper, wrapper: FeedWrapper): + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): q = self.queue_helper q.queue_wrapper = wrapper.queue_wrapper - if helper.proc_type == TcProcedureType.DEFAULT: - def_proc = helper.to_def_procedure() - service = def_proc.service - op_code = def_proc.op_code - pus_tc.pack_pus_telecommands(q, service, op_code) + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + assert def_proc.cmd_path is not None + pus_tc.pack_pus_telecommands(q, def_proc.cmd_path) def main(): diff --git a/satrs-example/pyclient/pus_tc.py b/satrs-example/pyclient/pus_tc.py index 9996e5b..f73b755 100644 --- a/satrs-example/pyclient/pus_tc.py +++ b/satrs-example/pyclient/pus_tc.py @@ -1,50 +1,85 @@ import datetime +import logging from spacepackets.ccsds import CdsShortTimestamp from spacepackets.ecss import PusTelecommand -from tmtccmd.config import CoreServiceList +from tmtccmd.config import CmdTreeNode from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command from common import ( EXAMPLE_PUS_APID, - HkOpCodes, make_addressable_id, RequestTargetId, AcsHkIds, ) +_LOGGER = logging.getLogger(__name__) -def pack_pus_telecommands(q: DefaultPusQueueHelper, service: str, op_code: str): - if ( - service == CoreServiceList.SERVICE_17 - or service == CoreServiceList.SERVICE_17_ALT - ): - if op_code == "ping": + +def create_cmd_definition_tree() -> CmdTreeNode: + + root_node = CmdTreeNode.root_node() + + test_node = CmdTreeNode("test", "Test Node") + test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC")) + test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event")) + root_node.add_child(test_node) + + scheduler_node = CmdTreeNode("scheduler", "Scheduler Node") + scheduler_node.add_child( + CmdTreeNode( + "schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds" + ) + ) + root_node.add_child(scheduler_node) + + acs_node = CmdTreeNode("acs", "ACS Subsystem Node") + mgm_node = CmdTreeNode("mgms", "MGM devices node") + mgm_node.add_child(CmdTreeNode("one_shot_hk", "Request one shot HK")) + acs_node.add_child(mgm_node) + root_node.add_child(acs_node) + + return root_node + + +def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): + # It should always be at least the root path "/", so we split of the empty portion left of it. + cmd_path_list = cmd_path.split("/")[1:] + if len(cmd_path_list) == 0: + _LOGGER.warning("empty command path") + return + if cmd_path_list[0] == "test": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "ping": q.add_log_cmd("Sending PUS ping telecommand") return q.add_pus_tc(PusTelecommand(service=17, subservice=1)) - elif op_code == "trigger_event": + elif cmd_path_list[1] == "trigger_event": q.add_log_cmd("Triggering test event") return q.add_pus_tc(PusTelecommand(service=17, subservice=128)) - if service == CoreServiceList.SERVICE_11: - q.add_log_cmd("Sending PUS scheduled TC telecommand") - crt_time = CdsShortTimestamp.from_now() - time_stamp = crt_time + datetime.timedelta(seconds=10) - time_stamp = time_stamp.pack() - return q.add_pus_tc( - create_time_tagged_cmd( - time_stamp, - PusTelecommand(service=17, subservice=1), - apid=EXAMPLE_PUS_APID, - ) - ) - if service == CoreServiceList.SERVICE_3: - if op_code in HkOpCodes.GENERATE_ONE_SHOT: - q.add_log_cmd("Sending HK one shot request") - q.add_pus_tc( - create_request_one_hk_command( - make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET) + if cmd_path_list[0] == "scheduler": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "schedule_ping_10_secs_ahead": + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd( + time_stamp, + PusTelecommand(service=17, subservice=1), + apid=EXAMPLE_PUS_APID, ) ) - pass + if cmd_path_list[0] == "acs": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "mgm": + assert len(cmd_path_list) >= 3 + if cmd_path_list[2] == "one_shot_hk": + q.add_log_cmd("Sending HK one shot request") + q.add_pus_tc( + create_request_one_hk_command( + make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET) + ) + ) diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index 485c76a..b3f6f2a 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 7.0.0 +tmtccmd == 8.0.0rc1 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 59d1566..6adf040 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -42,7 +42,7 @@ use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::verification::{ TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; -use satrs_core::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender}; +use satrs_core::pus::{EcssTcInStoreConverter, MpscTcInStoreReceiver, MpscTmInStoreSender}; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter}; use satrs_core::spacepackets::{ @@ -179,10 +179,10 @@ fn main() { ); let pus17_handler = PusService17TestHandler::new( Box::new(test_srv_receiver), - tc_store.pool.clone(), Box::new(test_srv_tm_sender), PUS_APID, verif_reporter.clone(), + EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), ); let mut pus_17_wrapper = Service17CustomWrapper { pus17_handler, @@ -204,10 +204,11 @@ fn main() { .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusService11SchedHandler::new( Box::new(sched_srv_receiver), - tc_store.pool.clone(), Box::new(sched_srv_tm_sender), PUS_APID, verif_reporter.clone(), + EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + tc_store.pool.clone(), scheduler, ); let mut pus_11_wrapper = Pus11Wrapper { @@ -228,10 +229,10 @@ fn main() { ); let pus_5_handler = PusService5EventHandler::new( Box::new(event_srv_receiver), - tc_store.pool.clone(), Box::new(event_srv_tm_sender), PUS_APID, verif_reporter.clone(), + EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), event_request_tx, ); let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler }; @@ -249,10 +250,10 @@ fn main() { ); let pus_8_handler = PusService8ActionHandler::new( Box::new(action_srv_receiver), - tc_store.pool.clone(), Box::new(action_srv_tm_sender), PUS_APID, verif_reporter.clone(), + EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), request_map.clone(), ); let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; @@ -267,10 +268,10 @@ fn main() { MpscTcInStoreReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); let pus_3_handler = PusService3HkHandler::new( Box::new(hk_srv_receiver), - tc_store.pool.clone(), Box::new(hk_srv_tm_sender), PUS_APID, verif_reporter.clone(), + EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), request_map, ); let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index d280e69..366539e 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -1,12 +1,11 @@ use crate::requests::{ActionRequest, Request, RequestWithToken}; use log::{error, warn}; -use satrs_core::pool::{SharedPool, StoreAddr}; use satrs_core::pus::verification::{ - FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, + FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken, }; use satrs_core::pus::{ - EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, - PusServiceHandler, + EcssTcInMemConverter, EcssTcInStoreConverter, EcssTcReceiver, EcssTmSender, + PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, }; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; @@ -14,34 +13,32 @@ use satrs_example::{tmtc_err, TargetIdWithApid}; use std::collections::HashMap; use std::sync::mpsc::Sender; -pub struct PusService8ActionHandler { - psb: PusServiceBase, +pub struct PusService8ActionHandler { + psb: PusServiceHandler, request_handlers: HashMap>, } -impl PusService8ActionHandler { +impl PusService8ActionHandler { pub fn new( tc_receiver: Box, - shared_tc_pool: SharedPool, tm_sender: Box, tm_apid: u16, - verification_handler: StdVerifReporterWithSender, + verification_handler: VerificationReporterWithSender, + tc_in_mem_converter: TcInMemConverter, request_handlers: HashMap>, ) -> Self { Self { - psb: PusServiceBase::new( + psb: PusServiceHandler::new( tc_receiver, - shared_tc_pool, tm_sender, tm_apid, verification_handler, + tc_in_mem_converter, ), request_handlers, } } -} -impl PusService8ActionHandler { fn handle_action_request_with_id( &self, token: VerificationToken, @@ -50,7 +47,8 @@ impl PusService8ActionHandler { ) -> Result<(), PusPacketHandlingError> { let user_data = tc.user_data(); if user_data.len() < 8 { - self.psb() + self.psb + .common .verification_handler .borrow_mut() .start_failure( @@ -79,7 +77,8 @@ impl PusService8ActionHandler { } else { let mut fail_data: [u8; 4] = [0; 4]; fail_data.copy_from_slice(&target_id.target.to_be_bytes()); - self.psb() + self.psb + .common .verification_handler .borrow_mut() .start_failure( @@ -97,37 +96,32 @@ impl PusService8ActionHandler { } Ok(()) } -} -impl PusServiceHandler for PusService8ActionHandler { - fn psb_mut(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - fn psb(&self) -> &PusServiceBase { - &self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTcReader::new(&self.psb().pus_buf).unwrap(); + fn handle_one_tc(&mut self) -> Result { + let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + if possible_packet.is_none() { + return Ok(PusPacketHandlerResult::Empty); + } + let ecss_tc_and_token = possible_packet.unwrap(); + self.psb + .tc_in_mem_converter + .cache_ecss_tc_in_memory(&ecss_tc_and_token)?; + let tc = PusTcReader::new(self.psb.tc_in_mem_converter.tc_slice_raw())?.0; let subservice = tc.subservice(); let mut partial_error = None; - let time_stamp = self.psb().get_current_timestamp(&mut partial_error); + let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); match subservice { 128 => { - self.handle_action_request_with_id(token, &tc, &time_stamp)?; + self.handle_action_request_with_id(ecss_tc_and_token.token, &tc, &time_stamp)?; } _ => { let fail_data = [subservice]; - self.psb_mut() + self.psb + .common .verification_handler .get_mut() .start_failure( - token, + ecss_tc_and_token.token, FailParams::new( Some(&time_stamp), &tmtc_err::INVALID_PUS_SUBSERVICE, @@ -148,12 +142,12 @@ impl PusServiceHandler for PusService8ActionHandler { } pub struct Pus8Wrapper { - pub(crate) pus_8_handler: PusService8ActionHandler, + pub(crate) pus_8_handler: PusService8ActionHandler, } impl Pus8Wrapper { pub fn handle_next_packet(&mut self) -> bool { - match self.pus_8_handler.handle_next_packet() { + match self.pus_8_handler.handle_one_tc() { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index 0f2654e..3dc9dc1 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -1,14 +1,14 @@ use log::{error, warn}; use satrs_core::pus::event_srv::PusService5EventHandler; -use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler}; +use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult}; pub struct Pus5Wrapper { - pub pus_5_handler: PusService5EventHandler, + pub pus_5_handler: PusService5EventHandler, } impl Pus5Wrapper { pub fn handle_next_packet(&mut self) -> bool { - match self.pus_5_handler.handle_next_packet() { + match self.pus_5_handler.handle_one_tc() { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index a00f5ef..94b21b2 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -1,72 +1,63 @@ use crate::requests::{Request, RequestWithToken}; use log::{error, warn}; use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; -use satrs_core::pool::{SharedPool, StoreAddr}; -use satrs_core::pus::verification::{ - FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, -}; +use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; use satrs_core::pus::{ - EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, - PusServiceHandler, + EcssTcInMemConverter, EcssTcInStoreConverter, EcssTcReceiver, EcssTmSender, + PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, }; -use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::{hk, PusPacket}; use satrs_example::{hk_err, tmtc_err, TargetIdWithApid}; use std::collections::HashMap; use std::sync::mpsc::Sender; -pub struct PusService3HkHandler { - psb: PusServiceBase, +pub struct PusService3HkHandler { + psb: PusServiceHandler, request_handlers: HashMap>, } -impl PusService3HkHandler { +impl PusService3HkHandler { pub fn new( tc_receiver: Box, - shared_tc_pool: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, + tc_in_mem_converter: TcInMemConverter, request_handlers: HashMap>, ) -> Self { Self { - psb: PusServiceBase::new( + psb: PusServiceHandler::new( tc_receiver, - shared_tc_pool, tm_sender, tm_apid, verification_handler, + tc_in_mem_converter, ), request_handlers, } } -} -impl PusServiceHandler for PusService3HkHandler { - fn psb_mut(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - fn psb(&self) -> &PusServiceBase { - &self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - self.copy_tc_to_buf(addr)?; - let (tc, _) = PusTcReader::new(&self.psb().pus_buf).unwrap(); + fn handle_one_tc(&mut self) -> Result { + let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + if possible_packet.is_none() { + return Ok(PusPacketHandlerResult::Empty); + } + let ecss_tc_and_token = possible_packet.unwrap(); + let tc = self + .psb + .tc_in_mem_converter + .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; let subservice = tc.subservice(); let mut partial_error = None; - let time_stamp = self.psb().get_current_timestamp(&mut partial_error); + let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); let user_data = tc.user_data(); if user_data.is_empty() { self.psb + .common .verification_handler .borrow_mut() .start_failure( - token, + ecss_tc_and_token.token, FailParams::new(Some(&time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), ) .expect("Sending start failure TM failed"); @@ -81,9 +72,13 @@ impl PusServiceHandler for PusService3HkHandler { &hk_err::UNIQUE_ID_MISSING }; self.psb + .common .verification_handler .borrow_mut() - .start_failure(token, FailParams::new(Some(&time_stamp), err, None)) + .start_failure( + ecss_tc_and_token.token, + FailParams::new(Some(&time_stamp), err, None), + ) .expect("Sending start failure TM failed"); return Err(PusPacketHandlingError::NotEnoughAppData( "Expected at least 8 bytes of app data".into(), @@ -93,10 +88,11 @@ impl PusServiceHandler for PusService3HkHandler { let unique_id = u32::from_be_bytes(tc.user_data()[0..4].try_into().unwrap()); if !self.request_handlers.contains_key(&target_id) { self.psb + .common .verification_handler .borrow_mut() .start_failure( - token, + ecss_tc_and_token.token, FailParams::new(Some(&time_stamp), &hk_err::UNKNOWN_TARGET_ID, None), ) .expect("Sending start failure TM failed"); @@ -107,7 +103,11 @@ impl PusServiceHandler for PusService3HkHandler { let send_request = |target: TargetIdWithApid, request: HkRequest| { let sender = self.request_handlers.get(&target).unwrap(); sender - .send(RequestWithToken::new(target, Request::Hk(request), token)) + .send(RequestWithToken::new( + target, + Request::Hk(request), + ecss_tc_and_token.token, + )) .unwrap_or_else(|_| panic!("Sending HK request {request:?} failed")); }; if subservice == hk::Subservice::TcEnableHkGeneration as u8 { @@ -119,10 +119,11 @@ impl PusServiceHandler for PusService3HkHandler { } else if subservice == hk::Subservice::TcModifyHkCollectionInterval as u8 { if user_data.len() < 12 { self.psb + .common .verification_handler .borrow_mut() .start_failure( - token, + ecss_tc_and_token.token, FailParams::new( Some(&time_stamp), &hk_err::COLLECTION_INTERVAL_MISSING, @@ -147,12 +148,12 @@ impl PusServiceHandler for PusService3HkHandler { } pub struct Pus3Wrapper { - pub(crate) pus_3_handler: PusService3HkHandler, + pub(crate) pus_3_handler: PusService3HkHandler, } impl Pus3Wrapper { pub fn handle_next_packet(&mut self) -> bool { - match self.pus_3_handler.handle_next_packet() { + match self.pus_3_handler.handle_one_tc() { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index edc7cc3..a545b6b 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,8 +1,7 @@ use crate::tmtc::MpscStoreAndSendError; use log::warn; -use satrs_core::pool::StoreAddr; use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; -use satrs_core::pus::{PusPacketHandlerResult, TcAddrWithToken}; +use satrs_core::pus::{EcssTcAndToken, PusPacketHandlerResult, TcInMemory}; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusServiceId; use satrs_core::spacepackets::time::cds::TimeProvider; @@ -17,11 +16,11 @@ pub mod scheduler; pub mod test; pub struct PusTcMpscRouter { - pub test_service_receiver: Sender, - pub event_service_receiver: Sender, - pub sched_service_receiver: Sender, - pub hk_service_receiver: Sender, - pub action_service_receiver: Sender, + pub test_service_receiver: Sender, + pub event_service_receiver: Sender, + pub sched_service_receiver: Sender, + pub hk_service_receiver: Sender, + pub action_service_receiver: Sender, } pub struct PusReceiver { @@ -70,7 +69,7 @@ impl PusReceiver { impl PusReceiver { pub fn handle_tc_packet( &mut self, - store_addr: StoreAddr, + tc_in_memory: TcInMemory, service: u8, pus_tc: &PusTcReader, ) -> Result { @@ -84,22 +83,33 @@ impl PusReceiver { match service { Ok(standard_service) => match standard_service { PusServiceId::Test => { - self.pus_router - .test_service_receiver - .send((store_addr, accepted_token.into()))?; + self.pus_router.test_service_receiver.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })? + } + PusServiceId::Housekeeping => { + self.pus_router.hk_service_receiver.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })? + } + PusServiceId::Event => { + self.pus_router + .event_service_receiver + .send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })? + } + PusServiceId::Scheduling => { + self.pus_router + .sched_service_receiver + .send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })? } - PusServiceId::Housekeeping => self - .pus_router - .hk_service_receiver - .send((store_addr, accepted_token.into()))?, - PusServiceId::Event => self - .pus_router - .event_service_receiver - .send((store_addr, accepted_token.into()))?, - PusServiceId::Scheduling => self - .pus_router - .sched_service_receiver - .send((store_addr, accepted_token.into()))?, _ => { let result = self.verif_reporter.start_failure( accepted_token, diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 35c84b8..36c622e 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -2,10 +2,10 @@ use crate::tmtc::PusTcSource; use log::{error, info, warn}; use satrs_core::pus::scheduler::TcInfo; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; -use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler}; +use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult}; pub struct Pus11Wrapper { - pub pus_11_handler: PusService11SchedHandler, + pub pus_11_handler: PusService11SchedHandler, pub tc_source_wrapper: PusTcSource, } @@ -44,7 +44,7 @@ impl Pus11Wrapper { } pub fn handle_next_packet(&mut self) -> bool { - match self.pus_11_handler.handle_next_packet() { + match self.pus_11_handler.handle_one_tc() { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 047cbdb..3b08391 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,24 +1,24 @@ use log::{info, warn}; -use satrs_core::events::EventU32; use satrs_core::params::Params; use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::verification::FailParams; -use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler}; +use satrs_core::pus::{EcssTcInMemConverter, PusPacketHandlerResult}; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::spacepackets::time::TimeWriter; +use satrs_core::{events::EventU32, pus::EcssTcInStoreConverter}; use satrs_example::{tmtc_err, TEST_EVENT}; use std::sync::mpsc::Sender; pub struct Service17CustomWrapper { - pub pus17_handler: PusService17TestHandler, + pub pus17_handler: PusService17TestHandler, pub test_srv_event_sender: Sender<(EventU32, Option)>, } impl Service17CustomWrapper { pub fn handle_next_packet(&mut self) -> bool { - let res = self.pus17_handler.handle_next_packet(); + let res = self.pus17_handler.handle_one_tc(); if res.is_err() { warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); return true; @@ -38,9 +38,9 @@ impl Service17CustomWrapper { warn!("PUS17: Subservice {subservice} not implemented") } PusPacketHandlerResult::CustomSubservice(subservice, token) => { - let psb_mut = self.pus17_handler.psb_mut(); - let buf = psb_mut.pus_buf; - let (tc, _) = PusTcReader::new(&buf).unwrap(); + let (tc, _) = + PusTcReader::new(self.pus17_handler.psb.tc_in_mem_converter.tc_slice_raw()) + .unwrap(); let time_stamper = TimeProvider::from_now_with_u16_days().unwrap(); let mut stamp_buf: [u8; 7] = [0; 7]; time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); @@ -49,12 +49,17 @@ impl Service17CustomWrapper { self.test_srv_event_sender .send((TEST_EVENT.into(), None)) .expect("Sending test event failed"); - let start_token = psb_mut + let start_token = self + .pus17_handler + .psb + .common .verification_handler .get_mut() .start_success(token, Some(&stamp_buf)) .expect("Error sending start success"); - psb_mut + self.pus17_handler + .psb + .common .verification_handler .get_mut() .completion_success(start_token, Some(&stamp_buf)) @@ -62,7 +67,8 @@ impl Service17CustomWrapper { } else { let fail_data = [tc.subservice()]; self.pus17_handler - .psb_mut() + .psb + .common .verification_handler .get_mut() .start_failure( diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 8af701a..40b2b6b 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,12 +1,11 @@ use log::warn; -use satrs_core::pus::ReceivesEcssPusTc; +use satrs_core::pus::{EcssTcAndToken, ReceivesEcssPusTc}; use satrs_core::spacepackets::SpHeader; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::TcAddrWithToken; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::tmtc::tm_helper::SharedTmStore; @@ -35,7 +34,7 @@ pub enum MpscStoreAndSendError { #[error("Store error: {0}")] Store(#[from] StoreError), #[error("TC send error: {0}")] - TcSend(#[from] SendError), + TcSend(#[from] SendError), #[error("TMTC send error: {0}")] TmTcSend(#[from] SendError), } @@ -103,7 +102,6 @@ impl TmtcTask { } pub fn periodic_operation(&mut self) { - //while self.poll_tc() {} self.poll_tc(); } @@ -123,7 +121,11 @@ impl TmtcTask { match PusTcReader::new(&self.tc_buf) { Ok((pus_tc, _)) => { self.pus_receiver - .handle_tc_packet(addr, pus_tc.service(), &pus_tc) + .handle_tc_packet( + satrs_core::pus::TcInMemory::StoreAddr(addr), + pus_tc.service(), + &pus_tc, + ) .ok(); true }