diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index a6d8da1..871b3c8 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -51,6 +51,10 @@ version= "0.5" default-features = false optional = true +[dependencies.thiserror] +version = "1" +optional = true + [dependencies.serde] version = "1" default-features = false @@ -82,7 +86,8 @@ std = [ "crossbeam-channel/std", "serde/std", "spacepackets/std", - "num_enum/std" + "num_enum/std", + "thiserror" ] alloc = [ "serde/alloc", diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index c74e587..d43f8ed 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -13,8 +13,10 @@ pub mod event; pub mod event_man; pub mod hk; pub mod mode; +pub mod scheduler; +pub mod scheduler_srv; #[cfg(feature = "std")] -pub mod scheduling; +pub mod test; pub mod verification; #[cfg(feature = "alloc")] @@ -133,40 +135,36 @@ mod alloc_mod { #[cfg(feature = "std")] pub mod std_mod { use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; + use crate::pus::verification::{ + StdVerifReporterWithSender, TcStateAccepted, VerificationToken, + }; use crate::pus::{EcssSender, EcssTcSenderCore, EcssTmSenderCore}; + use crate::tmtc::tm_helper::SharedTmStore; use crate::SenderId; use alloc::vec::Vec; use spacepackets::ecss::{PusError, SerializablePusPacket}; use spacepackets::tc::PusTc; + use spacepackets::time::cds::TimeProvider; + use spacepackets::time::{StdTimestampError, TimeWriter}; use spacepackets::tm::PusTm; - use std::sync::mpsc::SendError; + use std::string::String; use std::sync::{mpsc, RwLockWriteGuard}; + use thiserror::Error; - #[derive(Debug, Clone)] + #[derive(Debug, Clone, Error)] pub enum MpscPusInStoreSendError { + #[error("RwGuard lock error")] LockError, - PusError(PusError), - StoreError(StoreError), - SendError(SendError), + #[error("Generic PUS error: {0}")] + PusError(#[from] PusError), + #[error("Generic store error: {0}")] + StoreError(#[from] StoreError), + #[error("Generic send error: {0}")] + SendError(#[from] mpsc::SendError), + #[error("RX handle has disconnected")] RxDisconnected(StoreAddr), } - impl From for MpscPusInStoreSendError { - fn from(value: PusError) -> Self { - MpscPusInStoreSendError::PusError(value) - } - } - impl From> for MpscPusInStoreSendError { - fn from(value: SendError) -> Self { - MpscPusInStoreSendError::SendError(value) - } - } - impl From for MpscPusInStoreSendError { - fn from(value: StoreError) -> Self { - MpscPusInStoreSendError::StoreError(value) - } - } - #[derive(Clone)] pub struct MpscTmtcInStoreSender { id: SenderId, @@ -246,7 +244,7 @@ pub mod std_mod { #[derive(Debug, Clone)] pub enum MpscAsVecSenderError { PusError(PusError), - SendError(SendError>), + SendError(mpsc::SendError>), } #[derive(Debug, Clone)] @@ -284,12 +282,127 @@ pub mod std_mod { Ok(()) } } -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum GenericTcCheckError { - NotEnoughAppData, - InvalidSubservice, + #[derive(Debug, Clone, Error)] + pub enum PusPacketHandlingError { + #[error("Generic PUS error: {0}")] + PusError(#[from] PusError), + #[error("Wrong service number {0} for packet handler")] + WrongService(u8), + #[error("Not enough application data available: {0}")] + NotEnoughAppData(String), + #[error("Generic store error: {0}")] + StoreError(#[from] StoreError), + #[error("Error with the pool RwGuard")] + RwGuardError(String), + #[error("MQ backend disconnect error")] + QueueDisconnected, + #[error("Other error {0}")] + OtherError(String), + } + + #[derive(Debug, Clone, Error)] + pub enum PartialPusHandlingError { + #[error("Generic timestamp generation error")] + TimeError(StdTimestampError), + #[error("Error sending telemetry: {0}")] + TmSendError(String), + #[error("Error sending verification message")] + VerificationError, + } + + #[derive(Debug, Clone)] + pub enum PusPacketHandlerResult { + RequestHandled, + RequestHandledPartialSuccess(PartialPusHandlingError), + CustomSubservice(VerificationToken), + Empty, + } + + impl From for PusPacketHandlerResult { + fn from(value: PartialPusHandlingError) -> Self { + Self::RequestHandledPartialSuccess(value) + } + } + + pub type AcceptedTc = (StoreAddr, VerificationToken); + + pub struct PusServiceBase { + pub(crate) tc_rx: mpsc::Receiver, + pub(crate) tc_store: SharedPool, + pub(crate) tm_tx: mpsc::Sender, + pub(crate) tm_store: SharedTmStore, + pub(crate) tm_apid: u16, + pub(crate) verification_handler: StdVerifReporterWithSender, + pub(crate) stamp_buf: [u8; 7], + pub(crate) pus_buf: [u8; 2048], + pus_size: usize, + } + + impl PusServiceBase { + pub fn new( + receiver: mpsc::Receiver, + tc_pool: SharedPool, + tm_tx: mpsc::Sender, + tm_store: SharedTmStore, + tm_apid: u16, + verification_handler: StdVerifReporterWithSender, + ) -> Self { + Self { + tc_rx: receiver, + tc_store: tc_pool, + tm_apid, + tm_tx, + tm_store, + verification_handler, + stamp_buf: [0; 7], + pus_buf: [0; 2048], + pus_size: 0, + } + } + + pub fn update_stamp(&mut self) -> Result<(), PartialPusHandlingError> { + let time_provider = + TimeProvider::from_now_with_u16_days().map_err(PartialPusHandlingError::TimeError); + if let Ok(time_provider) = time_provider { + time_provider.write_to_bytes(&mut self.stamp_buf).unwrap(); + Ok(()) + } else { + self.stamp_buf = [0; 7]; + Err(time_provider.unwrap_err()) + } + } + } + + pub trait PusServiceHandler { + fn psb_mut(&mut self) -> &mut PusServiceBase; + fn psb(&self) -> &PusServiceBase; + fn verification_reporter(&mut self) -> &mut StdVerifReporterWithSender { + &mut self.psb_mut().verification_handler + } + fn tc_store(&mut self) -> &mut SharedPool { + &mut self.psb_mut().tc_store + } + fn pus_tc_buf(&self) -> (&[u8], usize) { + (&self.psb().pus_buf, self.psb().pus_size) + } + fn handle_one_tc( + &mut self, + addr: StoreAddr, + token: VerificationToken, + ) -> Result; + fn handle_next_packet(&mut self) -> Result { + return match self.psb().tc_rx.try_recv() { + Ok((addr, token)) => self.handle_one_tc(addr, token), + Err(e) => match e { + mpsc::TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), + mpsc::TryRecvError::Disconnected => { + Err(PusPacketHandlingError::QueueDisconnected) + } + }, + }; + } + } } pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), EcssTmtcError> { diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduler.rs similarity index 74% rename from satrs-core/src/pus/scheduling.rs rename to satrs-core/src/pus/scheduler.rs index cf3d8e0..0406f83 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduler.rs @@ -2,25 +2,24 @@ //! //! The core data structure of this module is the [PusScheduler]. This structure can be used //! to perform the scheduling of telecommands like specified in the ECSS standard. -use crate::pool::{PoolProvider, StoreAddr, StoreError}; -use alloc::collections::btree_map::{Entry, Range}; -use alloc::vec; -use alloc::vec::Vec; +use crate::pool::{StoreAddr, StoreError}; use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use spacepackets::ecss::scheduling::TimeWindowType; -use spacepackets::ecss::{PusError, PusPacket}; +use spacepackets::ecss::PusError; use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; -use spacepackets::time::cds::DaysLen24Bits; -use spacepackets::time::{cds, CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; +use spacepackets::time::{CcsdsTimeProvider, TimestampError, UnixTimestamp}; use spacepackets::CcsdsPacket; -use std::collections::BTreeMap; #[cfg(feature = "std")] use std::error::Error; -#[cfg(feature = "std")] -use std::time::SystemTimeError; + +//#[cfg(feature = "std")] +//pub use std_mod::*; + +#[cfg(feature = "alloc")] +pub use alloc_mod::*; /// This is the request ID as specified in ECSS-E-ST-70-41C 5.4.11.2 of the standard. /// @@ -171,35 +170,6 @@ impl TcInfo { } } -/// This is the core data structure for scheduling PUS telecommands with [alloc] support. -/// -/// It is assumed that the actual telecommand data is stored in a separate TC pool offering -/// a [crate::pool::PoolProvider] API. This data structure just tracks the store addresses and their -/// release times and offers a convenient API to insert and release telecommands and perform -/// other functionality specified by the ECSS standard in section 6.11. The time is tracked -/// as a [spacepackets::time::UnixTimestamp] but the only requirement to the timekeeping of -/// the user is that it is convertible to that timestamp. -/// -/// The standard also specifies that the PUS scheduler can be enabled and disabled. -/// A disabled scheduler should still delete commands where the execution time has been reached -/// but should not release them to be executed. -/// -/// The implementation uses an ordered map internally with the release timestamp being the key. -/// This allows efficient time based insertions and extractions which should be the primary use-case -/// for a time-based command scheduler. -/// There is no way to avoid duplicate [RequestId]s during insertion, which can occur even if the -/// user always correctly increment for sequence counter due to overflows. To avoid this issue, -/// it can make sense to split up telecommand groups by the APID to avoid overflows. -/// -/// Currently, sub-schedules and groups are not supported. -#[derive(Debug)] -pub struct PusScheduler { - tc_map: BTreeMap>, - current_time: UnixTimestamp, - time_margin: Duration, - enabled: bool, -} - enum DeletionResult { WithoutStoreDeletion(Option), WithStoreDeletion(Result), @@ -259,360 +229,410 @@ impl TimeWindow { } } -impl PusScheduler { - /// Create a new PUS scheduler. - /// - /// # Arguments - /// - /// * `init_current_time` - The time to initialize the scheduler with. - /// * `time_margin` - This time margin is used when inserting new telecommands into the - /// schedule. If the release time of a new telecommand is earlier than the time margin - /// added to the current time, it will not be inserted into the schedule. - pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self { - PusScheduler { - tc_map: Default::default(), - current_time: init_current_time, - time_margin, - enabled: true, - } - } +#[cfg(feature = "alloc")] +pub mod alloc_mod { + use crate::pool::{PoolProvider, StoreAddr, StoreError}; + use crate::pus::scheduler::{DeletionResult, RequestId, ScheduleError, TcInfo, TimeWindow}; + use alloc::collections::btree_map::{Entry, Range}; + use alloc::collections::BTreeMap; + use alloc::vec; + use alloc::vec::Vec; + use core::time::Duration; + use spacepackets::ecss::scheduling::TimeWindowType; + use spacepackets::ecss::PusPacket; + use spacepackets::tc::PusTc; + use spacepackets::time::cds::DaysLen24Bits; + use spacepackets::time::{cds, CcsdsTimeProvider, TimeReader, UnixTimestamp}; - /// Like [Self::new], but sets the `init_current_time` parameter to the current system time. #[cfg(feature = "std")] - #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] - pub fn new_with_current_init_time(time_margin: Duration) -> Result { - Ok(Self::new(UnixTimestamp::from_now()?, time_margin)) - } - - pub fn num_scheduled_telecommands(&self) -> u64 { - let mut num_entries = 0; - for entries in &self.tc_map { - num_entries += entries.1.len() as u64; - } - num_entries - } - - pub fn is_enabled(&self) -> bool { - self.enabled - } - - pub fn enable(&mut self) { - self.enabled = true; - } + use std::time::SystemTimeError; + /// This is the core data structure for scheduling PUS telecommands with [alloc] support. + /// + /// It is assumed that the actual telecommand data is stored in a separate TC pool offering + /// a [crate::pool::PoolProvider] API. This data structure just tracks the store addresses and their + /// release times and offers a convenient API to insert and release telecommands and perform + /// other functionality specified by the ECSS standard in section 6.11. The time is tracked + /// as a [spacepackets::time::UnixTimestamp] but the only requirement to the timekeeping of + /// the user is that it is convertible to that timestamp. + /// + /// The standard also specifies that the PUS scheduler can be enabled and disabled. /// A disabled scheduler should still delete commands where the execution time has been reached /// but should not release them to be executed. - pub fn disable(&mut self) { - self.enabled = false; - } - - /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. - /// Be careful with this command as it will delete all the commands in the schedule. /// - /// The holding store for the telecommands needs to be passed so all the stored telecommands - /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error - /// will be returned but the method will still try to delete all the commands in the schedule. - pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { - self.enabled = false; - let mut deletion_ok = Ok(()); - for tc_lists in &mut self.tc_map { - for tc in tc_lists.1 { - let res = store.delete(tc.addr); - if res.is_err() { - deletion_ok = res; - } - } - } - self.tc_map.clear(); - deletion_ok - } - - pub fn update_time(&mut self, current_time: UnixTimestamp) { - self.current_time = current_time; - } - - pub fn current_time(&self) -> &UnixTimestamp { - &self.current_time - } - - /// Insert a telecommand which was already unwrapped from the outer Service 11 packet and stored - /// inside the telecommand packet pool. - pub fn insert_unwrapped_and_stored_tc( - &mut self, - time_stamp: UnixTimestamp, - info: TcInfo, - ) -> Result<(), ScheduleError> { - if time_stamp < self.current_time + self.time_margin { - return Err(ScheduleError::ReleaseTimeInTimeMargin( - self.current_time, - self.time_margin, - time_stamp, - )); - } - match self.tc_map.entry(time_stamp) { - Entry::Vacant(e) => { - e.insert(vec![info]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(info); - } - } - Ok(()) - } - - /// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still - /// needs to be stored inside the telecommand pool. - pub fn insert_unwrapped_tc( - &mut self, - time_stamp: UnixTimestamp, - tc: &[u8], - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - let check_tc = PusTc::from_bytes(tc)?; - if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { - return Err(ScheduleError::NestedScheduledTc); - } - let req_id = RequestId::from_tc(&check_tc.0); - - match pool.add(tc) { - Ok(addr) => { - let info = TcInfo::new(addr, req_id); - self.insert_unwrapped_and_stored_tc(time_stamp, info)?; - Ok(info) - } - Err(err) => Err(err.into()), - } - } - - /// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp - /// provider needs to be supplied via a generic. - pub fn insert_wrapped_tc( - &mut self, - pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - if PusPacket::service(pus_tc) != 11 { - return Err(ScheduleError::WrongService); - } - if PusPacket::subservice(pus_tc) != 4 { - return Err(ScheduleError::WrongSubservice); - } - return if let Some(user_data) = pus_tc.user_data() { - let stamp: TimeStamp = TimeReader::from_bytes(user_data)?; - let unix_stamp = stamp.unix_stamp(); - let stamp_len = stamp.len_as_bytes(); - self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) - } else { - Err(ScheduleError::TcDataEmpty) - }; - } - - /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS - /// short timestamp with 16-bit length of days field. - pub fn insert_wrapped_tc_cds_short( - &mut self, - pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - self.insert_wrapped_tc::(pus_tc, pool) - } - - /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS - /// long timestamp with a 24-bit length of days field. - pub fn insert_wrapped_tc_cds_long( - &mut self, - pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - self.insert_wrapped_tc::>(pus_tc, pool) - } - - /// This function uses [Self::retrieve_by_time_filter] to extract all scheduled commands inside - /// the time range and then deletes them from the provided store. + /// The implementation uses an ordered map internally with the release timestamp being the key. + /// This allows efficient time based insertions and extractions which should be the primary use-case + /// for a time-based command scheduler. + /// There is no way to avoid duplicate [RequestId]s during insertion, which can occur even if the + /// user always correctly increment for sequence counter due to overflows. To avoid this issue, + /// it can make sense to split up telecommand groups by the APID to avoid overflows. /// - /// Like specified in the documentation of [Self::retrieve_by_time_filter], the range extraction - /// for deletion is always inclusive. - /// - /// This function returns the number of deleted commands on success. In case any deletion fails, - /// the last deletion will be supplied in addition to the number of deleted commands. - pub fn delete_by_time_filter( - &mut self, - time_window: TimeWindow, - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - let range = self.retrieve_by_time_filter(time_window); - let mut del_packets = 0; - let mut res_if_fails = None; - let mut keys_to_delete = Vec::new(); - for time_bucket in range { - for tc in time_bucket.1 { - match pool.delete(tc.addr) { - Ok(_) => del_packets += 1, - Err(e) => res_if_fails = Some(e), - } - } - keys_to_delete.push(*time_bucket.0); - } - for key in keys_to_delete { - self.tc_map.remove(&key); - } - if let Some(err) = res_if_fails { - return Err((del_packets, err)); - } - Ok(del_packets) + /// Currently, sub-schedules and groups are not supported. + #[derive(Debug)] + pub struct PusScheduler { + tc_map: BTreeMap>, + pub(crate) current_time: UnixTimestamp, + time_margin: Duration, + enabled: bool, } - - /// Deletes all the scheduled commands. This also deletes the packets from the passed TC pool. - /// - /// This function returns the number of deleted commands on success. In case any deletion fails, - /// the last deletion will be supplied in addition to the number of deleted commands. - pub fn delete_all( - &mut self, - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - self.delete_by_time_filter(TimeWindow::::new_select_all(), pool) - } - - /// Retrieve a range over all scheduled commands. - pub fn retrieve_all(&mut self) -> Range<'_, UnixTimestamp, Vec> { - self.tc_map.range(..) - } - - /// This retrieves scheduled telecommands which are inside the provided time window. - /// - /// It should be noted that the ranged extraction is always inclusive. For example, a range - /// from 50 to 100 unix seconds would also include command scheduled at 100 unix seconds. - pub fn retrieve_by_time_filter( - &mut self, - time_window: TimeWindow, - ) -> Range<'_, UnixTimestamp, Vec> { - match time_window.time_window_type() { - TimeWindowType::SelectAll => self.tc_map.range(..), - TimeWindowType::TimeTagToTimeTag => { - // This should be guaranteed to be valid by library API, so unwrap is okay - let start_time = time_window.start_time().unwrap().unix_stamp(); - let end_time = time_window.end_time().unwrap().unix_stamp(); - self.tc_map.range(start_time..=end_time) - } - TimeWindowType::FromTimeTag => { - // This should be guaranteed to be valid by library API, so unwrap is okay - let start_time = time_window.start_time().unwrap().unix_stamp(); - self.tc_map.range(start_time..) - } - TimeWindowType::ToTimeTag => { - // This should be guaranteed to be valid by library API, so unwrap is okay - let end_time = time_window.end_time().unwrap().unix_stamp(); - self.tc_map.range(..=end_time) + impl PusScheduler { + /// Create a new PUS scheduler. + /// + /// # Arguments + /// + /// * `init_current_time` - The time to initialize the scheduler with. + /// * `time_margin` - This time margin is used when inserting new telecommands into the + /// schedule. If the release time of a new telecommand is earlier than the time margin + /// added to the current time, it will not be inserted into the schedule. + pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self { + PusScheduler { + tc_map: Default::default(), + current_time: init_current_time, + time_margin, + enabled: true, } } - } - /// Deletes a scheduled command with the given request ID. Returns the store address if a - /// scheduled command was found in the map and deleted, and None otherwise. - /// - /// Please note that this function will stop on the first telecommand with a request ID match. - /// In case of duplicate IDs (which should generally not happen), this function needs to be - /// called repeatedly. - pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option { - if let DeletionResult::WithoutStoreDeletion(v) = - self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>) - { - return v; + /// Like [Self::new], but sets the `init_current_time` parameter to the current system time. + #[cfg(feature = "std")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] + pub fn new_with_current_init_time(time_margin: Duration) -> Result { + Ok(Self::new(UnixTimestamp::from_now()?, time_margin)) } - panic!("unexpected deletion result"); - } - /// This behaves like [Self::delete_by_request_id] but deletes the packet from the pool as well. - pub fn delete_by_request_id_and_from_pool( - &mut self, - req_id: &RequestId, - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - if let DeletionResult::WithStoreDeletion(v) = - self.delete_by_request_id_internal(req_id, Some(pool)) - { - return v; - } - panic!("unexpected deletion result"); - } - - fn delete_by_request_id_internal( - &mut self, - req_id: &RequestId, - pool: Option<&mut (impl PoolProvider + ?Sized)>, - ) -> DeletionResult { - let mut idx_found = None; - for time_bucket in &mut self.tc_map { - for (idx, tc_info) in time_bucket.1.iter().enumerate() { - if &tc_info.request_id == req_id { - idx_found = Some(idx); - } - } - if let Some(idx) = idx_found { - let addr = time_bucket.1.remove(idx).addr; - if let Some(pool) = pool { - return match pool.delete(addr) { - Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)), - Err(e) => DeletionResult::WithStoreDeletion(Err(e)), - }; - } - return DeletionResult::WithoutStoreDeletion(Some(addr)); + pub fn num_scheduled_telecommands(&self) -> u64 { + let mut num_entries = 0; + for entries in &self.tc_map { + num_entries += entries.1.len() as u64; } + num_entries } - if pool.is_none() { - DeletionResult::WithoutStoreDeletion(None) - } else { - DeletionResult::WithStoreDeletion(Ok(false)) + + pub fn is_enabled(&self) -> bool { + self.enabled } - } - /// Retrieve all telecommands which should be release based on the current time. - pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { - self.tc_map.range(..=self.current_time) - } - #[cfg(feature = "std")] - #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] - pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> { - self.current_time = UnixTimestamp::from_now()?; - Ok(()) - } + pub fn enable(&mut self) { + self.enabled = true; + } - /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser - /// closure for each telecommand which should be released. This function will also delete - /// the telecommands from the holding store after calling the release closure, if the scheduler - /// is disabled. - /// - /// # Arguments - /// - /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and - /// the second argument is the telecommand information also containing the store address. - /// This closure should return whether the command should be deleted if the scheduler is - /// disabled to prevent memory leaks. - /// * `store` - The holding store of the telecommands. - pub fn release_telecommands bool>( - &mut self, - mut releaser: R, - tc_store: &mut (impl PoolProvider + ?Sized), - ) -> Result { - let tcs_to_release = self.telecommands_to_release(); - let mut released_tcs = 0; - let mut store_error = Ok(()); - for tc in tcs_to_release { - for info in tc.1 { - let should_delete = releaser(self.enabled, info); - released_tcs += 1; - if should_delete && !self.is_enabled() { - let res = tc_store.delete(info.addr); + /// A disabled scheduler should still delete commands where the execution time has been reached + /// but should not release them to be executed. + pub fn disable(&mut self) { + self.enabled = false; + } + + /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. + /// Be careful with this command as it will delete all the commands in the schedule. + /// + /// The holding store for the telecommands needs to be passed so all the stored telecommands + /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error + /// will be returned but the method will still try to delete all the commands in the schedule. + pub fn reset( + &mut self, + store: &mut (impl PoolProvider + ?Sized), + ) -> Result<(), StoreError> { + self.enabled = false; + let mut deletion_ok = Ok(()); + for tc_lists in &mut self.tc_map { + for tc in tc_lists.1 { + let res = store.delete(tc.addr); if res.is_err() { - store_error = res; + deletion_ok = res; } } } + self.tc_map.clear(); + deletion_ok + } + + pub fn update_time(&mut self, current_time: UnixTimestamp) { + self.current_time = current_time; + } + + pub fn current_time(&self) -> &UnixTimestamp { + &self.current_time + } + + /// Insert a telecommand which was already unwrapped from the outer Service 11 packet and stored + /// inside the telecommand packet pool. + pub fn insert_unwrapped_and_stored_tc( + &mut self, + time_stamp: UnixTimestamp, + info: TcInfo, + ) -> Result<(), ScheduleError> { + if time_stamp < self.current_time + self.time_margin { + return Err(ScheduleError::ReleaseTimeInTimeMargin( + self.current_time, + self.time_margin, + time_stamp, + )); + } + match self.tc_map.entry(time_stamp) { + Entry::Vacant(e) => { + e.insert(vec![info]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(info); + } + } + Ok(()) + } + + /// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still + /// needs to be stored inside the telecommand pool. + pub fn insert_unwrapped_tc( + &mut self, + time_stamp: UnixTimestamp, + tc: &[u8], + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + let check_tc = PusTc::from_bytes(tc)?; + if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { + return Err(ScheduleError::NestedScheduledTc); + } + let req_id = RequestId::from_tc(&check_tc.0); + + match pool.add(tc) { + Ok(addr) => { + let info = TcInfo::new(addr, req_id); + self.insert_unwrapped_and_stored_tc(time_stamp, info)?; + Ok(info) + } + Err(err) => Err(err.into()), + } + } + + /// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp + /// provider needs to be supplied via a generic. + pub fn insert_wrapped_tc( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + if PusPacket::service(pus_tc) != 11 { + return Err(ScheduleError::WrongService); + } + if PusPacket::subservice(pus_tc) != 4 { + return Err(ScheduleError::WrongSubservice); + } + return if let Some(user_data) = pus_tc.user_data() { + let stamp: TimeStamp = TimeReader::from_bytes(user_data)?; + let unix_stamp = stamp.unix_stamp(); + let stamp_len = stamp.len_as_bytes(); + self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) + } else { + Err(ScheduleError::TcDataEmpty) + }; + } + + /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS + /// short timestamp with 16-bit length of days field. + pub fn insert_wrapped_tc_cds_short( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::(pus_tc, pool) + } + + /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS + /// long timestamp with a 24-bit length of days field. + pub fn insert_wrapped_tc_cds_long( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::>(pus_tc, pool) + } + + /// This function uses [Self::retrieve_by_time_filter] to extract all scheduled commands inside + /// the time range and then deletes them from the provided store. + /// + /// Like specified in the documentation of [Self::retrieve_by_time_filter], the range extraction + /// for deletion is always inclusive. + /// + /// This function returns the number of deleted commands on success. In case any deletion fails, + /// the last deletion will be supplied in addition to the number of deleted commands. + pub fn delete_by_time_filter( + &mut self, + time_window: TimeWindow, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + let range = self.retrieve_by_time_filter(time_window); + let mut del_packets = 0; + let mut res_if_fails = None; + let mut keys_to_delete = Vec::new(); + for time_bucket in range { + for tc in time_bucket.1 { + match pool.delete(tc.addr) { + Ok(_) => del_packets += 1, + Err(e) => res_if_fails = Some(e), + } + } + keys_to_delete.push(*time_bucket.0); + } + for key in keys_to_delete { + self.tc_map.remove(&key); + } + if let Some(err) = res_if_fails { + return Err((del_packets, err)); + } + Ok(del_packets) + } + + /// Deletes all the scheduled commands. This also deletes the packets from the passed TC pool. + /// + /// This function returns the number of deleted commands on success. In case any deletion fails, + /// the last deletion will be supplied in addition to the number of deleted commands. + pub fn delete_all( + &mut self, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.delete_by_time_filter(TimeWindow::::new_select_all(), pool) + } + + /// Retrieve a range over all scheduled commands. + pub fn retrieve_all(&mut self) -> Range<'_, UnixTimestamp, Vec> { + self.tc_map.range(..) + } + + /// This retrieves scheduled telecommands which are inside the provided time window. + /// + /// It should be noted that the ranged extraction is always inclusive. For example, a range + /// from 50 to 100 unix seconds would also include command scheduled at 100 unix seconds. + pub fn retrieve_by_time_filter( + &mut self, + time_window: TimeWindow, + ) -> Range<'_, UnixTimestamp, Vec> { + match time_window.time_window_type() { + TimeWindowType::SelectAll => self.tc_map.range(..), + TimeWindowType::TimeTagToTimeTag => { + // This should be guaranteed to be valid by library API, so unwrap is okay + let start_time = time_window.start_time().unwrap().unix_stamp(); + let end_time = time_window.end_time().unwrap().unix_stamp(); + self.tc_map.range(start_time..=end_time) + } + TimeWindowType::FromTimeTag => { + // This should be guaranteed to be valid by library API, so unwrap is okay + let start_time = time_window.start_time().unwrap().unix_stamp(); + self.tc_map.range(start_time..) + } + TimeWindowType::ToTimeTag => { + // This should be guaranteed to be valid by library API, so unwrap is okay + let end_time = time_window.end_time().unwrap().unix_stamp(); + self.tc_map.range(..=end_time) + } + } + } + + /// Deletes a scheduled command with the given request ID. Returns the store address if a + /// scheduled command was found in the map and deleted, and None otherwise. + /// + /// Please note that this function will stop on the first telecommand with a request ID match. + /// In case of duplicate IDs (which should generally not happen), this function needs to be + /// called repeatedly. + pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option { + if let DeletionResult::WithoutStoreDeletion(v) = + self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>) + { + return v; + } + panic!("unexpected deletion result"); + } + + /// This behaves like [Self::delete_by_request_id] but deletes the packet from the pool as well. + pub fn delete_by_request_id_and_from_pool( + &mut self, + req_id: &RequestId, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + if let DeletionResult::WithStoreDeletion(v) = + self.delete_by_request_id_internal(req_id, Some(pool)) + { + return v; + } + panic!("unexpected deletion result"); + } + + fn delete_by_request_id_internal( + &mut self, + req_id: &RequestId, + pool: Option<&mut (impl PoolProvider + ?Sized)>, + ) -> DeletionResult { + let mut idx_found = None; + for time_bucket in &mut self.tc_map { + for (idx, tc_info) in time_bucket.1.iter().enumerate() { + if &tc_info.request_id == req_id { + idx_found = Some(idx); + } + } + if let Some(idx) = idx_found { + let addr = time_bucket.1.remove(idx).addr; + if let Some(pool) = pool { + return match pool.delete(addr) { + Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)), + Err(e) => DeletionResult::WithStoreDeletion(Err(e)), + }; + } + return DeletionResult::WithoutStoreDeletion(Some(addr)); + } + } + if pool.is_none() { + DeletionResult::WithoutStoreDeletion(None) + } else { + DeletionResult::WithStoreDeletion(Ok(false)) + } + } + /// Retrieve all telecommands which should be release based on the current time. + pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { + self.tc_map.range(..=self.current_time) + } + + #[cfg(feature = "std")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] + pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> { + self.current_time = UnixTimestamp::from_now()?; + Ok(()) + } + + /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser + /// closure for each telecommand which should be released. This function will also delete + /// the telecommands from the holding store after calling the release closure, if the scheduler + /// is disabled. + /// + /// # Arguments + /// + /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and + /// the second argument is the telecommand information also containing the store address. + /// This closure should return whether the command should be deleted if the scheduler is + /// disabled to prevent memory leaks. + /// * `store` - The holding store of the telecommands. + pub fn release_telecommands bool>( + &mut self, + mut releaser: R, + tc_store: &mut (impl PoolProvider + ?Sized), + ) -> Result { + let tcs_to_release = self.telecommands_to_release(); + let mut released_tcs = 0; + let mut store_error = Ok(()); + for tc in tcs_to_release { + for info in tc.1 { + let should_delete = releaser(self.enabled, info); + released_tcs += 1; + if should_delete && !self.is_enabled() { + let res = tc_store.delete(info.addr); + if res.is_err() { + store_error = res; + } + } + } + } + self.tc_map.retain(|k, _| k > &self.current_time); + store_error + .map(|_| released_tcs) + .map_err(|e| (released_tcs, e)) } - self.tc_map.retain(|k, _| k > &self.current_time); - store_error - .map(|_| released_tcs) - .map_err(|e| (released_tcs, e)) } } @@ -620,6 +640,7 @@ impl PusScheduler { mod tests { use super::*; use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError}; + use alloc::collections::btree_map::Range; use spacepackets::ecss::SerializablePusPacket; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs new file mode 100644 index 0000000..80bd2b5 --- /dev/null +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -0,0 +1,168 @@ +use crate::pool::{SharedPool, StoreAddr}; +use crate::pus::scheduler::PusScheduler; +use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; +use crate::pus::{ + AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, + PusServiceBase, PusServiceHandler, +}; +use crate::tmtc::tm_helper::SharedTmStore; +use spacepackets::ecss::{scheduling, PusPacket}; +use spacepackets::tc::PusTc; +use spacepackets::time::cds::TimeProvider; +use spacepackets::time::TimeWriter; +use std::format; +use std::sync::mpsc::{Receiver, Sender}; + +pub struct PusService11SchedHandler { + psb: PusServiceBase, + scheduler: PusScheduler, +} + +impl PusService11SchedHandler { + pub fn new( + receiver: Receiver, + tc_pool: SharedPool, + tm_tx: Sender, + tm_store: SharedTmStore, + tm_apid: u16, + verification_handler: StdVerifReporterWithSender, + scheduler: PusScheduler, + ) -> Self { + Self { + psb: PusServiceBase::new( + receiver, + tc_pool, + tm_tx, + tm_store, + tm_apid, + verification_handler, + ), + scheduler, + } + } +} + +impl PusServiceHandler for PusService11SchedHandler { + fn psb_mut(&mut self) -> &mut PusServiceBase { + &mut self.psb + } + fn psb(&self) -> &PusServiceBase { + &self.psb + } + + fn handle_one_tc( + &mut self, + addr: StoreAddr, + token: VerificationToken, + ) -> Result { + { + // Keep locked section as short as possible. + let mut tc_pool = self + .psb + .tc_store + .write() + .map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; + let tc_guard = tc_pool.read_with_guard(addr); + let tc_raw = tc_guard.read().unwrap(); + self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); + } + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); + let std_service = scheduling::Subservice::try_from(tc.subservice()); + if std_service.is_err() { + return Ok(PusPacketHandlerResult::CustomSubservice(token)); + } + //let partial_error = self.psb.update_stamp().err(); + let time_provider = + TimeProvider::from_now_with_u16_days().map_err(PartialPusHandlingError::TimeError); + let partial_error = if let Ok(time_provider) = time_provider { + time_provider + .write_to_bytes(&mut self.psb.stamp_buf) + .unwrap(); + Ok(()) + } else { + self.psb.stamp_buf = [0; 7]; + Err(time_provider.unwrap_err()) + }; + let partial_error = partial_error.err(); + match std_service.unwrap() { + scheduling::Subservice::TcEnableScheduling => { + let start_token = self + .psb + .verification_handler + .start_success(token, Some(&self.psb.stamp_buf)) + .expect("Error sending start success"); + + self.scheduler.enable(); + if self.scheduler.is_enabled() { + self.psb + .verification_handler + .completion_success(start_token, Some(&self.psb.stamp_buf)) + .expect("Error sending completion success"); + } else { + panic!("Failed to enable scheduler"); + } + } + scheduling::Subservice::TcDisableScheduling => { + let start_token = self + .psb + .verification_handler + .start_success(token, Some(&self.psb.stamp_buf)) + .expect("Error sending start success"); + + self.scheduler.disable(); + if !self.scheduler.is_enabled() { + self.psb + .verification_handler + .completion_success(start_token, Some(&self.psb.stamp_buf)) + .expect("Error sending completion success"); + } else { + panic!("Failed to disable scheduler"); + } + } + scheduling::Subservice::TcResetScheduling => { + let start_token = self + .psb + .verification_handler + .start_success(token, Some(&self.psb.stamp_buf)) + .expect("Error sending start success"); + + let mut pool = self.psb.tc_store.write().expect("Locking pool failed"); + + self.scheduler + .reset(pool.as_mut()) + .expect("Error resetting TC Pool"); + + self.psb + .verification_handler + .completion_success(start_token, Some(&self.psb.stamp_buf)) + .expect("Error sending completion success"); + } + scheduling::Subservice::TcInsertActivity => { + let start_token = self + .psb + .verification_handler + .start_success(token, Some(&self.psb.stamp_buf)) + .expect("error sending start success"); + + let mut pool = self.psb.tc_store.write().expect("locking pool failed"); + self.scheduler + .insert_wrapped_tc::(&tc, pool.as_mut()) + .expect("insertion of activity into pool failed"); + + self.psb + .verification_handler + .completion_success(start_token, Some(&self.psb.stamp_buf)) + .expect("sending completion success failed"); + } + _ => { + return Ok(PusPacketHandlerResult::CustomSubservice(token)); + } + } + if let Some(partial_error) = partial_error { + return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess( + partial_error, + )); + } + Ok(PusPacketHandlerResult::CustomSubservice(token)) + } +} diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs new file mode 100644 index 0000000..397fc14 --- /dev/null +++ b/satrs-core/src/pus/test.rs @@ -0,0 +1,114 @@ +use crate::pool::{SharedPool, StoreAddr}; +use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; +use crate::pus::{ + AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, + PusServiceBase, PusServiceHandler, +}; +use crate::tmtc::tm_helper::SharedTmStore; +use spacepackets::ecss::PusPacket; +use spacepackets::tc::PusTc; +use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; +use spacepackets::SpHeader; +use std::format; +use std::sync::mpsc::{Receiver, Sender}; + +pub struct PusService17TestHandler { + psb: PusServiceBase, +} + +impl PusService17TestHandler { + pub fn new( + receiver: Receiver, + tc_pool: SharedPool, + tm_tx: Sender, + tm_store: SharedTmStore, + tm_apid: u16, + verification_handler: StdVerifReporterWithSender, + ) -> Self { + Self { + psb: PusServiceBase::new( + receiver, + tc_pool, + tm_tx, + tm_store, + tm_apid, + verification_handler, + ), + } + } +} + +impl PusServiceHandler for PusService17TestHandler { + fn psb_mut(&mut self) -> &mut PusServiceBase { + &mut self.psb + } + fn psb(&self) -> &PusServiceBase { + &self.psb + } + + fn handle_one_tc( + &mut self, + addr: StoreAddr, + token: VerificationToken, + ) -> Result { + { + // Keep locked section as short as possible. + let mut tc_pool = self + .psb + .tc_store + .write() + .map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; + let tc_guard = tc_pool.read_with_guard(addr); + let tc_raw = tc_guard.read()?; + self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); + } + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; + if tc.service() != 17 { + return Err(PusPacketHandlingError::WrongService(tc.service())); + } + if tc.subservice() == 1 { + let mut partial_error = self.psb.update_stamp().err(); + let result = self + .psb + .verification_handler + .start_success(token, Some(&self.psb.stamp_buf)) + .map_err(|_| PartialPusHandlingError::VerificationError); + let start_token = if let Ok(result) = result { + Some(result) + } else { + partial_error = Some(result.unwrap_err()); + None + }; + // Sequence count will be handled centrally in TM funnel. + let mut reply_header = SpHeader::tm_unseg(self.psb.tm_apid, 0, 0).unwrap(); + let tc_header = PusTmSecondaryHeader::new_simple(17, 2, &self.psb.stamp_buf); + let ping_reply = PusTm::new(&mut reply_header, tc_header, None, true); + let addr = self.psb.tm_store.add_pus_tm(&ping_reply); + if let Err(e) = self + .psb + .tm_tx + .send(addr) + .map_err(|e| PartialPusHandlingError::TmSendError(format!("{e}"))) + { + partial_error = Some(e); + } + if let Some(start_token) = start_token { + if self + .psb + .verification_handler + .completion_success(start_token, Some(&self.psb.stamp_buf)) + .is_err() + { + partial_error = Some(PartialPusHandlingError::VerificationError) + } + } + if let Some(partial_error) = partial_error { + return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess( + partial_error, + )); + }; + return Ok(PusPacketHandlerResult::RequestHandled); + } + Ok(PusPacketHandlerResult::CustomSubservice(token)) + } +} diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index f9efd16..6597302 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -74,7 +74,6 @@ //! context involving multiple threads use crate::pus::{ source_buffer_large_enough, EcssTmSenderCore, EcssTmtcError, EcssTmtcErrorWithSend, - GenericTcCheckError, }; use core::fmt::{Debug, Display, Formatter}; use core::hash::{Hash, Hasher}; @@ -84,7 +83,7 @@ use core::mem::size_of; use delegate::delegate; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use spacepackets::ecss::{scheduling, EcssEnumeration, PusPacket, SerializablePusPacket}; +use spacepackets::ecss::{EcssEnumeration, SerializablePusPacket}; use spacepackets::tc::PusTc; use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; @@ -1519,21 +1518,6 @@ mod stdmod { } } -pub fn pus_11_generic_tc_check( - pus_tc: &PusTc, -) -> Result { - if pus_tc.user_data().is_none() { - return Err(GenericTcCheckError::NotEnoughAppData); - } - let subservice: scheduling::Subservice = match pus_tc.subservice().try_into() { - Ok(subservice) => subservice, - Err(_) => { - return Err(GenericTcCheckError::InvalidSubservice); - } - }; - Ok(subservice) -} - #[cfg(test)] mod tests { use crate::pool::{LocalPool, PoolCfg, SharedPool}; diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index 07e8c18..2ef099c 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -1,4 +1,3 @@ -use spacepackets::ecss::SerializablePusPacket; use spacepackets::time::cds::TimeProvider; use spacepackets::time::TimeWriter; use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; @@ -76,17 +75,6 @@ impl PusTmWithCdsShortHelper { self.create_pus_tm_common(service, subservice, source_data, seq_count) } - pub fn create_pus_tm_with_stamp<'a>( - &'a mut self, - service: u8, - subservice: u8, - source_data: Option<&'a [u8]>, - timestamp: &'a [u8], - seq_count: u16, - ) -> PusTm { - self.create_pus_tm_common(service, subservice, source_data, seq_count) - } - fn create_pus_tm_common<'a>( &'a self, service: u8, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 04de260..a9bdc57 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -9,7 +9,7 @@ use log::{info, warn}; use crate::hk::AcsHkIds; use crate::logging::setup_logger; -use crate::pus::test::{PusService17TestHandler, Service17CustomWrapper}; +use crate::pus::test::Service17CustomWrapper; use crate::pus::PusTcMpscRouter; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{ @@ -26,25 +26,25 @@ use satrs_core::pus::event_man::{ PusEventDispatcher, }; use satrs_core::pus::hk::Subservice as HkSubservice; +use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::verification::{ MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, }; use satrs_core::pus::MpscTmtcInStoreSender; use satrs_core::seq_count::{SeqCountProviderSimple, SeqCountProviderSyncClonable}; -use satrs_core::spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use satrs_core::spacepackets::{ time::cds::TimeProvider, time::TimeWriter, tm::{PusTm, PusTmSecondaryHeader}, SequenceFlags, SpHeader, }; -use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; +use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::AddressableId; -use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT, TEST_EVENT}; +use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::{channel, TryRecvError}; -use std::sync::{mpsc, Arc, RwLock}; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -167,13 +167,13 @@ fn main() { hk_service_receiver: pus_hk_tx, action_service_receiver: pus_action_tx, }; - let mut pus17_handler = PusService17TestHandler::new( + let pus17_handler = PusService17TestHandler::new( pus_test_rx, tc_store.pool.clone(), tm_funnel_tx.clone(), tm_store.clone(), PUS_APID, - verif_reporter.clone(), + verif_reporter, ); let mut srv_17_wrapper = Service17CustomWrapper { pus17_handler, diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 078ccf1..4e1941b 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,177 +1,26 @@ -use crate::pus::test::PusService17TestHandler; use crate::tmtc::MpscStoreAndSendError; use satrs_core::events::EventU32; -use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; -use satrs_core::mode::{ModeAndSubmode, ModeRequest}; -use satrs_core::objects::ObjectId; use satrs_core::params::Params; -use satrs_core::pool::{PoolProvider, SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; -use satrs_core::pus::hk; -use satrs_core::pus::mode::Subservice; -use satrs_core::pus::scheduling::PusScheduler; -use satrs_core::pus::verification::{ - pus_11_generic_tc_check, FailParams, StdVerifReporterWithSender, TcStateAccepted, TcStateToken, - VerificationToken, -}; -use satrs_core::pus::{event, EcssTcSenderCore, GenericTcCheckError, MpscTmtcInStoreSender}; -use satrs_core::pus::{mode, EcssTcSender}; -use satrs_core::res_code::ResultU16; -use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore}; -use satrs_core::spacepackets::ecss::{scheduling, PusError, PusServiceId}; -use satrs_core::spacepackets::time::{CcsdsTimeProvider, StdTimestampError, TimestampError}; +use satrs_core::pool::StoreAddr; +use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; +use satrs_core::pus::AcceptedTc; +use satrs_core::seq_count::SeqCountProviderSyncClonable; +use satrs_core::spacepackets::ecss::PusServiceId; +use satrs_core::spacepackets::tc::PusTc; +use satrs_core::spacepackets::time::cds::TimeProvider; +use satrs_core::spacepackets::time::TimeWriter; use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; -use satrs_core::tmtc::{AddressableId, PusServiceProvider, TargetId}; -use satrs_core::{ - spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, spacepackets::time::cds::TimeProvider, - spacepackets::time::TimeWriter, spacepackets::SpHeader, -}; -use satrs_example::{hk_err, tmtc_err, CustomPusServiceId, TEST_EVENT}; -use std::cell::RefCell; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::rc::Rc; -use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; +use satrs_example::{tmtc_err, CustomPusServiceId}; +use std::sync::mpsc::Sender; pub mod scheduler; pub mod test; -#[derive(Debug, Clone)] -pub enum PusPacketHandlingError { - PusError(PusError), - WrongService(u8), - NotEnoughAppData(String), - StoreError(StoreError), - RwGuardError(String), - QueueDisconnected, - OtherError(String), -} - -impl From for PusPacketHandlingError { - fn from(value: PusError) -> Self { - Self::PusError(value) - } -} - -impl From for PusPacketHandlingError { - fn from(value: StoreError) -> Self { - Self::StoreError(value) - } -} - -#[derive(Debug, Clone)] -pub enum PartialPusHandlingError { - TimeError(StdTimestampError), - TmSendError(String), - VerificationError, -} -impl From for PartialPusHandlingError { - fn from(value: StdTimestampError) -> Self { - Self::TimeError(value) - } -} - -impl From for PartialPusHandlingError { - fn from(value: TimestampError) -> Self { - Self::TimeError(StdTimestampError::TimestampError(value)) - } -} - -#[derive(Debug, Clone)] -pub enum PusPacketHandlerResult { - RequestHandled, - RequestHandledPartialSuccess(PartialPusHandlingError), - CustomSubservice(VerificationToken), - Empty, -} - -pub struct PusServiceBase { - tc_rx: Receiver, - tc_store: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, - tm_apid: u16, - verification_handler: StdVerifReporterWithSender, - stamp_buf: [u8; 7], - pus_buf: [u8; 2048], - pus_size: usize, -} - -impl PusServiceBase { - pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, - tm_apid: u16, - verification_handler: StdVerifReporterWithSender, - ) -> Self { - Self { - tc_rx: receiver, - tc_store: tc_pool, - tm_apid, - tm_tx, - tm_store, - verification_handler, - stamp_buf: [0; 7], - pus_buf: [0; 2048], - pus_size: 0, - } - } - - pub fn handle_next_packet< - T: FnOnce( - StoreAddr, - VerificationToken, - ) -> Result, - >( - &mut self, - handle_one_packet: T, - ) -> Result { - return match self.tc_rx.try_recv() { - Ok((addr, token)) => handle_one_packet(addr, token), - Err(e) => match e { - TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), - TryRecvError::Disconnected => Err(PusPacketHandlingError::QueueDisconnected), - }, - }; - } - - pub fn update_stamp(&mut self) -> Result<(), PartialPusHandlingError> { - let time_provider = TimeProvider::from_now_with_u16_days() - .map_err(|e| PartialPusHandlingError::TimeError(e)); - return if time_provider.is_ok() { - // Can not fail, buffer is large enough. - time_provider - .unwrap() - .write_to_bytes(&mut self.stamp_buf) - .unwrap(); - Ok(()) - } else { - self.stamp_buf = [0; 7]; - Err(time_provider.unwrap_err()) - }; - } -} - -// pub trait PusTcRouter { -// type Error; -// fn route_pus_tc( -// &mut self, -// apid: u16, -// service: u8, -// subservice: u8, -// tc: &PusTc, -// ); -// } - pub enum PusTcWrapper<'tc> { PusTc(&'tc PusTc<'tc>), StoreAddr(StoreAddr), } -pub type AcceptedTc = (StoreAddr, VerificationToken); - pub struct PusTcMpscRouter { pub test_service_receiver: Sender, pub event_service_receiver: Sender, @@ -280,7 +129,6 @@ pub struct PusTcArgs { //pub tc_source: PusTcSource, /// Used to send events from within the TC router pub event_sender: Sender<(EventU32, Option)>, - //pub scheduler: Rc>, } struct TimeStampHelper { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 7b7ffd1..8b13789 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,164 +1 @@ -use crate::pus::test::PusServiceHandler; -use crate::pus::{ - AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, - PusServiceBase, -}; -use delegate::delegate; -use satrs_core::pool::{SharedPool, StoreAddr}; -use satrs_core::pus::scheduling::PusScheduler; -use satrs_core::pus::verification::{ - pus_11_generic_tc_check, FailParams, StdVerifReporterWithSender, TcStateAccepted, - VerificationToken, -}; -use satrs_core::pus::GenericTcCheckError; -use satrs_core::spacepackets::ecss::{scheduling, PusPacket}; -use satrs_core::spacepackets::tc::PusTc; -use satrs_core::spacepackets::time::cds::TimeProvider; -use satrs_core::spacepackets::time::TimeWriter; -use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; -use satrs_example::tmtc_err; -use std::sync::mpsc::{Receiver, Sender, TryRecvError}; -pub struct PusService11SchedHandler { - psb: PusServiceBase, - scheduler: PusScheduler, -} - -impl PusService11SchedHandler { - pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, - tm_apid: u16, - verification_handler: StdVerifReporterWithSender, - scheduler: PusScheduler, - ) -> Self { - Self { - psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, - tm_apid, - verification_handler, - ), - scheduler, - } - } - - pub fn handle_next_packet(&mut self) -> Result { - return match self.psb.tc_rx.try_recv() { - Ok((addr, token)) => self.handle_one_tc(addr, token), - Err(e) => match e { - TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), - TryRecvError::Disconnected => Err(PusPacketHandlingError::QueueDisconnected), - }, - }; - } -} - -impl PusServiceHandler for PusService11SchedHandler { - fn psb(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - let mut partial_result = self.psb.update_stamp().err(); - { - // Keep locked section as short as possible. - let mut tc_pool = self - .psb - .tc_store - .write() - .map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; - let tc_guard = tc_pool.read_with_guard(addr); - let tc_raw = tc_guard.read().unwrap(); - self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); - } - let (tc, tc_size) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); - let std_service = scheduling::Subservice::try_from(tc.subservice()); - if std_service.is_err() { - return Ok(PusPacketHandlerResult::CustomSubservice(token)); - } - match std_service.unwrap() { - scheduling::Subservice::TcEnableScheduling => { - let start_token = self - .psb - .verification_handler - .start_success(token, Some(&self.psb.stamp_buf)) - .expect("Error sending start success"); - - self.scheduler.enable(); - if self.scheduler.is_enabled() { - self.psb - .verification_handler - .completion_success(start_token, Some(&self.psb.stamp_buf)) - .expect("Error sending completion success"); - } else { - panic!("Failed to enable scheduler"); - } - } - scheduling::Subservice::TcDisableScheduling => { - let start_token = self - .psb - .verification_handler - .start_success(token, Some(&self.psb.stamp_buf)) - .expect("Error sending start success"); - - self.scheduler.disable(); - if !self.scheduler.is_enabled() { - self.psb - .verification_handler - .completion_success(start_token, Some(&self.psb.stamp_buf)) - .expect("Error sending completion success"); - } else { - panic!("Failed to disable scheduler"); - } - } - scheduling::Subservice::TcResetScheduling => { - let start_token = self - .psb - .verification_handler - .start_success(token, Some(&self.psb.stamp_buf)) - .expect("Error sending start success"); - - let mut pool = self.psb.tc_store.write().expect("Locking pool failed"); - - self.scheduler - .reset(pool.as_mut()) - .expect("Error resetting TC Pool"); - - self.psb - .verification_handler - .completion_success(start_token, Some(&self.psb.stamp_buf)) - .expect("Error sending completion success"); - } - scheduling::Subservice::TcInsertActivity => { - let start_token = self - .psb - .verification_handler - .start_success(token, Some(&self.psb.stamp_buf)) - .expect("error sending start success"); - - let mut pool = self.psb.tc_store.write().expect("locking pool failed"); - self.scheduler - .insert_wrapped_tc::(&tc, pool.as_mut()) - .expect("insertion of activity into pool failed"); - - self.psb - .verification_handler - .completion_success(start_token, Some(&self.psb.stamp_buf)) - .expect("sending completion success failed"); - } - _ => { - return Ok(PusPacketHandlerResult::CustomSubservice(token)); - } - } - Ok(PusPacketHandlerResult::CustomSubservice(token)) - } -} diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 7765c21..b970b61 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,28 +1,15 @@ -use crate::pus::{ - AcceptedTc, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, - PusServiceBase, -}; -use delegate::delegate; -use log::{error, info, warn}; +use log::{info, warn}; use satrs_core::events::EventU32; use satrs_core::params::Params; -use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::verification::{ - FailParams, StdVerifReporterWithSender, TcStateAccepted, TcStateStarted, - VerificationOrSendErrorWithToken, VerificationToken, -}; -use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore}; -use satrs_core::spacepackets::ecss::{PusError, PusPacket}; +use satrs_core::pus::test::PusService17TestHandler; +use satrs_core::pus::verification::FailParams; +use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler}; +use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::time::cds::TimeProvider; -use satrs_core::spacepackets::time::{StdTimestampError, TimeWriter}; -use satrs_core::spacepackets::tm::{PusTm, PusTmSecondaryHeader}; -use satrs_core::spacepackets::SpHeader; -use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; +use satrs_core::spacepackets::time::TimeWriter; use satrs_example::{tmtc_err, TEST_EVENT}; -use std::sync::mpsc::{Receiver, Sender, TryRecvError}; -use std::thread; -use std::time::Duration; +use std::sync::mpsc::Sender; pub struct Service17CustomWrapper { pub pus17_handler: PusService17TestHandler, @@ -31,7 +18,6 @@ pub struct Service17CustomWrapper { impl Service17CustomWrapper { pub fn perform_operation(&mut self) -> bool { - let mut handled_pings = 0; let res = self.pus17_handler.handle_next_packet(); if res.is_err() { warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); @@ -41,18 +27,16 @@ impl Service17CustomWrapper { PusPacketHandlerResult::RequestHandled => { info!("Received PUS ping command TC[17,1]"); info!("Sent ping reply PUS TM[17,2]"); - handled_pings += 1; } PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => { warn!( "Handled PUS ping command with partial success: {:?}", partial_err ); - handled_pings += 1; } PusPacketHandlerResult::CustomSubservice(token) => { let (buf, _) = self.pus17_handler.pus_tc_buf(); - let (tc, size) = PusTc::from_bytes(buf).unwrap(); + let (tc, _) = PusTc::from_bytes(buf).unwrap(); let time_stamper = TimeProvider::from_now_with_u16_days().unwrap(); let mut stamp_buf: [u8; 7] = [0; 7]; time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); @@ -63,17 +47,17 @@ impl Service17CustomWrapper { .expect("Sending test event failed"); let start_token = self .pus17_handler - .verification_handler() + .verification_reporter() .start_success(token, Some(&stamp_buf)) .expect("Error sending start success"); self.pus17_handler - .verification_handler() + .verification_reporter() .completion_success(start_token, Some(&stamp_buf)) .expect("Error sending completion success"); } else { let fail_data = [tc.subservice()]; self.pus17_handler - .verification_handler() + .verification_reporter() .start_failure( token, FailParams::new( @@ -92,128 +76,3 @@ impl Service17CustomWrapper { true } } - -pub trait PusServiceHandler { - fn psb(&mut self) -> &mut PusServiceBase; - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result; - fn handle_next_packet(&mut self) -> Result { - return match self.psb().tc_rx.try_recv() { - Ok((addr, token)) => self.handle_one_tc(addr, token), - Err(e) => match e { - TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), - TryRecvError::Disconnected => Err(PusPacketHandlingError::QueueDisconnected), - }, - }; - } -} -pub struct PusService17TestHandler { - psb: PusServiceBase, -} - -impl PusService17TestHandler { - pub fn new( - receiver: Receiver, - tc_pool: SharedPool, - tm_tx: Sender, - tm_store: SharedTmStore, - tm_apid: u16, - verification_handler: StdVerifReporterWithSender, - ) -> Self { - Self { - psb: PusServiceBase::new( - receiver, - tc_pool, - tm_tx, - tm_store, - tm_apid, - verification_handler, - ), - } - } - - pub fn verification_handler(&mut self) -> &mut StdVerifReporterWithSender { - &mut self.psb.verification_handler - } - - pub fn pus_tc_buf(&self) -> (&[u8], usize) { - (&self.psb.pus_buf, self.psb.pus_size) - } -} - -impl PusServiceHandler for PusService17TestHandler { - fn psb(&mut self) -> &mut PusServiceBase { - &mut self.psb - } - - fn handle_one_tc( - &mut self, - addr: StoreAddr, - token: VerificationToken, - ) -> Result { - let mut partial_result = None; - { - // Keep locked section as short as possible. - let mut tc_pool = self - .psb - .tc_store - .write() - .map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; - let tc_guard = tc_pool.read_with_guard(addr); - let tc_raw = tc_guard.read()?; - self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); - } - let mut partial_error = None; - let (tc, tc_size) = PusTc::from_bytes(&self.psb.pus_buf)?; - if tc.service() != 17 { - return Err(PusPacketHandlingError::WrongService(tc.service())); - } - if tc.subservice() == 1 { - partial_result = self.psb.update_stamp().err(); - let result = self - .psb - .verification_handler - .start_success(token, Some(&self.psb.stamp_buf)) - .map_err(|e| PartialPusHandlingError::VerificationError); - let start_token = if result.is_err() { - partial_error = Some(result.unwrap_err()); - None - } else { - Some(result.unwrap()) - }; - // Sequence count will be handled centrally in TM funnel. - let mut reply_header = SpHeader::tm_unseg(self.psb.tm_apid, 0, 0).unwrap(); - let tc_header = PusTmSecondaryHeader::new_simple(17, 2, &self.psb.stamp_buf); - let ping_reply = PusTm::new(&mut reply_header, tc_header, None, true); - let addr = self.psb.tm_store.add_pus_tm(&ping_reply); - if let Err(e) = self - .psb - .tm_tx - .send(addr) - .map_err(|e| PartialPusHandlingError::TmSendError(format!("{e}"))) - { - partial_error = Some(e); - } - if let Some(start_token) = start_token { - if self - .psb - .verification_handler - .completion_success(start_token, Some(&self.psb.stamp_buf)) - .is_err() - { - partial_error = Some(PartialPusHandlingError::VerificationError) - } - } - if partial_error.is_some() { - return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess( - partial_error.unwrap(), - )); - } - return Ok(PusPacketHandlerResult::RequestHandled); - } - Ok(PusPacketHandlerResult::CustomSubservice(token)) - } -} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 320cf5a..accef97 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -8,7 +8,6 @@ use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::rc::Rc; -use std::sync::mpsc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; use std::time::Duration; @@ -18,15 +17,14 @@ use crate::pus::{PusReceiver, PusTcArgs, PusTcMpscRouter, PusTmArgs}; use crate::requests::RequestWithToken; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; -use satrs_core::pus::scheduling::{PusScheduler, TcInfo}; +use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::seq_count::SeqCountProviderSyncClonable; -use satrs_core::spacepackets::ecss::SerializablePusPacket; -use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader}; +use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket}; +use satrs_core::spacepackets::tc::PusTc; +use satrs_core::spacepackets::SpHeader; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{ - CcsdsDistributor, CcsdsError, PusServiceProvider, ReceivesCcsdsTc, ReceivesEcssPusTc, -}; +use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, ReceivesEcssPusTc}; pub const PUS_APID: u16 = 0x02; @@ -162,7 +160,6 @@ pub fn core_tmtc_task( PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), )); - let sched_clone = scheduler.clone(); let pus_tm_args = PusTmArgs { tm_tx: tm_args.tm_sink_sender, tm_store: tm_args.tm_store.clone(), diff --git a/satrs-mib/codegen/src/lib.rs b/satrs-mib/codegen/src/lib.rs index 3f144c6..906bc1a 100644 --- a/satrs-mib/codegen/src/lib.rs +++ b/satrs-mib/codegen/src/lib.rs @@ -20,7 +20,7 @@ pub fn resultcode( let item = parse_macro_input!(item as ItemConst); // Generate additional generated info struct used for introspection. - let result_code_name = item.ident.clone(); + let result_code_name = &item.ident; let name_as_str = result_code_name.to_string(); let gen_struct_name = format_ident!("{}_EXT", result_code_name); let info_str = info_str.map_or(String::from(""), |v| v.value());