From a891b947c7835075001fdf86dbff398d2d4b71a5 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 3 Feb 2024 13:41:51 +0100 Subject: [PATCH] Finish PUS service optimizations - Better naming for pool abstractions - Added last unittests for PUS helper services - Introduce new abstraction for PUS schedulers - `StoreAddr` is now a generic u64 - `spacepackets` points to 0.7.0 release --- satrs-core/Cargo.toml | 4 +- satrs-core/src/error.rs | 49 --- satrs-core/src/lib.rs | 1 - satrs-core/src/pool.rs | 420 ++++++++++-------- satrs-core/src/pus/event_man.rs | 2 +- satrs-core/src/pus/event_srv.rs | 202 +++++++-- satrs-core/src/pus/mod.rs | 333 ++++++++++++-- satrs-core/src/pus/scheduler.rs | 635 ++++++++++++++++----------- satrs-core/src/pus/scheduler_srv.rs | 294 ++++++++++--- satrs-core/src/pus/test.rs | 281 +++++++----- satrs-core/src/pus/verification.rs | 76 ++-- satrs-core/src/tmtc/tm_helper.rs | 49 ++- satrs-core/tests/pools.rs | 8 +- satrs-core/tests/pus_verification.rs | 11 +- satrs-example/src/main.rs | 68 +-- satrs-example/src/pus/action.rs | 24 +- satrs-example/src/pus/event.rs | 4 +- satrs-example/src/pus/hk.rs | 12 +- satrs-example/src/pus/scheduler.rs | 42 +- satrs-example/src/pus/test.rs | 20 +- satrs-example/src/tmtc.rs | 4 +- satrs-example/src/udp.rs | 4 +- 22 files changed, 1669 insertions(+), 874 deletions(-) delete mode 100644 satrs-core/src/error.rs diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index d16f0a0..6730619 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -73,7 +73,7 @@ features = ["all"] optional = true [dependencies.spacepackets] -version = "0.7.0-beta.4" +version = "0.7.0" default-features = false # git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" # rev = "297cfad22637d3b07a1b27abe56d9a607b5b82a7" @@ -123,4 +123,4 @@ doc-images = [] [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "doc_cfg"] +rustdoc-args = ["--cfg", "doc_cfg", "--generate-link-to-definition"] diff --git a/satrs-core/src/error.rs b/satrs-core/src/error.rs deleted file mode 100644 index 2144839..0000000 --- a/satrs-core/src/error.rs +++ /dev/null @@ -1,49 +0,0 @@ -pub enum FsrcGroupIds { - Tmtc = 0, -} - -pub struct FsrcErrorRaw { - pub group_id: u8, - pub unique_id: u8, - pub group_name: &'static str, - pub info: &'static str, -} - -pub trait FsrcErrorHandler { - fn error(&mut self, e: FsrcErrorRaw); - fn error_with_one_param(&mut self, e: FsrcErrorRaw, _p1: u32) { - self.error(e); - } - fn error_with_two_params(&mut self, e: FsrcErrorRaw, _p1: u32, _p2: u32) { - self.error(e); - } -} - -impl FsrcErrorRaw { - pub const fn new( - group_id: u8, - unique_id: u8, - group_name: &'static str, - info: &'static str, - ) -> Self { - FsrcErrorRaw { - group_id, - unique_id, - group_name, - info, - } - } -} - -#[derive(Clone, Copy, Default)] -pub struct SimpleStdErrorHandler {} - -#[cfg(feature = "use_std")] -impl FsrcErrorHandler for SimpleStdErrorHandler { - fn error(&mut self, e: FsrcErrorRaw) { - println!( - "Received error from group {} with ID ({},{}): {}", - e.group_name, e.group_id, e.unique_id, e.info - ); - } -} diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 061f728..7418693 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -24,7 +24,6 @@ extern crate std; #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub mod cfdp; pub mod encoding; -pub mod error; #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub mod event_man; diff --git a/satrs-core/src/pool.rs 
b/satrs-core/src/pool.rs index 3e644d3..0111d18 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -1,23 +1,13 @@ -//! # Pool implementation providing pre-allocated sub-pools with fixed size memory blocks +//! # Pool implementation providing memory pools for packet storage. //! -//! This is a simple memory pool implementation which pre-allocates all sub-pools using a given pool -//! configuration. After the pre-allocation, no dynamic memory allocation will be performed -//! during run-time. This makes the implementation suitable for real-time applications and -//! embedded environments. The pool implementation will also track the size of the data stored -//! inside it. -//! -//! Transactions with the [pool][LocalPool] are done using a special [address][StoreAddr] type. -//! Adding any data to the pool will yield a store address. Modification and read operations are -//! done using a reference to a store address. Deletion will consume the store address. -//! -//! # Example +//! # Example for the [StaticMemoryPool] //! //! ``` -//! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider}; +//! use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig}; //! //! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes -//! let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]); -//! let mut local_pool = LocalPool::new(pool_cfg); +//! let pool_cfg = StaticPoolConfig::new(vec![(4, 4), (2, 8), (1, 16)]); +//! let mut local_pool = StaticMemoryPool::new(pool_cfg); //! let mut addr; //! { //! // Add new data to the pool @@ -77,22 +67,24 @@ #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub use alloc_mod::*; use core::fmt::{Display, Formatter}; +use delegate::delegate; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use std::error::Error; type NumBlocks = u16; +pub type StoreAddr = u64; /// Simple address type used for transactions with the local pool. #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct StoreAddr { +pub struct StaticPoolAddr { pub(crate) pool_idx: u16, pub(crate) packet_idx: NumBlocks, } -impl StoreAddr { +impl StaticPoolAddr { pub const INVALID_ADDR: u32 = 0xFFFFFFFF; pub fn raw(&self) -> u32 { @@ -100,7 +92,22 @@ impl StoreAddr { } } -impl Display for StoreAddr { +impl From for StoreAddr { + fn from(value: StaticPoolAddr) -> Self { + ((value.pool_idx as u64) << 16) | value.packet_idx as u64 + } +} + +impl From for StaticPoolAddr { + fn from(value: StoreAddr) -> Self { + Self { + pool_idx: ((value >> 16) & 0xff) as u16, + packet_idx: (value & 0xff) as u16, + } + } +} + +impl Display for StaticPoolAddr { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { write!( f, @@ -180,39 +187,158 @@ impl Error for StoreError { } } +/// Generic trait for pool providers where the data can be modified and read in-place. This +/// generally means that a shared pool structure has to be wrapped inside a lock structure. +pub trait PoolProviderMemInPlace { + /// Add new data to the pool. The provider should attempt to reserve a memory block with the + /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can + /// be used to access the data stored in the pool + fn add(&mut self, data: &[u8]) -> Result; + + /// The provider should attempt to reserve a free memory block with the appropriate size and + /// then return a mutable reference to it. 
Yields a [StoreAddr] which can be used to access + /// the data stored in the pool + fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>; + + /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference + /// to it + fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>; + + /// Read data by yielding a read-only reference given a [StoreAddr] + fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>; + + /// Delete data inside the pool given a [StoreAddr] + fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; + fn has_element_at(&self, addr: &StoreAddr) -> Result; + + /// Retrieve the length of the data at the given store address. + fn len_of_data(&self, addr: &StoreAddr) -> Result { + if !self.has_element_at(addr)? { + return Err(StoreError::DataDoesNotExist(*addr)); + } + Ok(self.read(addr)?.len()) + } +} + +pub trait PoolProviderMemInPlaceWithGuards: PoolProviderMemInPlace { + /// This function behaves like [PoolProviderMemInPlace::read], but consumes the provided address + /// and returns a RAII conformant guard object. + /// + /// Unless the guard [PoolRwGuard::release] method is called, the data for the + /// given address will be deleted automatically when the guard is dropped. + /// This can prevent memory leaks. Users can read the data and release the guard + /// if the data in the store is valid for further processing. If the data is faulty, no + /// manual deletion is necessary when returning from a processing function prematurely. + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard; + + /// This function behaves like [PoolProviderMemInPlace::modify], but consumes the provided + /// address and returns a RAII conformant guard object. + /// + /// Unless the guard [PoolRwGuard::release] method is called, the data for the + /// given address will be deleted automatically when the guard is dropped. + /// This can prevent memory leaks. Users can read (and modify) the data and release the guard + /// if the data in the store is valid for further processing. If the data is faulty, no + /// manual deletion is necessary when returning from a processing function prematurely. + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; +} + +pub struct PoolGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> { + pool: &'a mut MemProvider, + pub addr: StoreAddr, + no_deletion: bool, + deletion_failed_error: Option, +} + +/// This helper object +impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> { + pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self { + Self { + pool, + addr, + no_deletion: false, + deletion_failed_error: None, + } + } + + pub fn read(&self) -> Result<&[u8], StoreError> { + self.pool.read(&self.addr) + } + + /// Releasing the pool guard will disable the automatic deletion of the data when the guard + /// is dropped. 
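A minimal usage sketch for the guard API above (not taken from the patch itself; it assumes only the `StaticMemoryPool` calls visible elsewhere in this diff):

```rust
use satrs_core::pool::{
    PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig,
};

fn main() {
    // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes.
    let pool_cfg = StaticPoolConfig::new(vec![(4, 4), (2, 8), (1, 16)]);
    let mut pool = StaticMemoryPool::new(pool_cfg);
    let addr = pool.add(&[0, 1, 2, 3]).expect("adding data failed");
    {
        // The guard reads the data. Because release() is never called, dropping
        // the guard at the end of this scope deletes the stored packet again.
        let guard = pool.read_with_guard(addr);
        let data = guard.read().expect("reading data failed");
        assert_eq!(data, &[0u8, 1, 2, 3][..]);
    }
    // The entry was deleted when the guard went out of scope.
    assert!(!pool.has_element_at(&addr).expect("checking element failed"));
}
```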
+ pub fn release(&mut self) { + self.no_deletion = true; + } +} + +impl Drop for PoolGuard<'_, MemProvider> { + fn drop(&mut self) { + if !self.no_deletion { + if let Err(e) = self.pool.delete(self.addr) { + self.deletion_failed_error = Some(e); + } + } + } +} + +pub struct PoolRwGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> { + guard: PoolGuard<'a, MemProvider>, +} + +impl<'a, MemProvider: PoolProviderMemInPlace> PoolRwGuard<'a, MemProvider> { + pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self { + Self { + guard: PoolGuard::new(pool, addr), + } + } + + pub fn modify(&mut self) -> Result<&mut [u8], StoreError> { + self.guard.pool.modify(&self.guard.addr) + } + + delegate!( + to self.guard { + pub fn read(&self) -> Result<&[u8], StoreError>; + /// Releasing the pool guard will disable the automatic deletion of the data when the guard + /// is dropped. + pub fn release(&mut self); + } + ); +} + #[cfg(feature = "alloc")] mod alloc_mod { + use super::{ + PoolGuard, PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, PoolRwGuard, + StaticPoolAddr, + }; use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError}; - use alloc::boxed::Box; use alloc::vec; use alloc::vec::Vec; - use delegate::delegate; #[cfg(feature = "std")] use std::sync::{Arc, RwLock}; #[cfg(feature = "std")] - pub type ShareablePoolProvider = Box; - #[cfg(feature = "std")] - pub type SharedPool = Arc>; + pub type SharedStaticMemoryPool = Arc>; type PoolSize = usize; const STORE_FREE: PoolSize = PoolSize::MAX; pub const POOL_MAX_SIZE: PoolSize = STORE_FREE - 1; - /// Configuration structure of the [local pool][LocalPool] + /// Configuration structure of the [static memory pool][StaticMemoryPool] /// /// # Parameters /// /// * `cfg`: Vector of tuples which represent a subpool. The first entry in the tuple specifies the /// number of memory blocks in the subpool, the second entry the size of the blocks #[derive(Clone)] - pub struct PoolCfg { + pub struct StaticPoolConfig { cfg: Vec<(NumBlocks, usize)>, } - impl PoolCfg { + impl StaticPoolConfig { pub fn new(cfg: Vec<(NumBlocks, usize)>) -> Self { - PoolCfg { cfg } + StaticPoolConfig { cfg } } pub fn cfg(&self) -> &Vec<(NumBlocks, usize)> { @@ -228,135 +354,30 @@ mod alloc_mod { } } - pub struct PoolGuard<'a> { - pool: &'a mut LocalPool, - pub addr: StoreAddr, - no_deletion: bool, - deletion_failed_error: Option, - } - - /// This helper object - impl<'a> PoolGuard<'a> { - pub fn new(pool: &'a mut LocalPool, addr: StoreAddr) -> Self { - Self { - pool, - addr, - no_deletion: false, - deletion_failed_error: None, - } - } - - pub fn read(&self) -> Result<&[u8], StoreError> { - self.pool.read(&self.addr) - } - - /// Releasing the pool guard will disable the automatic deletion of the data when the guard - /// is dropped. 
- pub fn release(&mut self) { - self.no_deletion = true; - } - } - - impl Drop for PoolGuard<'_> { - fn drop(&mut self) { - if !self.no_deletion { - if let Err(e) = self.pool.delete(self.addr) { - self.deletion_failed_error = Some(e); - } - } - } - } - - pub struct PoolRwGuard<'a> { - guard: PoolGuard<'a>, - } - - impl<'a> PoolRwGuard<'a> { - pub fn new(pool: &'a mut LocalPool, addr: StoreAddr) -> Self { - Self { - guard: PoolGuard::new(pool, addr), - } - } - - pub fn modify(&mut self) -> Result<&mut [u8], StoreError> { - self.guard.pool.modify(&self.guard.addr) - } - - delegate!( - to self.guard { - pub fn read(&self) -> Result<&[u8], StoreError>; - /// Releasing the pool guard will disable the automatic deletion of the data when the guard - /// is dropped. - pub fn release(&mut self); - } - ); - } - - pub trait PoolProvider { - /// Add new data to the pool. The provider should attempt to reserve a memory block with the - /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can - /// be used to access the data stored in the pool - fn add(&mut self, data: &[u8]) -> Result; - - /// The provider should attempt to reserve a free memory block with the appropriate size and - /// then return a mutable reference to it. Yields a [StoreAddr] which can be used to access - /// the data stored in the pool - fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>; - - /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference - /// to it - fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>; - - /// This function behaves like [Self::modify], but consumes the provided address and returns a - /// RAII conformant guard object. - /// - /// Unless the guard [PoolRwGuard::release] method is called, the data for the - /// given address will be deleted automatically when the guard is dropped. - /// This can prevent memory leaks. Users can read (and modify) the data and release the guard - /// if the data in the store is valid for further processing. If the data is faulty, no - /// manual deletion is necessary when returning from a processing function prematurely. - fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; - - /// Read data by yielding a read-only reference given a [StoreAddr] - fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>; - - /// This function behaves like [Self::read], but consumes the provided address and returns a - /// RAII conformant guard object. - /// - /// Unless the guard [PoolRwGuard::release] method is called, the data for the - /// given address will be deleted automatically when the guard is dropped. - /// This can prevent memory leaks. Users can read the data and release the guard - /// if the data in the store is valid for further processing. If the data is faulty, no - /// manual deletion is necessary when returning from a processing function prematurely. - fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard; - - /// Delete data inside the pool given a [StoreAddr] - fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; - fn has_element_at(&self, addr: &StoreAddr) -> Result; - - /// Retrieve the length of the data at the given store address. - fn len_of_data(&self, addr: &StoreAddr) -> Result { - if !self.has_element_at(addr)? { - return Err(StoreError::DataDoesNotExist(*addr)); - } - Ok(self.read(addr)?.len()) - } - } - - /// Pool implementation providing sub-pools with fixed size memory blocks. 
More details in - /// the [module documentation][crate::pool] - pub struct LocalPool { - pool_cfg: PoolCfg, + /// Pool implementation providing sub-pools with fixed size memory blocks. + /// + /// This is a simple memory pool implementation which pre-allocates all sub-pools using a given pool + /// configuration. After the pre-allocation, no dynamic memory allocation will be performed + /// during run-time. This makes the implementation suitable for real-time applications and + /// embedded environments. The pool implementation will also track the size of the data stored + /// inside it. + /// + /// Transactions with the [pool][StaticMemoryPool] are done using a generic + /// [address][StoreAddr] type. + /// Adding any data to the pool will yield a store address. Modification and read operations are + /// done using a reference to a store address. Deletion will consume the store address. + pub struct StaticMemoryPool { + pool_cfg: StaticPoolConfig, pool: Vec>, sizes_lists: Vec>, } - impl LocalPool { - /// Create a new local pool from the [given configuration][PoolCfg]. This function will sanitize - /// the given configuration as well. - pub fn new(mut cfg: PoolCfg) -> LocalPool { + impl StaticMemoryPool { + /// Create a new local pool from the [given configuration][StaticPoolConfig]. This function + /// will sanitize the given configuration as well. + pub fn new(mut cfg: StaticPoolConfig) -> StaticMemoryPool { let subpools_num = cfg.sanitize(); - let mut local_pool = LocalPool { + let mut local_pool = StaticMemoryPool { pool_cfg: cfg, pool: Vec::with_capacity(subpools_num), sizes_lists: Vec::with_capacity(subpools_num), @@ -372,39 +393,39 @@ mod alloc_mod { local_pool } - fn addr_check(&self, addr: &StoreAddr) -> Result { + fn addr_check(&self, addr: &StaticPoolAddr) -> Result { self.validate_addr(addr)?; let pool_idx = addr.pool_idx as usize; let size_list = self.sizes_lists.get(pool_idx).unwrap(); let curr_size = size_list[addr.packet_idx as usize]; if curr_size == STORE_FREE { - return Err(StoreError::DataDoesNotExist(*addr)); + return Err(StoreError::DataDoesNotExist(StoreAddr::from(*addr))); } Ok(curr_size) } - fn validate_addr(&self, addr: &StoreAddr) -> Result<(), StoreError> { + fn validate_addr(&self, addr: &StaticPoolAddr) -> Result<(), StoreError> { let pool_idx = addr.pool_idx as usize; if pool_idx >= self.pool_cfg.cfg.len() { return Err(StoreError::InvalidStoreId( StoreIdError::InvalidSubpool(addr.pool_idx), - Some(*addr), + Some(StoreAddr::from(*addr)), )); } if addr.packet_idx >= self.pool_cfg.cfg[addr.pool_idx as usize].0 { return Err(StoreError::InvalidStoreId( StoreIdError::InvalidPacketIdx(addr.packet_idx), - Some(*addr), + Some(StoreAddr::from(*addr)), )); } Ok(()) } - fn reserve(&mut self, data_len: usize) -> Result { + fn reserve(&mut self, data_len: usize) -> Result { let subpool_idx = self.find_subpool(data_len, 0)?; let (slot, size_slot_ref) = self.find_empty(subpool_idx)?; *size_slot_ref = data_len; - Ok(StoreAddr { + Ok(StaticPoolAddr { pool_idx: subpool_idx, packet_idx: slot, }) @@ -422,7 +443,7 @@ mod alloc_mod { Err(StoreError::DataTooLarge(req_size)) } - fn write(&mut self, addr: &StoreAddr, data: &[u8]) -> Result<(), StoreError> { + fn write(&mut self, addr: &StaticPoolAddr, data: &[u8]) -> Result<(), StoreError> { let packet_pos = self.raw_pos(addr).ok_or(StoreError::InternalError(0))?; let subpool = self .pool @@ -449,13 +470,13 @@ mod alloc_mod { Err(StoreError::StoreFull(subpool)) } - fn raw_pos(&self, addr: &StoreAddr) -> Option { + fn raw_pos(&self, 
addr: &StaticPoolAddr) -> Option { let (_, size) = self.pool_cfg.cfg.get(addr.pool_idx as usize)?; Some(addr.packet_idx as usize * size) } } - impl PoolProvider for LocalPool { + impl PoolProviderMemInPlace for StaticMemoryPool { fn add(&mut self, data: &[u8]) -> Result { let data_len = data.len(); if data_len > POOL_MAX_SIZE { @@ -463,7 +484,7 @@ mod alloc_mod { } let addr = self.reserve(data_len)?; self.write(&addr, data)?; - Ok(addr) + Ok(addr.into()) } fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> { @@ -474,34 +495,29 @@ mod alloc_mod { let raw_pos = self.raw_pos(&addr).unwrap(); let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len]; - Ok((addr, block)) + Ok((addr.into(), block)) } fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> { - let curr_size = self.addr_check(addr)?; - let raw_pos = self.raw_pos(addr).unwrap(); + let addr = StaticPoolAddr::from(*addr); + let curr_size = self.addr_check(&addr)?; + let raw_pos = self.raw_pos(&addr).unwrap(); let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap() [raw_pos..raw_pos + curr_size]; Ok(block) } - fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { - PoolRwGuard::new(self, addr) - } - fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { - let curr_size = self.addr_check(addr)?; - let raw_pos = self.raw_pos(addr).unwrap(); + let addr = StaticPoolAddr::from(*addr); + let curr_size = self.addr_check(&addr)?; + let raw_pos = self.raw_pos(&addr).unwrap(); let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size]; Ok(block) } - fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { - PoolGuard::new(self, addr) - } - fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> { + let addr = StaticPoolAddr::from(addr); self.addr_check(&addr)?; let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1; let raw_pos = self.raw_pos(&addr).unwrap(); @@ -514,7 +530,8 @@ mod alloc_mod { } fn has_element_at(&self, addr: &StoreAddr) -> Result { - self.validate_addr(addr)?; + let addr = StaticPoolAddr::from(*addr); + self.validate_addr(&addr)?; let pool_idx = addr.pool_idx as usize; let size_list = self.sizes_lists.get(pool_idx).unwrap(); let curr_size = size_list[addr.packet_idx as usize]; @@ -524,34 +541,45 @@ mod alloc_mod { Ok(true) } } + + impl PoolProviderMemInPlaceWithGuards for StaticMemoryPool { + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { + PoolRwGuard::new(self, addr) + } + + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { + PoolGuard::new(self, addr) + } + } } #[cfg(test)] mod tests { use crate::pool::{ - LocalPool, PoolCfg, PoolGuard, PoolProvider, PoolRwGuard, StoreAddr, StoreError, - StoreIdError, POOL_MAX_SIZE, + PoolGuard, PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, PoolRwGuard, + StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, + POOL_MAX_SIZE, }; use std::vec; - fn basic_small_pool() -> LocalPool { + fn basic_small_pool() -> StaticMemoryPool { // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes - let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]); - LocalPool::new(pool_cfg) + let pool_cfg = StaticPoolConfig::new(vec![(4, 4), (2, 8), (1, 16)]); + StaticMemoryPool::new(pool_cfg) } #[test] fn test_cfg() { // Values where number of buckets is 0 or size is too large should be removed - let mut pool_cfg = PoolCfg::new(vec![(0, 0), (1, 0), (2, POOL_MAX_SIZE)]); 
+ let mut pool_cfg = StaticPoolConfig::new(vec![(0, 0), (1, 0), (2, POOL_MAX_SIZE)]); pool_cfg.sanitize(); assert_eq!(*pool_cfg.cfg(), vec![(1, 0)]); // Entries should be ordered according to bucket size - pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]); + pool_cfg = StaticPoolConfig::new(vec![(16, 6), (32, 3), (8, 12)]); pool_cfg.sanitize(); assert_eq!(*pool_cfg.cfg(), vec![(32, 3), (16, 6), (8, 12)]); // Unstable sort is used, so order of entries with same block length should not matter - pool_cfg = PoolCfg::new(vec![(12, 12), (14, 16), (10, 12)]); + pool_cfg = StaticPoolConfig::new(vec![(12, 12), (14, 16), (10, 12)]); pool_cfg.sanitize(); assert!( *pool_cfg.cfg() == vec![(12, 12), (10, 12), (14, 16)] @@ -600,10 +628,10 @@ mod tests { let (addr, buf_ref) = res.unwrap(); assert_eq!( addr, - StoreAddr { + u64::from(StaticPoolAddr { pool_idx: 2, packet_idx: 0 - } + }) ); assert_eq!(buf_ref.len(), 12); } @@ -655,10 +683,13 @@ mod tests { fn test_read_does_not_exist() { let local_pool = basic_small_pool(); // Try to access data which does not exist - let res = local_pool.read(&StoreAddr { - packet_idx: 0, - pool_idx: 0, - }); + let res = local_pool.read( + &StaticPoolAddr { + packet_idx: 0, + pool_idx: 0, + } + .into(), + ); assert!(res.is_err()); assert!(matches!( res.unwrap_err(), @@ -684,10 +715,11 @@ mod tests { #[test] fn test_invalid_pool_idx() { let local_pool = basic_small_pool(); - let addr = StoreAddr { + let addr = StaticPoolAddr { pool_idx: 3, packet_idx: 0, - }; + } + .into(); let res = local_pool.read(&addr); assert!(res.is_err()); let err = res.unwrap_err(); @@ -700,12 +732,12 @@ mod tests { #[test] fn test_invalid_packet_idx() { let local_pool = basic_small_pool(); - let addr = StoreAddr { + let addr = StaticPoolAddr { pool_idx: 2, packet_idx: 1, }; assert_eq!(addr.raw(), 0x00020001); - let res = local_pool.read(&addr); + let res = local_pool.read(&addr.into()); assert!(res.is_err()); let err = res.unwrap_err(); assert!(matches!( diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index 132e24c..d51135a 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -82,7 +82,7 @@ pub mod heapless_mod { } } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum EventRequest { Enable(Event), Disable(Event), diff --git a/satrs-core/src/pus/event_srv.rs b/satrs-core/src/pus/event_srv.rs index 1d6bbd1..49a080c 100644 --- a/satrs-core/src/pus/event_srv.rs +++ b/satrs-core/src/pus/event_srv.rs @@ -1,54 +1,39 @@ use crate::events::EventU32; use crate::pus::event_man::{EventRequest, EventRequestWithToken}; use crate::pus::verification::TcStateToken; -use crate::pus::{ - EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, - PusPacketHandlingError, -}; -use alloc::boxed::Box; +use crate::pus::{PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError}; use spacepackets::ecss::event::Subservice; use spacepackets::ecss::PusPacket; use std::sync::mpsc::Sender; -use super::verification::VerificationReporterWithSender; -use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHandler}; +use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper}; pub struct PusService5EventHandler { - pub psb: PusServiceHandler, + pub service_helper: PusServiceHelper, event_request_tx: Sender, } impl PusService5EventHandler { pub fn new( - tc_receiver: Box, - tm_sender: Box, - tm_apid: u16, - verification_handler: VerificationReporterWithSender, - tc_in_mem_converter: TcInMemConverter, + 
service_handler: PusServiceHelper, event_request_tx: Sender, ) -> Self { Self { - psb: PusServiceHandler::new( - tc_receiver, - tm_sender, - tm_apid, - verification_handler, - tc_in_mem_converter, - ), + service_helper: service_handler, event_request_tx, } } pub fn handle_one_tc(&mut self) -> Result { - let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { return Ok(PusPacketHandlerResult::Empty); } let ecss_tc_and_token = possible_packet.unwrap(); let tc = self - .psb + .service_helper .tc_in_mem_converter - .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; + .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token.tc_in_memory)?; let subservice = tc.subservice(); let srv = Subservice::try_from(subservice); if srv.is_err() { @@ -60,13 +45,13 @@ impl PusService5EventHandler PusService5EventHandler, + } + + impl Pus5HandlerWithStoreTester { + pub fn new(event_request_tx: Sender) -> Self { + let (common, srv_handler) = PusServiceHandlerWithSharedStoreCommon::new(); + Self { + common, + handler: PusService5EventHandler::new(srv_handler, event_request_tx), + } + } + } + + impl PusTestHarness for Pus5HandlerWithStoreTester { + delegate! { + to self.common { + fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken; + fn read_next_tm(&mut self) -> PusTmReader<'_>; + fn check_no_tm_available(&self) -> bool; + fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId); + } + + } + } + + impl SimplePusPacketHandler for Pus5HandlerWithStoreTester { + delegate! { + to self.handler { + fn handle_one_tc(&mut self) -> Result; + } + } + } + + fn event_test( + test_harness: &mut (impl PusTestHarness + SimplePusPacketHandler), + subservice: Subservice, + expected_event_req: EventRequest, + event_req_receiver: mpsc::Receiver, + ) { + let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTcSecondaryHeader::new_simple(5, subservice as u8); + let mut app_data = [0; 4]; + TEST_EVENT_0 + .write_to_be_bytes(&mut app_data) + .expect("writing test event failed"); + let ping_tc = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true); + let token = test_harness.send_tc(&ping_tc); + let request_id = token.req_id(); + test_harness.handle_one_tc().unwrap(); + test_harness.check_next_verification_tm(1, request_id); + test_harness.check_next_verification_tm(3, request_id); + // Completion TM is not generated for us. 
+ assert!(test_harness.check_no_tm_available()); + let event_request = event_req_receiver + .try_recv() + .expect("no event request received"); + assert_eq!(expected_event_req, event_request.request); + } + + #[test] + fn test_enabling_event_reporting() { + let (event_request_tx, event_request_rx) = mpsc::channel(); + let mut test_harness = Pus5HandlerWithStoreTester::new(event_request_tx); + event_test( + &mut test_harness, + Subservice::TcEnableEventGeneration, + EventRequest::Enable(TEST_EVENT_0), + event_request_rx, + ); + } + + #[test] + fn test_disabling_event_reporting() { + let (event_request_tx, event_request_rx) = mpsc::channel(); + let mut test_harness = Pus5HandlerWithStoreTester::new(event_request_tx); + event_test( + &mut test_harness, + Subservice::TcDisableEventGeneration, + EventRequest::Disable(TEST_EVENT_0), + event_request_rx, + ); + } + + #[test] + fn test_empty_tc_queue() { + let (event_request_tx, _) = mpsc::channel(); + let mut test_harness = Pus5HandlerWithStoreTester::new(event_request_tx); + let result = test_harness.handle_one_tc(); + assert!(result.is_ok()); + let result = result.unwrap(); + if let PusPacketHandlerResult::Empty = result { + } else { + panic!("unexpected result type {result:?}") + } + } + + #[test] + fn test_sending_custom_subservice() { + let (event_request_tx, _) = mpsc::channel(); + let mut test_harness = Pus5HandlerWithStoreTester::new(event_request_tx); + let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTcSecondaryHeader::new_simple(5, 200); + let ping_tc = PusTcCreator::new_no_app_data(&mut sp_header, sec_header, true); + test_harness.send_tc(&ping_tc); + let result = test_harness.handle_one_tc(); + assert!(result.is_ok()); + let result = result.unwrap(); + if let PusPacketHandlerResult::CustomSubservice(subservice, _) = result { + assert_eq!(subservice, 200); + } else { + panic!("unexpected result type {result:?}") + } + } + + #[test] + fn test_sending_invalid_app_data() { + let (event_request_tx, _) = mpsc::channel(); + let mut test_harness = Pus5HandlerWithStoreTester::new(event_request_tx); + let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = + PusTcSecondaryHeader::new_simple(5, Subservice::TcEnableEventGeneration as u8); + let ping_tc = PusTcCreator::new(&mut sp_header, sec_header, &[0, 1, 2], true); + test_harness.send_tc(&ping_tc); + let result = test_harness.handle_one_tc(); + assert!(result.is_err()); + let result = result.unwrap_err(); + if let PusPacketHandlingError::NotEnoughAppData(string) = result { + assert_eq!(string, "at least 4 bytes event ID expected"); + } else { + panic!("unexpected result type {result:?}") + } + } +} diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index cb28562..7c721ee 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -194,6 +194,9 @@ pub trait EcssTcSenderCore: EcssChannel { fn send_tc(&self, tc: PusTcCreator, token: Option) -> Result<(), EcssTmtcError>; } +/// A PUS telecommand packet can be stored in memory using different methods. Right now, +/// storage inside a pool structure like [crate::pool::StaticMemoryPool], and storage inside a +/// `Vec` are supported. #[non_exhaustive] #[derive(Debug, Clone, PartialEq, Eq)] pub enum TcInMemory { @@ -215,6 +218,7 @@ impl From> for TcInMemory { } } +/// Generic structure for an ECSS PUS Telecommand and its correspoding verification token. 
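Because `StoreAddr` is now a plain `u64` alias and `TcInMemory` provides `From` implementations for both store addresses and byte vectors, conversions between the two storage variants are trivial. A small sketch, assuming only those impls:

```rust
use satrs_core::pool::StoreAddr;
use satrs_core::pus::TcInMemory;

fn main() {
    // StoreAddr is just a u64 now, so any pool address converts directly.
    let addr: StoreAddr = 42;
    let tc_in_pool: TcInMemory = addr.into();
    // Alternatively, the raw telecommand bytes can be owned by a Vec.
    let tc_in_vec: TcInMemory = vec![0x17u8, 0x00, 0x0f].into();
    assert_ne!(tc_in_pool, tc_in_vec);
}
```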
#[derive(Debug, Clone, PartialEq, Eq)] pub struct EcssTcAndToken { pub tc_in_memory: TcInMemory, @@ -363,8 +367,9 @@ mod alloc_mod { } #[cfg(feature = "std")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod std_mod { - use crate::pool::{SharedPool, StoreAddr}; + use crate::pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr}; use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; @@ -469,13 +474,13 @@ pub mod std_mod { } } - pub struct MpscTcInStoreReceiver { + pub struct MpscTcReceiver { id: ChannelId, name: &'static str, receiver: mpsc::Receiver, } - impl EcssChannel for MpscTcInStoreReceiver { + impl EcssChannel for MpscTcReceiver { fn id(&self) -> ChannelId { self.id } @@ -485,7 +490,7 @@ pub mod std_mod { } } - impl EcssTcReceiverCore for MpscTcInStoreReceiver { + impl EcssTcReceiverCore for MpscTcReceiver { fn recv_tc(&self) -> Result { self.receiver.try_recv().map_err(|e| match e { TryRecvError::Empty => TryRecvTmtcError::Empty, @@ -496,7 +501,7 @@ pub mod std_mod { } } - impl MpscTcInStoreReceiver { + impl MpscTcReceiver { pub fn new( id: ChannelId, name: &'static str, @@ -513,8 +518,8 @@ pub mod std_mod { #[derive(Clone)] pub struct MpscTmAsVecSender { id: ChannelId, - sender: mpsc::Sender>, name: &'static str, + sender: mpsc::Sender>, } impl From>> for EcssTmtcError { @@ -599,13 +604,13 @@ pub mod std_mod { } } - pub struct CrossbeamTcInStoreReceiver { + pub struct CrossbeamTcReceiver { id: ChannelId, name: &'static str, receiver: cb::Receiver, } - impl CrossbeamTcInStoreReceiver { + impl CrossbeamTcReceiver { pub fn new( id: ChannelId, name: &'static str, @@ -615,7 +620,7 @@ pub mod std_mod { } } - impl EcssChannel for CrossbeamTcInStoreReceiver { + impl EcssChannel for CrossbeamTcReceiver { fn id(&self) -> ChannelId { self.id } @@ -625,7 +630,7 @@ pub mod std_mod { } } - impl EcssTcReceiverCore for CrossbeamTcInStoreReceiver { + impl EcssTcReceiverCore for CrossbeamTcReceiver { fn recv_tc(&self) -> Result { self.receiver.try_recv().map_err(|e| match e { cb::TryRecvError::Empty => TryRecvTmtcError::Empty, @@ -689,36 +694,40 @@ pub mod std_mod { } pub trait EcssTcInMemConverter { - fn cache_ecss_tc_in_memory<'a>( - &'a mut self, - possible_packet: &'a AcceptedEcssTcAndToken, + fn cache_ecss_tc_in_memory( + &mut self, + possible_packet: &TcInMemory, ) -> Result<(), PusPacketHandlingError>; fn tc_slice_raw(&self) -> &[u8]; - fn convert_ecss_tc_in_memory_to_reader<'a>( - &'a mut self, - possible_packet: &'a AcceptedEcssTcAndToken, - ) -> Result, PusPacketHandlingError> { + fn convert_ecss_tc_in_memory_to_reader( + &mut self, + possible_packet: &TcInMemory, + ) -> Result, PusPacketHandlingError> { self.cache_ecss_tc_in_memory(possible_packet)?; Ok(PusTcReader::new(self.tc_slice_raw())?.0) } } + /// Converter structure for PUS telecommands which are stored inside a `Vec` structure. + /// Please note that this structure is not able to convert TCs which are stored inside a + /// [SharedStaticMemoryPool]. 
+ #[derive(Default, Clone)] pub struct EcssTcInVecConverter { pub pus_tc_raw: Option>, } impl EcssTcInMemConverter for EcssTcInVecConverter { - fn cache_ecss_tc_in_memory<'a>( - &'a mut self, - possible_packet: &'a AcceptedEcssTcAndToken, + fn cache_ecss_tc_in_memory( + &mut self, + tc_in_memory: &TcInMemory, ) -> Result<(), PusPacketHandlingError> { self.pus_tc_raw = None; - match &possible_packet.tc_in_memory { + match tc_in_memory { super::TcInMemory::StoreAddr(_) => { return Err(PusPacketHandlingError::InvalidTcInMemoryFormat( - possible_packet.tc_in_memory.clone(), + tc_in_memory.clone(), )); } super::TcInMemory::Vec(vec) => { @@ -736,13 +745,17 @@ pub mod std_mod { } } - pub struct EcssTcInStoreConverter { - pub shared_tc_store: SharedPool, - pub pus_buf: Vec, + /// Converter structure for PUS telecommands which are stored inside + /// [SharedStaticMemoryPool] structure. This is useful if run-time allocation for these + /// packets should be avoided. Please note that this structure is not able to convert TCs which + /// are stored as a `Vec`. + pub struct EcssTcInSharedStoreConverter { + shared_tc_store: SharedStaticMemoryPool, + pus_buf: Vec, } - impl EcssTcInStoreConverter { - pub fn new(shared_tc_store: SharedPool, max_expected_tc_size: usize) -> Self { + impl EcssTcInSharedStoreConverter { + pub fn new(shared_tc_store: SharedStaticMemoryPool, max_expected_tc_size: usize) -> Self { Self { shared_tc_store, pus_buf: alloc::vec![0; max_expected_tc_size], @@ -765,18 +778,18 @@ pub mod std_mod { } } - impl EcssTcInMemConverter for EcssTcInStoreConverter { - fn cache_ecss_tc_in_memory<'a>( - &'a mut self, - possible_packet: &'a AcceptedEcssTcAndToken, + impl EcssTcInMemConverter for EcssTcInSharedStoreConverter { + fn cache_ecss_tc_in_memory( + &mut self, + tc_in_memory: &TcInMemory, ) -> Result<(), PusPacketHandlingError> { - match &possible_packet.tc_in_memory { + match tc_in_memory { super::TcInMemory::StoreAddr(addr) => { self.copy_tc_to_buf(*addr)?; } super::TcInMemory::Vec(_) => { return Err(PusPacketHandlingError::InvalidTcInMemoryFormat( - possible_packet.tc_in_memory.clone(), + tc_in_memory.clone(), )); } }; @@ -821,18 +834,21 @@ pub mod std_mod { } } - /// Base class for handlers which can handle PUS TC packets. Right now, the verification - /// reporter is constrained to the [StdVerifReporterWithSender] and the service handler - /// relies on TMTC packets being exchanged via a [SharedPool]. Please note that this variant - /// of the PUS service base is not optimized for handling packets sent as a `Vec` and - /// might perform additional copies to the internal buffer as well. The class should - /// still behave correctly. - pub struct PusServiceHandler { + /// This is a high-level PUS packet handler helper. + /// + /// It performs some of the boilerplate acitivities involved when handling PUS telecommands and + /// it can be used to implement the handling of PUS telecommands for certain PUS telecommands + /// groups (for example individual services). + /// + /// This base class can handle PUS telecommands backed by different memory storage machanisms + /// by using the [EcssTcInMemConverter] abstraction. This object provides some convenience + /// methods to make the generic parts of TC handling easier. 
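As an illustration of how these pieces fit together, a sketch of wiring up a `PusServiceHelper` with the Vec-backed converter, based only on constructors shown elsewhere in this diff (channel IDs, names and the APID are arbitrary placeholder values):

```rust
use std::sync::mpsc;

use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
use satrs_core::pus::{
    EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PusServiceHelper,
};

fn main() {
    let (_tc_tx, tc_rx) = mpsc::channel::<EcssTcAndToken>();
    let (tm_tx, _tm_rx) = mpsc::channel::<Vec<u8>>();
    let verif_sender = MpscTmAsVecSender::new(0, "verif-sender", tm_tx.clone());
    let verif_cfg = VerificationReporterCfg::new(0x101, 1, 2, 8).unwrap();
    let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
    let tm_sender = MpscTmAsVecSender::new(1, "tm-sender", tm_tx);
    let tc_receiver = MpscTcReceiver::new(2, "tc-receiver", tc_rx);
    // The converter determines how the helper retrieves raw TC bytes (here: from a Vec).
    let _service_helper = PusServiceHelper::new(
        Box::new(tc_receiver),
        Box::new(tm_sender),
        0x101,
        verif_reporter,
        EcssTcInVecConverter::default(),
    );
}
```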
+ pub struct PusServiceHelper { pub common: PusServiceBase, pub tc_in_mem_converter: TcInMemConverter, } - impl PusServiceHandler { + impl PusServiceHelper { pub fn new( tc_receiver: Box, tm_sender: Box, @@ -851,6 +867,11 @@ pub mod std_mod { } } + /// This function can be used to poll the internal [EcssTcReceiver] object for the next + /// telecommand packet. It will return `Ok(None)` if there are not packets available. + /// In any other case, it will perform the acceptance of the ECSS TC packet using the + /// internal [VerificationReporterWithSender] object. It will then return the telecommand + /// and the according accepted token. pub fn retrieve_and_accept_next_packet( &mut self, ) -> Result, PusPacketHandlingError> { @@ -893,10 +914,35 @@ pub(crate) fn source_buffer_large_enough(cap: usize, len: usize) -> Result<(), E } #[cfg(test)] -pub(crate) mod tests { - use spacepackets::ecss::tm::{GenericPusTmSecondaryHeader, PusTmCreator}; +pub mod tests { + use std::sync::mpsc::TryRecvError; + use std::sync::{mpsc, RwLock}; + + use alloc::boxed::Box; + use alloc::vec; + use spacepackets::ecss::tc::PusTcCreator; + use spacepackets::ecss::tm::{GenericPusTmSecondaryHeader, PusTmCreator, PusTmReader}; + use spacepackets::ecss::{PusPacket, WritablePusPacket}; use spacepackets::CcsdsPacket; + use crate::pool::{ + PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig, + StoreAddr, + }; + use crate::pus::verification::RequestId; + use crate::tmtc::tm_helper::SharedTmStore; + + use super::verification::{ + TcStateAccepted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, + }; + use super::{ + EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, MpscTcReceiver, + MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult, PusPacketHandlingError, + PusServiceHelper, TcInMemory, + }; + + pub const TEST_APID: u16 = 0x101; + #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) struct CommonTmInfo { pub subservice: u8, @@ -906,12 +952,23 @@ pub(crate) mod tests { pub time_stamp: [u8; 7], } + pub trait PusTestHarness { + fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken; + fn read_next_tm(&mut self) -> PusTmReader<'_>; + fn check_no_tm_available(&self) -> bool; + fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId); + } + + pub trait SimplePusPacketHandler { + fn handle_one_tc(&mut self) -> Result; + } + impl CommonTmInfo { pub fn new_from_tm(tm: &PusTmCreator) -> Self { let mut time_stamp = [0; 7]; time_stamp.clone_from_slice(&tm.timestamp()[0..7]); Self { - subservice: tm.subservice(), + subservice: PusPacket::subservice(tm), apid: tm.apid(), msg_counter: tm.msg_counter(), dest_id: tm.dest_id(), @@ -919,4 +976,192 @@ pub(crate) mod tests { } } } + + /// Common fields for a PUS service test harness. + pub struct PusServiceHandlerWithSharedStoreCommon { + pus_buf: [u8; 2048], + tm_buf: [u8; 2048], + tc_pool: SharedStaticMemoryPool, + tm_pool: SharedTmStore, + tc_sender: mpsc::Sender, + tm_receiver: mpsc::Receiver, + verification_handler: VerificationReporterWithSender, + } + + impl PusServiceHandlerWithSharedStoreCommon { + /// This function generates the structure in addition to the PUS service handler + /// [PusServiceHandler] which might be required for a specific PUS service handler. + /// + /// The PUS service handler is instantiated with a [EcssTcInStoreConverter]. 
+ pub fn new() -> (Self, PusServiceHelper) { + let pool_cfg = StaticPoolConfig::new(vec![(16, 16), (8, 32), (4, 64)]); + let tc_pool = StaticMemoryPool::new(pool_cfg.clone()); + let tm_pool = StaticMemoryPool::new(pool_cfg); + let shared_tc_pool = SharedStaticMemoryPool::new(RwLock::new(tc_pool)); + let shared_tm_pool = SharedTmStore::new(tm_pool); + let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); + let (tm_tx, tm_rx) = mpsc::channel(); + + let verif_sender = + MpscTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tm_tx.clone()); + let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); + let verification_handler = + VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + let test_srv_tm_sender = + MpscTmInStoreSender::new(0, "TEST_SENDER", shared_tm_pool.clone(), tm_tx); + let test_srv_tc_receiver = MpscTcReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); + let in_store_converter = + EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048); + ( + Self { + pus_buf: [0; 2048], + tm_buf: [0; 2048], + tc_pool: shared_tc_pool, + tm_pool: shared_tm_pool, + tc_sender: test_srv_tc_tx, + tm_receiver: tm_rx, + verification_handler: verification_handler.clone(), + }, + PusServiceHelper::new( + Box::new(test_srv_tc_receiver), + Box::new(test_srv_tm_sender), + TEST_APID, + verification_handler, + in_store_converter, + ), + ) + } + pub fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken { + let token = self.verification_handler.add_tc(tc); + let token = self + .verification_handler + .acceptance_success(token, Some(&[0; 7])) + .unwrap(); + let tc_size = tc.write_to_bytes(&mut self.pus_buf).unwrap(); + let mut tc_pool = self.tc_pool.write().unwrap(); + let addr = tc_pool.add(&self.pus_buf[..tc_size]).unwrap(); + drop(tc_pool); + // Send accepted TC to test service handler. 
+ self.tc_sender + .send(EcssTcAndToken::new(addr, token)) + .expect("sending tc failed"); + token + } + + pub fn read_next_tm(&mut self) -> PusTmReader<'_> { + let next_msg = self.tm_receiver.try_recv(); + assert!(next_msg.is_ok()); + let tm_addr = next_msg.unwrap(); + let tm_pool = self.tm_pool.shared_pool.read().unwrap(); + let tm_raw = tm_pool.read(&tm_addr).unwrap(); + self.tm_buf[0..tm_raw.len()].copy_from_slice(tm_raw); + PusTmReader::new(&self.tm_buf, 7).unwrap().0 + } + + pub fn check_no_tm_available(&self) -> bool { + let next_msg = self.tm_receiver.try_recv(); + if let TryRecvError::Empty = next_msg.unwrap_err() { + return true; + } + false + } + + pub fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId) { + let next_msg = self.tm_receiver.try_recv(); + assert!(next_msg.is_ok()); + let tm_addr = next_msg.unwrap(); + let tm_pool = self.tm_pool.shared_pool.read().unwrap(); + let tm_raw = tm_pool.read(&tm_addr).unwrap(); + let tm = PusTmReader::new(tm_raw, 7).unwrap().0; + assert_eq!(PusPacket::service(&tm), 1); + assert_eq!(PusPacket::subservice(&tm), subservice); + assert_eq!(tm.apid(), TEST_APID); + let req_id = + RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); + assert_eq!(req_id, expected_request_id); + } + } + + pub struct PusServiceHandlerWithVecCommon { + current_tm: Option>, + tc_sender: mpsc::Sender, + tm_receiver: mpsc::Receiver>, + verification_handler: VerificationReporterWithSender, + } + + impl PusServiceHandlerWithVecCommon { + pub fn new() -> (Self, PusServiceHelper) { + let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); + let (tm_tx, tm_rx) = mpsc::channel(); + + let verif_sender = MpscTmAsVecSender::new(0, "verififcatio-sender", tm_tx.clone()); + let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); + let verification_handler = + VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + let test_srv_tm_sender = MpscTmAsVecSender::new(0, "test-sender", tm_tx); + let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx); + let in_store_converter = EcssTcInVecConverter::default(); + ( + Self { + current_tm: None, + tc_sender: test_srv_tc_tx, + tm_receiver: tm_rx, + verification_handler: verification_handler.clone(), + }, + PusServiceHelper::new( + Box::new(test_srv_tc_receiver), + Box::new(test_srv_tm_sender), + TEST_APID, + verification_handler, + in_store_converter, + ), + ) + } + + pub fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken { + let token = self.verification_handler.add_tc(tc); + let token = self + .verification_handler + .acceptance_success(token, Some(&[0; 7])) + .unwrap(); + // Send accepted TC to test service handler. 
+ self.tc_sender + .send(EcssTcAndToken::new( + TcInMemory::Vec(tc.to_vec().expect("pus tc conversion to vec failed")), + token, + )) + .expect("sending tc failed"); + token + } + + pub fn read_next_tm(&mut self) -> PusTmReader<'_> { + let next_msg = self.tm_receiver.try_recv(); + assert!(next_msg.is_ok()); + self.current_tm = Some(next_msg.unwrap()); + PusTmReader::new(self.current_tm.as_ref().unwrap(), 7) + .unwrap() + .0 + } + + pub fn check_no_tm_available(&self) -> bool { + let next_msg = self.tm_receiver.try_recv(); + if let TryRecvError::Empty = next_msg.unwrap_err() { + return true; + } + false + } + + pub fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId) { + let next_msg = self.tm_receiver.try_recv(); + assert!(next_msg.is_ok()); + let next_msg = next_msg.unwrap(); + let tm = PusTmReader::new(next_msg.as_slice(), 7).unwrap().0; + assert_eq!(PusPacket::service(&tm), 1); + assert_eq!(PusPacket::subservice(&tm), subservice); + assert_eq!(tm.apid(), TEST_APID); + let req_id = + RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); + assert_eq!(req_id, expected_request_id); + } + } } diff --git a/satrs-core/src/pus/scheduler.rs b/satrs-core/src/pus/scheduler.rs index 392eb87..d0f6a72 100644 --- a/satrs-core/src/pus/scheduler.rs +++ b/satrs-core/src/pus/scheduler.rs @@ -2,23 +2,25 @@ //! //! The core data structure of this module is the [PusScheduler]. This structure can be used //! to perform the scheduling of telecommands like specified in the ECSS standard. -use core::fmt::Debug; +use core::fmt::{Debug, Display, Formatter}; +use core::time::Duration; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use spacepackets::ecss::scheduling::TimeWindowType; -use spacepackets::ecss::tc::{GenericPusTcSecondaryHeader, IsPusTelecommand}; -use spacepackets::time::CcsdsTimeProvider; +use spacepackets::ecss::tc::{GenericPusTcSecondaryHeader, IsPusTelecommand, PusTcReader}; +use spacepackets::ecss::{PusError, PusPacket}; +use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use spacepackets::CcsdsPacket; #[cfg(feature = "std")] use std::error::Error; -use crate::pool::StoreAddr; +use crate::pool::{PoolProviderMemInPlace, StoreError}; #[cfg(feature = "alloc")] pub use alloc_mod::*; /// This is the request ID as specified in ECSS-E-ST-70-41C 5.4.11.2 of the standard. /// -/// This version of the request ID is used to identify scheduled commands and also contains +/// This version of the request ID is used to identify scheduled commands and also contains /// the source ID found in the secondary header of PUS telecommands. #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -56,17 +58,19 @@ impl RequestId { } } +pub type AddrInStore = u64; + /// This is the format stored internally by the TC scheduler for each scheduled telecommand. -/// It consists of the address of that telecommand in the TC pool and a request ID. +/// It consists of a generic address for that telecommand in the TC pool and a request ID. 
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct TcInfo { - addr: StoreAddr, + addr: AddrInStore, request_id: RequestId, } impl TcInfo { - pub fn addr(&self) -> StoreAddr { + pub fn addr(&self) -> AddrInStore { self.addr } @@ -74,7 +78,7 @@ impl TcInfo { self.request_id } - pub fn new(addr: StoreAddr, request_id: RequestId) -> Self { + pub fn new(addr: u64, request_id: RequestId) -> Self { TcInfo { addr, request_id } } } @@ -133,23 +137,172 @@ impl TimeWindow { } } +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum ScheduleError { + PusError(PusError), + /// The release time is within the time-margin added on top of the current time. + /// The first parameter is the current time, the second one the time margin, and the third one + /// the release time. + ReleaseTimeInTimeMargin { + current_time: UnixTimestamp, + time_margin: Duration, + release_time: UnixTimestamp, + }, + /// Nested time-tagged commands are not allowed. + NestedScheduledTc, + StoreError(StoreError), + TcDataEmpty, + TimestampError(TimestampError), + WrongSubservice, + WrongService, +} + +impl Display for ScheduleError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + ScheduleError::PusError(e) => { + write!(f, "Pus Error: {e}") + } + ScheduleError::ReleaseTimeInTimeMargin { + current_time, + time_margin, + release_time, + } => { + write!( + f, + "Error: time margin too short, current time: {current_time:?}, time margin: {time_margin:?}, release time: {release_time:?}" + ) + } + ScheduleError::NestedScheduledTc => { + write!(f, "Error: nested scheduling is not allowed") + } + ScheduleError::StoreError(e) => { + write!(f, "Store Error: {e}") + } + ScheduleError::TcDataEmpty => { + write!(f, "Error: empty Tc Data field") + } + ScheduleError::TimestampError(e) => { + write!(f, "Timestamp Error: {e}") + } + ScheduleError::WrongService => { + write!(f, "Error: Service not 11.") + } + ScheduleError::WrongSubservice => { + write!(f, "Error: Subservice not 4.") + } + } + } +} + +impl From for ScheduleError { + fn from(e: PusError) -> Self { + ScheduleError::PusError(e) + } +} + +impl From for ScheduleError { + fn from(e: StoreError) -> Self { + ScheduleError::StoreError(e) + } +} + +impl From for ScheduleError { + fn from(e: TimestampError) -> Self { + ScheduleError::TimestampError(e) + } +} + +#[cfg(feature = "std")] +impl Error for ScheduleError {} + +pub trait PusSchedulerInterface { + type TimeProvider: CcsdsTimeProvider + TimeReader; + + fn reset( + &mut self, + store: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> Result<(), StoreError>; + + fn is_enabled(&self) -> bool; + + fn enable(&mut self); + + /// A disabled scheduler should still delete commands where the execution time has been reached + /// but should not release them to be executed. + fn disable(&mut self); + + /// Insert a telecommand which was already unwrapped from the outer Service 11 packet and stored + /// inside the telecommand packet pool. + fn insert_unwrapped_and_stored_tc( + &mut self, + time_stamp: UnixTimestamp, + info: TcInfo, + ) -> Result<(), ScheduleError>; + + /// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp + /// provider needs to be supplied via a generic. 
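To make the relationship between the new scheduler trait, the pool abstraction and `TcInfo` concrete, here is a sketch of a generic insertion helper written against that interface. It uses only trait methods shown in this diff; `PusScheduler` is one possible implementor:

```rust
use satrs_core::pool::PoolProviderMemInPlace;
use satrs_core::pus::scheduler::{PusSchedulerInterface, ScheduleError, TcInfo};
use spacepackets::time::UnixTimestamp;

/// Store a raw, unwrapped telecommand in the pool and schedule it for release
/// at the given time. Works with any scheduler implementing the new interface.
pub fn schedule_raw_tc(
    scheduler: &mut impl PusSchedulerInterface,
    pool: &mut impl PoolProviderMemInPlace,
    release_time: UnixTimestamp,
    tc_bytes: &[u8],
) -> Result<TcInfo, ScheduleError> {
    // The provided trait method stores the TC bytes in the pool and tracks the
    // resulting store address together with its request ID and release time.
    scheduler.insert_unwrapped_tc(release_time, tc_bytes, pool)
}
```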
+ fn insert_wrapped_tc( + &mut self, + pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> Result { + if PusPacket::service(pus_tc) != 11 { + return Err(ScheduleError::WrongService); + } + if PusPacket::subservice(pus_tc) != 4 { + return Err(ScheduleError::WrongSubservice); + } + if pus_tc.user_data().is_empty() { + return Err(ScheduleError::TcDataEmpty); + } + let user_data = pus_tc.user_data(); + let stamp: Self::TimeProvider = TimeReader::from_bytes(user_data)?; + let unix_stamp = stamp.unix_stamp(); + let stamp_len = stamp.len_as_bytes(); + self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) + } + + /// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still + /// needs to be stored inside the telecommand pool. + fn insert_unwrapped_tc( + &mut self, + time_stamp: UnixTimestamp, + tc: &[u8], + pool: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> Result { + let check_tc = PusTcReader::new(tc)?; + if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { + return Err(ScheduleError::NestedScheduledTc); + } + let req_id = RequestId::from_tc(&check_tc.0); + + match pool.add(tc) { + Ok(addr) => { + let info = TcInfo::new(addr, req_id); + self.insert_unwrapped_and_stored_tc(time_stamp, info)?; + Ok(info) + } + Err(err) => Err(err.into()), + } + } +} + #[cfg(feature = "alloc")] pub mod alloc_mod { use super::*; - use crate::pool::{PoolProvider, StoreAddr, StoreError}; + use crate::pool::{PoolProviderMemInPlace, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use alloc::collections::BTreeMap; use alloc::vec; use alloc::vec::Vec; - use core::fmt::{Display, Formatter}; use core::time::Duration; use spacepackets::ecss::scheduling::TimeWindowType; - use spacepackets::ecss::tc::{ - GenericPusTcSecondaryHeader, IsPusTelecommand, PusTc, PusTcReader, - }; - use spacepackets::ecss::{PusError, PusPacket}; + use spacepackets::ecss::tc::{PusTc, PusTcReader}; + use spacepackets::ecss::PusPacket; use spacepackets::time::cds::DaysLen24Bits; - use spacepackets::time::{cds, CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; + use spacepackets::time::{cds, CcsdsTimeProvider, UnixTimestamp}; #[cfg(feature = "std")] use std::time::SystemTimeError; @@ -159,86 +312,14 @@ pub mod alloc_mod { WithStoreDeletion(Result), } - #[derive(Debug, Clone, PartialEq, Eq)] - #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] - pub enum ScheduleError { - PusError(PusError), - /// The release time is within the time-margin added on top of the current time. - /// The first parameter is the current time, the second one the time margin, and the third one - /// the release time. - ReleaseTimeInTimeMargin(UnixTimestamp, Duration, UnixTimestamp), - /// Nested time-tagged commands are not allowed. 
- NestedScheduledTc, - StoreError(StoreError), - TcDataEmpty, - TimestampError(TimestampError), - WrongSubservice, - WrongService, - } - - impl Display for ScheduleError { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - match self { - ScheduleError::PusError(e) => { - write!(f, "Pus Error: {e}") - } - ScheduleError::ReleaseTimeInTimeMargin(current_time, margin, timestamp) => { - write!( - f, - "Error: time margin too short, current time: {current_time:?}, time margin: {margin:?}, release time: {timestamp:?}" - ) - } - ScheduleError::NestedScheduledTc => { - write!(f, "Error: nested scheduling is not allowed") - } - ScheduleError::StoreError(e) => { - write!(f, "Store Error: {e}") - } - ScheduleError::TcDataEmpty => { - write!(f, "Error: empty Tc Data field") - } - ScheduleError::TimestampError(e) => { - write!(f, "Timestamp Error: {e}") - } - ScheduleError::WrongService => { - write!(f, "Error: Service not 11.") - } - ScheduleError::WrongSubservice => { - write!(f, "Error: Subservice not 4.") - } - } - } - } - - impl From for ScheduleError { - fn from(e: PusError) -> Self { - ScheduleError::PusError(e) - } - } - - impl From for ScheduleError { - fn from(e: StoreError) -> Self { - ScheduleError::StoreError(e) - } - } - - impl From for ScheduleError { - fn from(e: TimestampError) -> Self { - ScheduleError::TimestampError(e) - } - } - - #[cfg(feature = "std")] - impl Error for ScheduleError {} - /// This is the core data structure for scheduling PUS telecommands with [alloc] support. /// /// It is assumed that the actual telecommand data is stored in a separate TC pool offering - /// a [crate::pool::PoolProvider] API. This data structure just tracks the store addresses and their - /// release times and offers a convenient API to insert and release telecommands and perform - /// other functionality specified by the ECSS standard in section 6.11. The time is tracked - /// as a [spacepackets::time::UnixTimestamp] but the only requirement to the timekeeping of - /// the user is that it is convertible to that timestamp. + /// a [crate::pool::PoolProviderMemInPlace] API. This data structure just tracks the store + /// addresses and their release times and offers a convenient API to insert and release + /// telecommands and perform other functionality specified by the ECSS standard in section 6.11. + /// The time is tracked as a [spacepackets::time::UnixTimestamp] but the only requirement to + /// the timekeeping of the user is that it is convertible to that timestamp. /// /// The standard also specifies that the PUS scheduler can be enabled and disabled. /// A disabled scheduler should still delete commands where the execution time has been reached @@ -292,44 +373,6 @@ pub mod alloc_mod { num_entries } - pub fn is_enabled(&self) -> bool { - self.enabled - } - - pub fn enable(&mut self) { - self.enabled = true; - } - - /// A disabled scheduler should still delete commands where the execution time has been reached - /// but should not release them to be executed. - pub fn disable(&mut self) { - self.enabled = false; - } - - /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. - /// Be careful with this command as it will delete all the commands in the schedule. - /// - /// The holding store for the telecommands needs to be passed so all the stored telecommands - /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error - /// will be returned but the method will still try to delete all the commands in the schedule. 
- pub fn reset( - &mut self, - store: &mut (impl PoolProvider + ?Sized), - ) -> Result<(), StoreError> { - self.enabled = false; - let mut deletion_ok = Ok(()); - for tc_lists in &mut self.tc_map { - for tc in tc_lists.1 { - let res = store.delete(tc.addr); - if res.is_err() { - deletion_ok = res; - } - } - } - self.tc_map.clear(); - deletion_ok - } - pub fn update_time(&mut self, current_time: UnixTimestamp) { self.current_time = current_time; } @@ -346,11 +389,11 @@ pub mod alloc_mod { info: TcInfo, ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { - return Err(ScheduleError::ReleaseTimeInTimeMargin( - self.current_time, - self.time_margin, - time_stamp, - )); + return Err(ScheduleError::ReleaseTimeInTimeMargin { + current_time: self.current_time, + time_margin: self.time_margin, + release_time: time_stamp, + }); } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { @@ -369,7 +412,7 @@ pub mod alloc_mod { &mut self, time_stamp: UnixTimestamp, tc: &[u8], - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { let check_tc = PusTcReader::new(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { @@ -387,35 +430,12 @@ pub mod alloc_mod { } } - /// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp - /// provider needs to be supplied via a generic. - pub fn insert_wrapped_tc( - &mut self, - pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader), - pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - if PusPacket::service(pus_tc) != 11 { - return Err(ScheduleError::WrongService); - } - if PusPacket::subservice(pus_tc) != 4 { - return Err(ScheduleError::WrongSubservice); - } - if pus_tc.user_data().is_empty() { - return Err(ScheduleError::TcDataEmpty); - } - let user_data = pus_tc.user_data(); - let stamp: TimeStamp = TimeReader::from_bytes(user_data)?; - let unix_stamp = stamp.unix_stamp(); - let stamp_len = stamp.len_as_bytes(); - self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) - } - /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS /// short timestamp with 16-bit length of days field. pub fn insert_wrapped_tc_cds_short( &mut self, pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { self.insert_wrapped_tc::(pus_tc, pool) } @@ -425,7 +445,7 @@ pub mod alloc_mod { pub fn insert_wrapped_tc_cds_long( &mut self, pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { self.insert_wrapped_tc::>(pus_tc, pool) } @@ -441,7 +461,7 @@ pub mod alloc_mod { pub fn delete_by_time_filter( &mut self, time_window: TimeWindow, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { let range = self.retrieve_by_time_filter(time_window); let mut del_packets = 0; @@ -471,7 +491,7 @@ pub mod alloc_mod { /// the last deletion will be supplied in addition to the number of deleted commands. pub fn delete_all( &mut self, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { self.delete_by_time_filter(TimeWindow::::new_select_all(), pool) } @@ -518,7 +538,7 @@ pub mod alloc_mod { /// called repeatedly. 
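Since the release-time error variant now carries named fields, call sites can destructure it directly instead of relying on positional tuple fields. A small sketch of how such an error might be reported; the logging itself is illustrative.

```rust
use satrs_core::pus::scheduler::ScheduleError;

fn report_schedule_error(error: &ScheduleError) {
    if let ScheduleError::ReleaseTimeInTimeMargin {
        current_time,
        time_margin,
        release_time,
    } = error
    {
        // The requested release time must lie at least `time_margin` in the
        // future relative to the scheduler's current time.
        println!(
            "rejected TC: release time {release_time:?} violates margin {time_margin:?} \
             (current time: {current_time:?})"
        );
    }
}
```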
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option { if let DeletionResult::WithoutStoreDeletion(v) = - self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>) + self.delete_by_request_id_internal_without_store_deletion(req_id) { return v; } @@ -529,20 +549,19 @@ pub mod alloc_mod { pub fn delete_by_request_id_and_from_pool( &mut self, req_id: &RequestId, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { if let DeletionResult::WithStoreDeletion(v) = - self.delete_by_request_id_internal(req_id, Some(pool)) + self.delete_by_request_id_internal_with_store_deletion(req_id, pool) { return v; } panic!("unexpected deletion result"); } - fn delete_by_request_id_internal( + fn delete_by_request_id_internal_without_store_deletion( &mut self, req_id: &RequestId, - pool: Option<&mut (impl PoolProvider + ?Sized)>, ) -> DeletionResult { let mut idx_found = None; for time_bucket in &mut self.tc_map { @@ -553,24 +572,33 @@ pub mod alloc_mod { } if let Some(idx) = idx_found { let addr = time_bucket.1.remove(idx).addr; - if let Some(pool) = pool { - return match pool.delete(addr) { - Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)), - Err(e) => DeletionResult::WithStoreDeletion(Err(e)), - }; - } return DeletionResult::WithoutStoreDeletion(Some(addr)); } } - if pool.is_none() { - DeletionResult::WithoutStoreDeletion(None) - } else { - DeletionResult::WithStoreDeletion(Ok(false)) - } + DeletionResult::WithoutStoreDeletion(None) } - /// Retrieve all telecommands which should be release based on the current time. - pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { - self.tc_map.range(..=self.current_time) + + fn delete_by_request_id_internal_with_store_deletion( + &mut self, + req_id: &RequestId, + pool: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> DeletionResult { + let mut idx_found = None; + for time_bucket in &mut self.tc_map { + for (idx, tc_info) in time_bucket.1.iter().enumerate() { + if &tc_info.request_id == req_id { + idx_found = Some(idx); + } + } + if let Some(idx) = idx_found { + let addr = time_bucket.1.remove(idx).addr; + return match pool.delete(addr) { + Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)), + Err(e) => DeletionResult::WithStoreDeletion(Err(e)), + }; + } + } + DeletionResult::WithStoreDeletion(Ok(false)) } #[cfg(feature = "std")] @@ -582,29 +610,31 @@ pub mod alloc_mod { /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete - /// the telecommands from the holding store after calling the release closure, if the scheduler - /// is disabled. + /// the telecommands from the holding store after calling the release closure if the user + /// returns [true] from the release closure. /// /// # Arguments /// /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and - /// the second argument is the telecommand information also containing the store address. - /// This closure should return whether the command should be deleted if the scheduler is - /// disabled to prevent memory leaks. - /// * `store` - The holding store of the telecommands. - pub fn release_telecommands bool>( + /// the second argument is the telecommand information also containing the store + /// address. This closure should return whether the command should be deleted. 
Please + /// note that returning false might lead to memory leaks if the TC is not cleared from + /// the store in some other way. + /// * `tc_store` - The holding store of the telecommands. + pub fn release_telecommands bool>( &mut self, mut releaser: R, - tc_store: &mut (impl PoolProvider + ?Sized), + tc_store: &mut (impl PoolProviderMemInPlace + ?Sized), ) -> Result { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = 0; let mut store_error = Ok(()); for tc in tcs_to_release { for info in tc.1 { - let should_delete = releaser(self.enabled, info); + let tc = tc_store.read(&info.addr).map_err(|e| (released_tcs, e))?; + let should_delete = releaser(self.enabled, info, tc); released_tcs += 1; - if should_delete && !self.is_enabled() { + if should_delete { let res = tc_store.delete(info.addr); if res.is_err() { store_error = res; @@ -617,13 +647,112 @@ pub mod alloc_mod { .map(|_| released_tcs) .map_err(|e| (released_tcs, e)) } + + /// This utility method is similar to [Self::release_telecommands] but will not perform + /// store deletions and thus does not require a mutable reference of the TC store. + /// + /// It will returns a [Vec] of [TcInfo]s to transfer the list of released + /// telecommands to the user. The user should take care of deleting those telecommands + /// from the holding store to prevent memory leaks. + pub fn release_telecommands_no_deletion( + &mut self, + mut releaser: R, + tc_store: &(impl PoolProviderMemInPlace + ?Sized), + ) -> Result, (Vec, StoreError)> { + let tcs_to_release = self.telecommands_to_release(); + let mut released_tcs = Vec::new(); + for tc in tcs_to_release { + for info in tc.1 { + let tc = tc_store + .read(&info.addr) + .map_err(|e| (released_tcs.clone(), e))?; + releaser(self.is_enabled(), info, tc); + released_tcs.push(*info); + } + } + self.tc_map.retain(|k, _| k > &self.current_time); + Ok(released_tcs) + } + + /// Retrieve all telecommands which should be release based on the current time. + pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { + self.tc_map.range(..=self.current_time) + } + } + + impl PusSchedulerInterface for PusScheduler { + type TimeProvider = cds::TimeProvider; + + /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. + /// Be careful with this command as it will delete all the commands in the schedule. + /// + /// The holding store for the telecommands needs to be passed so all the stored telecommands + /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error + /// will be returned but the method will still try to delete all the commands in the schedule. + fn reset( + &mut self, + store: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> Result<(), StoreError> { + self.enabled = false; + let mut deletion_ok = Ok(()); + for tc_lists in &mut self.tc_map { + for tc in tc_lists.1 { + let res = store.delete(tc.addr); + if res.is_err() { + deletion_ok = res; + } + } + } + self.tc_map.clear(); + deletion_ok + } + + fn is_enabled(&self) -> bool { + self.enabled + } + + fn enable(&mut self) { + self.enabled = true; + } + + /// A disabled scheduler should still delete commands where the execution time has been reached + /// but should not release them to be executed. 
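The releaser closure now also receives the raw telecommand bytes read from the holding store, and its return value alone decides whether the stored TC is deleted afterwards. Below is a sketch of a releaser which forwards due TCs over a channel and lets the scheduler clean up the store; the mpsc channel and the surrounding function are assumptions for illustration.

```rust
use satrs_core::pool::StaticMemoryPool;
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
use std::sync::mpsc;

fn release_pending_tcs(
    scheduler: &mut PusScheduler,
    tc_pool: &mut StaticMemoryPool,
    tc_sender: &mpsc::Sender<Vec<u8>>,
) {
    // Assumes `update_time` was already called with the current time.
    let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| {
        if enabled {
            // Forward a copy of the released TC for execution.
            tc_sender.send(tc.to_vec()).expect("forwarding TC failed");
        }
        // Returning true deletes the TC from the holding store, also when the
        // scheduler is disabled, so the pool does not leak memory.
        true
    };
    let released = scheduler
        .release_telecommands(releaser, tc_pool)
        .expect("releasing telecommands failed");
    println!("released {released} telecommands");
}
```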
+ fn disable(&mut self) { + self.enabled = false; + } + + fn insert_unwrapped_and_stored_tc( + &mut self, + time_stamp: UnixTimestamp, + info: TcInfo, + ) -> Result<(), ScheduleError> { + if time_stamp < self.current_time + self.time_margin { + return Err(ScheduleError::ReleaseTimeInTimeMargin { + current_time: self.current_time, + time_margin: self.time_margin, + release_time: time_stamp, + }); + } + match self.tc_map.entry(time_stamp) { + Entry::Vacant(e) => { + e.insert(vec![info]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(info); + } + } + Ok(()) + } } } #[cfg(test)] mod tests { use super::*; - use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError}; + use crate::pool::{ + PoolProviderMemInPlace, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, + StoreError, + }; use alloc::collections::btree_map::Range; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; use spacepackets::ecss::WritablePusPacket; @@ -694,7 +823,7 @@ mod tests { } fn ping_tc_to_store( - pool: &mut LocalPool, + pool: &mut StaticMemoryPool, buf: &mut [u8], seq_count: u16, app_data: Option<&'static [u8]>, @@ -718,7 +847,7 @@ mod tests { #[test] fn reset() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -769,10 +898,10 @@ mod tests { .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), TcInfo::new( - StoreAddr { + StoreAddr::from(StaticPoolAddr { pool_idx: 0, packet_idx: 1, - }, + }), RequestId { seq_count: 1, apid: 0, @@ -786,10 +915,10 @@ mod tests { .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), TcInfo::new( - StoreAddr { + StoreAddr::from(StaticPoolAddr { pool_idx: 0, packet_idx: 2, - }, + }), RequestId { seq_count: 2, apid: 1, @@ -803,10 +932,11 @@ mod tests { .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(300), TcInfo::new( - StoreAddr { + StaticPoolAddr { pool_idx: 0, packet_idx: 2, - }, + } + .into(), RequestId { source_id: 10, seq_count: 20, @@ -869,7 +999,7 @@ mod tests { } #[test] fn release_basic() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -886,7 +1016,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -905,10 +1035,11 @@ mod tests { .release_telecommands(&mut test_closure_1, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); + // TC is deleted. 
+ assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); // test 3, late timestamp, release 1 overdue tc - let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i); true }; @@ -919,7 +1050,8 @@ mod tests { .release_telecommands(&mut test_closure_2, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(pool.has_element_at(&tc_info_1.addr()).unwrap()); + // TC is deleted. + assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap()); //test 4: no tcs left scheduler @@ -932,7 +1064,7 @@ mod tests { #[test] fn release_multi_with_same_time() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -949,7 +1081,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &TcInfo| { + let mut test_closure = |boolvar: bool, store_addr: &TcInfo, _tc: &[u8]| { common_check( boolvar, &store_addr.addr, @@ -974,8 +1106,8 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .expect("deletion failed"); assert_eq!(released, 2); - assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); - assert!(pool.has_element_at(&tc_info_1.addr()).unwrap()); + assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); + assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap()); //test 3: no tcs left released = scheduler @@ -989,7 +1121,7 @@ mod tests { #[test] fn release_with_scheduler_disabled() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -1008,7 +1140,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -1030,7 +1162,7 @@ mod tests { assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); // test 3, late timestamp, release 1 overdue tc - let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i); true }; @@ -1057,7 +1189,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); @@ -1072,7 +1204,7 @@ mod tests { assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); let data = pool.read(&tc_info_0.addr()).unwrap(); - let check_tc = PusTcReader::new(&data).expect("incorrect Pus tc raw data"); + let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -1082,7 +1214,7 @@ mod tests { let mut addr_vec = Vec::new(); let mut i = 0; - let mut 
test_closure = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged addr_vec.push(tc_info.addr); @@ -1094,7 +1226,7 @@ mod tests { .unwrap(); let data = pool.read(&addr_vec[0]).unwrap(); - let check_tc = PusTcReader::new(&data).expect("incorrect Pus tc raw data"); + let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } @@ -1103,7 +1235,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); @@ -1118,7 +1250,7 @@ mod tests { assert!(pool.has_element_at(&info.addr).unwrap()); let data = pool.read(&info.addr).unwrap(); - let check_tc = PusTcReader::new(&data).expect("incorrect Pus tc raw data"); + let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -1128,7 +1260,7 @@ mod tests { let mut addr_vec = Vec::new(); let mut i = 0; - let mut test_closure = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged addr_vec.push(tc_info.addr); @@ -1140,7 +1272,7 @@ mod tests { .unwrap(); let data = pool.read(&addr_vec[0]).unwrap(); - let check_tc = PusTcReader::new(&data).expect("incorrect PUS tc raw data"); + let check_tc = PusTcReader::new(data).expect("incorrect PUS tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } @@ -1149,7 +1281,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf); @@ -1170,7 +1302,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf); @@ -1190,7 +1322,7 @@ mod tests { fn insert_wrapped_tc_faulty_app_data() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let tc = invalid_time_tagged_cmd(); let insert_res = scheduler.insert_wrapped_tc::(&tc, &mut pool); assert!(insert_res.is_err()); @@ -1205,7 +1337,7 @@ mod tests { fn insert_doubly_wrapped_time_tagged_cmd() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = 
StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 64] = [0; 64]; let tc = double_wrapped_time_tagged_tc(UnixTimestamp::new_only_seconds(50), &mut buf); let insert_res = scheduler.insert_wrapped_tc::(&tc, &mut pool); @@ -1241,7 +1373,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; @@ -1250,9 +1382,13 @@ mod tests { assert!(insert_res.is_err()); let err = insert_res.unwrap_err(); match err { - ScheduleError::ReleaseTimeInTimeMargin(curr_time, margin, release_time) => { - assert_eq!(curr_time, UnixTimestamp::new_only_seconds(0)); - assert_eq!(margin, Duration::from_secs(5)); + ScheduleError::ReleaseTimeInTimeMargin { + current_time, + time_margin, + release_time, + } => { + assert_eq!(current_time, UnixTimestamp::new_only_seconds(0)); + assert_eq!(time_margin, Duration::from_secs(5)); assert_eq!(release_time, UnixTimestamp::new_only_seconds(4)); } _ => panic!("unexepcted error {err}"), @@ -1261,7 +1397,7 @@ mod tests { #[test] fn test_store_error_propagation_release() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; @@ -1271,7 +1407,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + let test_closure_1 = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -1284,7 +1420,8 @@ mod tests { let release_res = scheduler.release_telecommands(test_closure_1, &mut pool); assert!(release_res.is_err()); let err = release_res.unwrap_err(); - assert_eq!(err.0, 1); + // TC could not even be read.. 
+ assert_eq!(err.0, 0); match err.1 { StoreError::DataDoesNotExist(addr) => { assert_eq!(tc_info_0.addr(), addr); @@ -1295,7 +1432,7 @@ mod tests { #[test] fn test_store_error_propagation_reset() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; @@ -1319,7 +1456,7 @@ mod tests { #[test] fn test_delete_by_req_id_simple_retrieve_addr() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; @@ -1338,7 +1475,7 @@ mod tests { #[test] fn test_delete_by_req_id_simple_delete_all() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; @@ -1357,7 +1494,7 @@ mod tests { #[test] fn test_delete_by_req_id_complex() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; @@ -1386,7 +1523,7 @@ mod tests { let del_res = scheduler.delete_by_request_id_and_from_pool(&tc_info_2.request_id(), &mut pool); assert!(del_res.is_ok()); - assert_eq!(del_res.unwrap(), true); + assert!(del_res.unwrap()); assert!(!pool.has_element_at(&tc_info_2.addr()).unwrap()); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -1394,7 +1531,7 @@ mod tests { let addr_1 = scheduler.delete_by_request_id_and_from_pool(&tc_info_1.request_id(), &mut pool); assert!(addr_1.is_ok()); - assert_eq!(addr_1.unwrap(), true); + assert!(addr_1.unwrap()); assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap()); assert_eq!(scheduler.num_scheduled_telecommands(), 0); } @@ -1404,7 +1541,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let mut pool = LocalPool::new(PoolCfg::new(vec![(1, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(1, 64)])); let mut buf: [u8; 32] = [0; 32]; // Store is full after this. 
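For completeness, a condensed sketch of the pool-aware deletion by request ID exercised in the tests above; the helper function itself is illustrative.

```rust
use satrs_core::pool::StaticMemoryPool;
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};

fn drop_scheduled_tc(
    scheduler: &mut PusScheduler,
    tc_pool: &mut StaticMemoryPool,
    tc_info: &TcInfo,
) -> bool {
    // Removes the schedule entry and deletes the stored TC from the pool.
    // Returns true if an entry with a matching request ID was found.
    scheduler
        .delete_by_request_id_and_from_pool(&tc_info.request_id(), tc_pool)
        .expect("deleting TC from pool failed")
}
```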
@@ -1424,7 +1561,7 @@ mod tests { } fn insert_command_with_release_time( - pool: &mut LocalPool, + pool: &mut StaticMemoryPool, scheduler: &mut PusScheduler, seq_count: u16, release_secs: u64, @@ -1443,7 +1580,7 @@ mod tests { #[test] fn test_time_window_retrieval_select_all() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1474,7 +1611,7 @@ mod tests { #[test] fn test_time_window_retrieval_select_from_stamp() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1505,7 +1642,7 @@ mod tests { #[test] fn test_time_window_retrieval_select_to_time() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1536,7 +1673,7 @@ mod tests { #[test] fn test_time_window_retrieval_select_from_time_to_time() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1571,7 +1708,7 @@ mod tests { #[test] fn test_deletion_all() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1598,7 +1735,7 @@ mod tests { #[test] fn test_deletion_from_start_time() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1619,7 +1756,7 @@ mod tests { #[test] fn test_deletion_to_end_time() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); @@ -1641,7 +1778,7 @@ mod tests { #[test] fn test_deletion_from_start_time_to_end_time() { - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let cmd_out_of_range_0 = 
insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 9f391b1..2225c03 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -1,82 +1,72 @@ -use crate::pool::SharedPool; -use crate::pus::scheduler::PusScheduler; -use crate::pus::{EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError}; +use super::scheduler::PusSchedulerInterface; +use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper}; +use crate::pool::PoolProviderMemInPlace; +use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError}; +use alloc::string::ToString; use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::time::cds::TimeProvider; -use std::boxed::Box; - -use super::verification::VerificationReporterWithSender; -use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHandler}; /// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service) -/// packets. This handler is constrained to using the [PusScheduler], but is able to process -/// the most important PUS requests for a scheduling service. +/// packets. This handler is able to handle the most important PUS requests for a scheduling +/// service which provides the [PusSchedulerInterface]. /// /// Please note that this class does not do the regular periodic handling like releasing any /// telecommands inside the scheduler. The user can retrieve the wrapped scheduler via the /// [Self::scheduler] and [Self::scheduler_mut] function and then use the scheduler API to release /// telecommands when applicable. -pub struct PusService11SchedHandler { - pub psb: PusServiceHandler, - shared_tc_store: SharedPool, - scheduler: PusScheduler, +pub struct PusService11SchedHandler< + TcInMemConverter: EcssTcInMemConverter, + Scheduler: PusSchedulerInterface, +> { + pub service_helper: PusServiceHelper, + scheduler: Scheduler, } -impl PusService11SchedHandler { - pub fn new( - tc_receiver: Box, - tm_sender: Box, - tm_apid: u16, - verification_handler: VerificationReporterWithSender, - tc_in_mem_converter: TcInMemConverter, - shared_tc_store: SharedPool, - scheduler: PusScheduler, - ) -> Self { +impl + PusService11SchedHandler +{ + pub fn new(service_helper: PusServiceHelper, scheduler: Scheduler) -> Self { Self { - psb: PusServiceHandler::new( - tc_receiver, - tm_sender, - tm_apid, - verification_handler, - tc_in_mem_converter, - ), - shared_tc_store, + service_helper, scheduler, } } - pub fn scheduler_mut(&mut self) -> &mut PusScheduler { + pub fn scheduler_mut(&mut self) -> &mut Scheduler { &mut self.scheduler } - pub fn scheduler(&self) -> &PusScheduler { + pub fn scheduler(&self) -> &Scheduler { &self.scheduler } - pub fn handle_one_tc(&mut self) -> Result { - let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + pub fn handle_one_tc( + &mut self, + sched_tc_pool: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> Result { + let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { return Ok(PusPacketHandlerResult::Empty); } let ecss_tc_and_token = possible_packet.unwrap(); let tc = self - .psb + .service_helper .tc_in_mem_converter - .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; - let subservice = tc.subservice(); - let std_service = scheduling::Subservice::try_from(subservice); - if std_service.is_err() { + 
.convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token.tc_in_memory)?; + let subservice = PusPacket::subservice(&tc); + let standard_subservice = scheduling::Subservice::try_from(subservice); + if standard_subservice.is_err() { return Ok(PusPacketHandlerResult::CustomSubservice( - tc.subservice(), + subservice, ecss_tc_and_token.token, )); } let mut partial_error = None; let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); - match std_service.unwrap() { + match standard_subservice.unwrap() { scheduling::Subservice::TcEnableScheduling => { let start_token = self - .psb + .service_helper .common .verification_handler .get_mut() @@ -85,19 +75,21 @@ impl PusService11SchedHandler { let start_token = self - .psb + .service_helper .common .verification_handler .get_mut() @@ -106,32 +98,32 @@ impl PusService11SchedHandler { let start_token = self - .psb + .service_helper .common .verification_handler .get_mut() .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("Error sending start success"); - let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); - self.scheduler - .reset(pool.as_mut()) + .reset(sched_tc_pool) .expect("Error resetting TC Pool"); - self.psb + self.service_helper .common .verification_handler .get_mut() @@ -140,19 +132,19 @@ impl PusService11SchedHandler { let start_token = self - .psb + .service_helper .common .verification_handler .get_mut() .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("error sending start success"); - let mut pool = self.shared_tc_store.write().expect("locking pool failed"); + // let mut pool = self.sched_tc_pool.write().expect("locking pool failed"); self.scheduler - .insert_wrapped_tc::(&tc, pool.as_mut()) + .insert_wrapped_tc::(&tc, sched_tc_pool) .expect("insertion of activity into pool failed"); - self.psb + self.service_helper .common .verification_handler .get_mut() @@ -162,7 +154,7 @@ impl PusService11SchedHandler { // Treat unhandled standard subservices as custom subservices for now. return Ok(PusPacketHandlerResult::CustomSubservice( - tc.subservice(), + subservice, ecss_tc_and_token.token, )); } @@ -175,3 +167,189 @@ impl PusService11SchedHandler, + sched_tc_pool: StaticMemoryPool, + } + + impl Pus11HandlerWithStoreTester { + pub fn new() -> Self { + let test_scheduler = TestScheduler::default(); + let pool_cfg = StaticPoolConfig::new(alloc::vec![(16, 16), (8, 32), (4, 64)]); + let sched_tc_pool = StaticMemoryPool::new(pool_cfg.clone()); + let (common, srv_handler) = PusServiceHandlerWithSharedStoreCommon::new(); + Self { + common, + handler: PusService11SchedHandler::new(srv_handler, test_scheduler), + sched_tc_pool, + } + } + } + + impl PusTestHarness for Pus11HandlerWithStoreTester { + delegate! 
{ + to self.common { + fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken; + fn read_next_tm(&mut self) -> PusTmReader<'_>; + fn check_no_tm_available(&self) -> bool; + fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId); + } + } + } + + #[derive(Default)] + pub struct TestScheduler { + reset_count: u32, + enabled: bool, + enabled_count: u32, + disabled_count: u32, + inserted_tcs: VecDeque, + } + + impl PusSchedulerInterface for TestScheduler { + type TimeProvider = cds::TimeProvider; + + fn reset( + &mut self, + _store: &mut (impl crate::pool::PoolProviderMemInPlace + ?Sized), + ) -> Result<(), crate::pool::StoreError> { + self.reset_count += 1; + Ok(()) + } + + fn is_enabled(&self) -> bool { + self.enabled + } + + fn enable(&mut self) { + self.enabled_count += 1; + self.enabled = true; + } + + fn disable(&mut self) { + self.disabled_count += 1; + self.enabled = false; + } + + fn insert_unwrapped_and_stored_tc( + &mut self, + _time_stamp: spacepackets::time::UnixTimestamp, + info: crate::pus::scheduler::TcInfo, + ) -> Result<(), crate::pus::scheduler::ScheduleError> { + self.inserted_tcs.push_back(info); + Ok(()) + } + } + + fn generic_subservice_send( + test_harness: &mut Pus11HandlerWithStoreTester, + subservice: Subservice, + ) { + let mut reply_header = SpHeader::tm_unseg(TEST_APID, 0, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(11, subservice as u8); + let enable_scheduling = PusTcCreator::new(&mut reply_header, tc_header, &[0; 7], true); + let token = test_harness.send_tc(&enable_scheduling); + + let request_id = token.req_id(); + test_harness + .handler + .handle_one_tc(&mut test_harness.sched_tc_pool) + .unwrap(); + test_harness.check_next_verification_tm(1, request_id); + test_harness.check_next_verification_tm(3, request_id); + test_harness.check_next_verification_tm(7, request_id); + } + + #[test] + fn test_scheduling_enabling_tc() { + let mut test_harness = Pus11HandlerWithStoreTester::new(); + test_harness.handler.scheduler_mut().disable(); + assert!(!test_harness.handler.scheduler().is_enabled()); + generic_subservice_send(&mut test_harness, Subservice::TcEnableScheduling); + assert!(test_harness.handler.scheduler().is_enabled()); + assert_eq!(test_harness.handler.scheduler().enabled_count, 1); + } + + #[test] + fn test_scheduling_disabling_tc() { + let mut test_harness = Pus11HandlerWithStoreTester::new(); + test_harness.handler.scheduler_mut().enable(); + assert!(test_harness.handler.scheduler().is_enabled()); + generic_subservice_send(&mut test_harness, Subservice::TcDisableScheduling); + assert!(!test_harness.handler.scheduler().is_enabled()); + assert_eq!(test_harness.handler.scheduler().disabled_count, 1); + } + + #[test] + fn test_reset_scheduler_tc() { + let mut test_harness = Pus11HandlerWithStoreTester::new(); + generic_subservice_send(&mut test_harness, Subservice::TcResetScheduling); + assert_eq!(test_harness.handler.scheduler().reset_count, 1); + } + + #[test] + fn test_insert_activity_tc() { + let mut test_harness = Pus11HandlerWithStoreTester::new(); + let mut reply_header = SpHeader::tm_unseg(TEST_APID, 0, 0).unwrap(); + let mut sec_header = PusTcSecondaryHeader::new_simple(17, 1); + let ping_tc = PusTcCreator::new(&mut reply_header, sec_header, &[], true); + let req_id_ping_tc = scheduler::RequestId::from_tc(&ping_tc); + let stamper = cds::TimeProvider::from_now_with_u16_days().expect("time provider failed"); + let mut sched_app_data: [u8; 64] = [0; 64]; + let mut written_len = 
stamper.write_to_bytes(&mut sched_app_data).unwrap(); + let ping_raw = ping_tc.to_vec().expect("generating raw tc failed"); + sched_app_data[written_len..written_len + ping_raw.len()].copy_from_slice(&ping_raw); + written_len += ping_raw.len(); + reply_header = SpHeader::tm_unseg(TEST_APID, 1, 0).unwrap(); + sec_header = PusTcSecondaryHeader::new_simple(11, Subservice::TcInsertActivity as u8); + let enable_scheduling = PusTcCreator::new( + &mut reply_header, + sec_header, + &sched_app_data[..written_len], + true, + ); + let token = test_harness.send_tc(&enable_scheduling); + + let request_id = token.req_id(); + test_harness + .handler + .handle_one_tc(&mut test_harness.sched_tc_pool) + .unwrap(); + test_harness.check_next_verification_tm(1, request_id); + test_harness.check_next_verification_tm(3, request_id); + test_harness.check_next_verification_tm(7, request_id); + let tc_info = test_harness + .handler + .scheduler_mut() + .inserted_tcs + .pop_front() + .unwrap(); + assert_eq!(tc_info.request_id(), req_id_ping_tc); + } +} diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs index 5d59ab3..bdaed69 100644 --- a/satrs-core/src/pus/test.rs +++ b/satrs-core/src/pus/test.rs @@ -1,50 +1,33 @@ use crate::pus::{ - EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, - PusPacketHandlingError, PusTmWrapper, + PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, PusTmWrapper, }; use spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; use spacepackets::ecss::PusPacket; use spacepackets::SpHeader; -use std::boxed::Box; -use super::verification::VerificationReporterWithSender; -use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHandler}; +use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper}; /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This handler only processes ping requests and generates a ping reply for them accordingly. 
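A sketch of how the refactored scheduler service handler might be driven periodically, now that the scheduling TC pool is passed in explicitly. The module paths, the converter type and the task structure are assumptions; releasing due telecommands is intentionally left to the caller via the wrapped scheduler, as noted in the handler documentation.

```rust
use satrs_core::pool::StaticMemoryPool;
use satrs_core::pus::scheduler::PusScheduler;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult};

fn sched_service_cycle(
    handler: &mut PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>,
    sched_tc_pool: &mut StaticMemoryPool,
) {
    // Drain the TC queue of the scheduling service for this cycle.
    loop {
        match handler
            .handle_one_tc(&mut *sched_tc_pool)
            .expect("PUS 11 handling failed")
        {
            PusPacketHandlerResult::Empty => break,
            _ => (),
        }
    }
    // Releasing due telecommands remains the caller's job, e.g. via
    // handler.scheduler_mut().update_time(..) followed by release_telecommands(..).
}
```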
pub struct PusService17TestHandler { - pub psb: PusServiceHandler, + pub service_helper: PusServiceHelper, } impl PusService17TestHandler { - pub fn new( - tc_receiver: Box, - tm_sender: Box, - tm_apid: u16, - verification_handler: VerificationReporterWithSender, - tc_in_mem_converter: TcInMemConverter, - ) -> Self { - Self { - psb: PusServiceHandler::new( - tc_receiver, - tm_sender, - tm_apid, - verification_handler, - tc_in_mem_converter, - ), - } + pub fn new(service_helper: PusServiceHelper) -> Self { + Self { service_helper } } pub fn handle_one_tc(&mut self) -> Result { - let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { return Ok(PusPacketHandlerResult::Empty); } let ecss_tc_and_token = possible_packet.unwrap(); let tc = self - .psb + .service_helper .tc_in_mem_converter - .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token)?; + .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token.tc_in_memory)?; if tc.service() != 17 { return Err(PusPacketHandlingError::WrongService(tc.service())); } @@ -52,7 +35,7 @@ impl PusService17TestHandler PusService17TestHandler PusService17TestHandler PusService17TestHandler, + } + + impl Pus17HandlerWithStoreTester { + pub fn new() -> Self { + let (common, srv_handler) = PusServiceHandlerWithSharedStoreCommon::new(); + let pus_17_handler = PusService17TestHandler::new(srv_handler); + Self { + common, + handler: pus_17_handler, + } + } + } + + impl PusTestHarness for Pus17HandlerWithStoreTester { + delegate! { + to self.common { + fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken; + fn read_next_tm(&mut self) -> PusTmReader<'_>; + fn check_no_tm_available(&self) -> bool; + fn check_next_verification_tm( + &self, + subservice: u8, + expected_request_id: RequestId + ); + } + } + } + impl SimplePusPacketHandler for Pus17HandlerWithStoreTester { + delegate! { + to self.handler { + fn handle_one_tc(&mut self) -> Result; + } + } + } + + struct Pus17HandlerWithVecTester { + common: PusServiceHandlerWithVecCommon, + handler: PusService17TestHandler, + } + + impl Pus17HandlerWithVecTester { + pub fn new() -> Self { + let (common, srv_handler) = PusServiceHandlerWithVecCommon::new(); + Self { + common, + handler: PusService17TestHandler::new(srv_handler), + } + } + } + + impl PusTestHarness for Pus17HandlerWithVecTester { + delegate! { + to self.common { + fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken; + fn read_next_tm(&mut self) -> PusTmReader<'_>; + fn check_no_tm_available(&self) -> bool; + fn check_next_verification_tm( + &self, + subservice: u8, + expected_request_id: RequestId, + ); + } + } + } + impl SimplePusPacketHandler for Pus17HandlerWithVecTester { + delegate! { + to self.handler { + fn handle_one_tc(&mut self) -> Result; + } + } + } + + fn ping_test(test_harness: &mut (impl PusTestHarness + SimplePusPacketHandler)) { // Create a ping TC, verify acceptance. 
let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); let sec_header = PusTcSecondaryHeader::new_simple(17, 1); let ping_tc = PusTcCreator::new_no_app_data(&mut sp_header, sec_header, true); - let token = verification_handler.add_tc(&ping_tc); - let token = verification_handler - .acceptance_success(token, None) - .unwrap(); - let tc_size = ping_tc.write_to_bytes(&mut pus_buf).unwrap(); - let mut tc_pool = tc_pool_shared.write().unwrap(); - let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap(); - drop(tc_pool); - // Send accepted TC to test service handler. - test_srv_tc_tx - .send(EcssTcAndToken::new(addr, token)) - .unwrap(); - let result = pus_17_handler.handle_one_tc(); + let token = test_harness.send_tc(&ping_tc); + let request_id = token.req_id(); + let result = test_harness.handle_one_tc(); assert!(result.is_ok()); // We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and // Completion TM - let mut next_msg = tm_rx.try_recv(); - assert!(next_msg.is_ok()); - let mut tm_addr = next_msg.unwrap(); - let tm_pool = tm_pool_shared.read().unwrap(); - let tm_raw = tm_pool.read(&tm_addr).unwrap(); - let (tm, _) = PusTmReader::new(tm_raw, 0).unwrap(); - assert_eq!(tm.service(), 1); - assert_eq!(tm.subservice(), 1); - let req_id = RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); - assert_eq!(req_id, token.req_id()); // Acceptance TM - next_msg = tm_rx.try_recv(); - assert!(next_msg.is_ok()); - tm_addr = next_msg.unwrap(); - let tm_raw = tm_pool.read(&tm_addr).unwrap(); - // Is generated with CDS short timestamp. - let (tm, _) = PusTmReader::new(tm_raw, 7).unwrap(); - assert_eq!(tm.service(), 1); - assert_eq!(tm.subservice(), 3); - let req_id = RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); - assert_eq!(req_id, token.req_id()); + test_harness.check_next_verification_tm(1, request_id); + + // Start TM + test_harness.check_next_verification_tm(3, request_id); // Ping reply - next_msg = tm_rx.try_recv(); - assert!(next_msg.is_ok()); - tm_addr = next_msg.unwrap(); - let tm_raw = tm_pool.read(&tm_addr).unwrap(); - // Is generated with CDS short timestamp. - let (tm, _) = PusTmReader::new(tm_raw, 7).unwrap(); + let tm = test_harness.read_next_tm(); assert_eq!(tm.service(), 17); assert_eq!(tm.subservice(), 2); assert!(tm.user_data().is_empty()); // TM completion - next_msg = tm_rx.try_recv(); - assert!(next_msg.is_ok()); - tm_addr = next_msg.unwrap(); - let tm_raw = tm_pool.read(&tm_addr).unwrap(); - // Is generated with CDS short timestamp. 
- let (tm, _) = PusTmReader::new(tm_raw, 7).unwrap(); - assert_eq!(tm.service(), 1); - assert_eq!(tm.subservice(), 7); - let req_id = RequestId::from_bytes(tm.user_data()).expect("generating request ID failed"); - assert_eq!(req_id, token.req_id()); + test_harness.check_next_verification_tm(7, request_id); + } + + #[test] + fn test_basic_ping_processing_using_store() { + let mut test_harness = Pus17HandlerWithStoreTester::new(); + ping_test(&mut test_harness); + } + + #[test] + fn test_basic_ping_processing_using_vec() { + let mut test_harness = Pus17HandlerWithVecTester::new(); + ping_test(&mut test_harness); + } + + #[test] + fn test_empty_tc_queue() { + let mut test_harness = Pus17HandlerWithStoreTester::new(); + let result = test_harness.handle_one_tc(); + assert!(result.is_ok()); + let result = result.unwrap(); + if let PusPacketHandlerResult::Empty = result { + } else { + panic!("unexpected result type {result:?}") + } + } + + #[test] + fn test_sending_unsupported_service() { + let mut test_harness = Pus17HandlerWithStoreTester::new(); + let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTcSecondaryHeader::new_simple(3, 1); + let ping_tc = PusTcCreator::new_no_app_data(&mut sp_header, sec_header, true); + test_harness.send_tc(&ping_tc); + let result = test_harness.handle_one_tc(); + assert!(result.is_err()); + let error = result.unwrap_err(); + if let PusPacketHandlingError::WrongService(num) = error { + assert_eq!(num, 3); + } else { + panic!("unexpected error type {error}") + } + } + + #[test] + fn test_sending_custom_subservice() { + let mut test_harness = Pus17HandlerWithStoreTester::new(); + let mut sp_header = SpHeader::tc(TEST_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTcSecondaryHeader::new_simple(17, 200); + let ping_tc = PusTcCreator::new_no_app_data(&mut sp_header, sec_header, true); + test_harness.send_tc(&ping_tc); + let result = test_harness.handle_one_tc(); + assert!(result.is_ok()); + let result = result.unwrap(); + if let PusPacketHandlerResult::CustomSubservice(subservice, _) = result { + assert_eq!(subservice, 200); + } else { + panic!("unexpected result type {result:?}") + } } } diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index ff38b89..6407bf3 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -15,7 +15,7 @@ //! ``` //! use std::sync::{Arc, mpsc, RwLock}; //! use std::time::Duration; -//! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; +//! use satrs_core::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig}; //! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; //! use satrs_core::seq_count::SeqCountProviderSimple; //! use satrs_core::pus::MpscTmInStoreSender; @@ -28,9 +28,9 @@ //! const EMPTY_STAMP: [u8; 7] = [0; 7]; //! const TEST_APID: u16 = 0x02; //! -//! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); -//! let tm_pool = LocalPool::new(pool_cfg.clone()); -//! let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); +//! let pool_cfg = StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); +//! let tm_pool = StaticMemoryPool::new(pool_cfg.clone()); +//! let shared_tm_store = SharedTmStore::new(tm_pool); //! let tm_store = shared_tm_store.clone_backing_pool(); //! let (verif_tx, verif_rx) = mpsc::channel(); //! 
let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_store, verif_tx); @@ -208,6 +208,8 @@ impl WasAtLeastAccepted for TcStateAccepted {} impl WasAtLeastAccepted for TcStateStarted {} impl WasAtLeastAccepted for TcStateCompleted {} +/// Token wrapper to model all possible verification tokens. These tokens are used to +/// enforce the correct order for the verification steps when doing verification reporting. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TcStateToken { None(VerificationToken), @@ -1323,7 +1325,7 @@ mod std_mod { #[cfg(test)] mod tests { - use crate::pool::{LocalPool, PoolCfg}; + use crate::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, @@ -1447,12 +1449,11 @@ mod tests { fn base_init(api_sel: bool) -> (TestBase<'static>, VerificationToken) { let mut reporter = base_reporter(); let (tc, req_id) = base_tc_init(None); - let init_tok; - if api_sel { - init_tok = reporter.add_tc_with_req_id(req_id); + let init_tok = if api_sel { + reporter.add_tc_with_req_id(req_id) } else { - init_tok = reporter.add_tc(&tc); - } + reporter.add_tc(&tc) + }; (TestBase { vr: reporter, tc }, init_tok) } @@ -1475,7 +1476,7 @@ mod tests { time_stamp: EMPTY_STAMP, }, additional_data: None, - req_id: req_id.clone(), + req_id: *req_id, }; let mut service_queue = sender.service_queue.borrow_mut(); assert_eq!(service_queue.len(), 1); @@ -1485,9 +1486,8 @@ mod tests { #[test] fn test_mpsc_verif_send_sync() { - let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); - let tm_store = Box::new(pool); - let shared_tm_store = SharedTmStore::new(tm_store); + let pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(8, 8)])); + let shared_tm_store = SharedTmStore::new(pool); let (tx, _) = mpsc::channel(); let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store, tx); is_send(&mpsc_verif_sender); @@ -1505,7 +1505,7 @@ mod tests { fn test_basic_acceptance_success() { let (b, tok) = base_init(false); let mut sender = TestSender::default(); - b.vr.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP)) + b.vr.acceptance_success(tok, &sender, Some(&EMPTY_STAMP)) .expect("Sending acceptance success failed"); acceptance_check(&mut sender, &tok.req_id); } @@ -1605,7 +1605,7 @@ mod tests { #[test] fn test_basic_acceptance_failure_with_fail_data() { let (b, tok) = base_init(false); - let mut sender = TestSender::default(); + let sender = TestSender::default(); let fail_code = EcssEnumU8::new(10); let fail_data = EcssEnumU32::new(12); let mut fail_data_raw = [0; 4]; @@ -1615,7 +1615,7 @@ mod tests { &fail_code, Some(fail_data_raw.as_slice()), ); - b.vr.acceptance_failure(tok, &mut sender, fail_params) + b.vr.acceptance_failure(tok, &sender, fail_params) .expect("Sending acceptance success failed"); let cmp_info = TmInfo { common: CommonTmInfo { @@ -1784,8 +1784,7 @@ mod tests { .rep() .start_success(accepted_token, &mut sender, Some(&[0, 1, 0, 1, 0, 1, 0])) .expect("Sending start success failed"); - let mut empty = b - .rep() + b.rep() .step_success( &started_token, &mut sender, @@ -1793,16 +1792,13 @@ mod tests { EcssEnumU8::new(0), ) .expect("Sending step 0 success failed"); - assert_eq!(empty, ()); - empty = - b.vr.step_success( - &started_token, - &mut sender, - Some(&EMPTY_STAMP), - EcssEnumU8::new(1), - ) - .expect("Sending step 1 success failed"); - assert_eq!(empty, ()); + 
b.vr.step_success( + &started_token, + &mut sender, + Some(&EMPTY_STAMP), + EcssEnumU8::new(1), + ) + .expect("Sending step 1 success failed"); assert_eq!(sender.service_queue.borrow().len(), 4); step_success_check(&mut sender, tok.req_id); } @@ -1818,16 +1814,12 @@ mod tests { .helper .start_success(accepted_token, Some(&[0, 1, 0, 1, 0, 1, 0])) .expect("Sending start success failed"); - let mut empty = b - .helper + b.helper .step_success(&started_token, Some(&EMPTY_STAMP), EcssEnumU8::new(0)) .expect("Sending step 0 success failed"); - assert_eq!(empty, ()); - empty = b - .helper + b.helper .step_success(&started_token, Some(&EMPTY_STAMP), EcssEnumU8::new(1)) .expect("Sending step 1 success failed"); - assert_eq!(empty, ()); let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); assert_eq!(sender.service_queue.borrow().len(), 4); step_success_check(sender, tok.req_id); @@ -2122,10 +2114,8 @@ mod tests { let started_token = b.vr.start_success(accepted_token, &mut sender, Some(&[0, 1, 0, 1, 0, 1, 0])) .expect("Sending start success failed"); - let empty = - b.vr.completion_success(started_token, &mut sender, Some(&EMPTY_STAMP)) - .expect("Sending completion success failed"); - assert_eq!(empty, ()); + b.vr.completion_success(started_token, &mut sender, Some(&EMPTY_STAMP)) + .expect("Sending completion success failed"); completion_success_check(&mut sender, tok.req_id); } @@ -2140,11 +2130,9 @@ mod tests { .helper .start_success(accepted_token, Some(&[0, 1, 0, 1, 0, 1, 0])) .expect("Sending start success failed"); - let empty = b - .helper + b.helper .completion_success(started_token, Some(&EMPTY_STAMP)) .expect("Sending completion success failed"); - assert_eq!(empty, ()); let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); completion_success_check(sender, tok.req_id); } @@ -2152,8 +2140,8 @@ mod tests { #[test] // TODO: maybe a bit more extensive testing, all I have time for right now fn test_seq_count_increment() { - let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let tm_pool = Box::new(LocalPool::new(pool_cfg.clone())); + let pool_cfg = StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); + let tm_pool = StaticMemoryPool::new(pool_cfg.clone()); let shared_tm_store = SharedTmStore::new(tm_pool); let shared_tm_pool = shared_tm_store.clone_backing_pool(); let (verif_tx, verif_rx) = mpsc::channel(); diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index 6dca7ee..fc0a8e9 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -8,7 +8,9 @@ pub use std_mod::*; #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; + use crate::pool::{ + PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr, + }; use crate::pus::EcssTmtcError; use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::WritablePusPacket; @@ -16,22 +18,25 @@ pub mod std_mod { #[derive(Clone)] pub struct SharedTmStore { - pool: SharedPool, + pub shared_pool: SharedStaticMemoryPool, } impl SharedTmStore { - pub fn new(backing_pool: ShareablePoolProvider) -> Self { + pub fn new(shared_pool: StaticMemoryPool) -> Self { Self { - pool: Arc::new(RwLock::new(backing_pool)), + shared_pool: Arc::new(RwLock::new(shared_pool)), } } - pub fn clone_backing_pool(&self) -> SharedPool { - self.pool.clone() + pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { + self.shared_pool.clone() } pub fn 
add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result { - let mut pg = self.pool.write().map_err(|_| EcssTmtcError::StoreLock)?; + let mut pg = self + .shared_pool + .write() + .map_err(|_| EcssTmtcError::StoreLock)?; let (addr, buf) = pg.free_element(pus_tm.len_written())?; pus_tm .write_to_bytes(buf) @@ -91,3 +96,33 @@ impl PusTmWithCdsShortHelper { PusTmCreator::new(&mut reply_header, tc_header, source_data, true) } } + +#[cfg(test)] +mod tests { + use spacepackets::{ecss::PusPacket, time::cds::TimeProvider, CcsdsPacket}; + + use super::PusTmWithCdsShortHelper; + + #[test] + fn test_helper_with_stamper() { + let mut pus_tm_helper = PusTmWithCdsShortHelper::new(0x123); + let stamper = TimeProvider::new_with_u16_days(0, 0); + let tm = pus_tm_helper.create_pus_tm_with_stamper(17, 1, &[1, 2, 3, 4], &stamper, 25); + assert_eq!(tm.service(), 17); + assert_eq!(tm.subservice(), 1); + assert_eq!(tm.user_data(), &[1, 2, 3, 4]); + assert_eq!(tm.seq_count(), 25); + assert_eq!(tm.timestamp(), [64, 0, 0, 0, 0, 0, 0]) + } + + #[test] + fn test_helper_from_now() { + let mut pus_tm_helper = PusTmWithCdsShortHelper::new(0x123); + let tm = pus_tm_helper.create_pus_tm_timestamp_now(17, 1, &[1, 2, 3, 4], 25); + assert_eq!(tm.service(), 17); + assert_eq!(tm.subservice(), 1); + assert_eq!(tm.user_data(), &[1, 2, 3, 4]); + assert_eq!(tm.seq_count(), 25); + assert_eq!(tm.timestamp().len(), 7); + } +} diff --git a/satrs-core/tests/pools.rs b/satrs-core/tests/pools.rs index 375d624..cce2114 100644 --- a/satrs-core/tests/pools.rs +++ b/satrs-core/tests/pools.rs @@ -1,4 +1,6 @@ -use satrs_core::pool::{LocalPool, PoolCfg, PoolGuard, PoolProvider, StoreAddr}; +use satrs_core::pool::{ + PoolGuard, PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig, StoreAddr, +}; use std::ops::DerefMut; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; @@ -9,8 +11,8 @@ const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3]; #[test] fn threaded_usage() { - let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]); - let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let pool_cfg = StaticPoolConfig::new(vec![(16, 6), (32, 3), (8, 12)]); + let shared_pool = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg))); let shared_clone = shared_pool.clone(); let (tx, rx): (Sender, Receiver) = mpsc::channel(); let jh0 = thread::spawn(move || { diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 9c9fc63..a62b7a5 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -1,7 +1,10 @@ #[cfg(feature = "crossbeam")] pub mod crossbeam_test { use hashbrown::HashMap; - use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider}; + use satrs_core::pool::{ + PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, StaticMemoryPool, + StaticPoolConfig, + }; use satrs_core::pus::verification::{ FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, }; @@ -33,9 +36,9 @@ pub mod crossbeam_test { // each reporter have an own sequence count provider. 
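A short usage sketch for the reworked `SharedTmStore`, which now wraps the concrete `StaticMemoryPool` directly instead of a boxed pool trait object. Module paths, the pool sizing and the surrounding function are assumptions for illustration.

```rust
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use spacepackets::ecss::tm::PusTmCreator;

fn store_tm_example(tm: &PusTmCreator) {
    // The shared TM store takes ownership of the static pool and wraps it in an
    // Arc<RwLock<..>> internally.
    let tm_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 128)]));
    let shared_tm_store = SharedTmStore::new(tm_pool);

    // Adding a TM packet writes it into the backing pool and yields its store address.
    let addr = shared_tm_store
        .add_pus_tm(tm)
        .expect("adding TM to store failed");

    // Consumers (e.g. a TM funnel or UDP sender) can access the packet through a
    // clone of the backing pool.
    let backing_pool = shared_tm_store.clone_backing_pool();
    let pool_guard = backing_pool.read().expect("locking TM pool failed");
    println!("stored {} bytes", pool_guard.read(&addr).unwrap().len());
}
```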
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); // Shared pool object to store the verification PUS telemetry - let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone()))); - let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let pool_cfg = StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); + let shared_tm_store = SharedTmStore::new(StaticMemoryPool::new(pool_cfg.clone())); + let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = crossbeam_channel::bounded(10); let sender = diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 6adf040..ed1ef32 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -29,7 +29,7 @@ use satrs_core::event_man::{ }; use satrs_core::events::EventU32; use satrs_core::hk::HkRequest; -use satrs_core::pool::{LocalPool, PoolCfg}; +use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, @@ -42,7 +42,9 @@ use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::verification::{ TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; -use satrs_core::pus::{EcssTcInStoreConverter, MpscTcInStoreReceiver, MpscTmInStoreSender}; +use satrs_core::pus::{ + EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper, +}; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter}; use satrs_core::spacepackets::{ @@ -66,7 +68,7 @@ use std::time::Duration; fn main() { setup_logger().expect("setting up logging with fern failed"); println!("Running OBSW example"); - let tm_pool = LocalPool::new(PoolCfg::new(vec![ + let tm_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (30, 32), (15, 64), (15, 128), @@ -74,9 +76,9 @@ fn main() { (15, 1024), (15, 2048), ])); - let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); + let shared_tm_store = SharedTmStore::new(tm_pool); let tm_store_event = shared_tm_store.clone(); - let tc_pool = LocalPool::new(PoolCfg::new(vec![ + let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (30, 32), (15, 64), (15, 128), @@ -85,8 +87,16 @@ fn main() { (15, 2048), ])); let tc_store = TcStore { - pool: Arc::new(RwLock::new(Box::new(tc_pool))), + pool: Arc::new(RwLock::new(tc_pool)), }; + let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])); let seq_count_provider = CcsdsSimpleSeqCountProvider::new(); let mut msg_counter_map: HashMap = HashMap::new(); @@ -172,18 +182,18 @@ fn main() { shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let test_srv_receiver = MpscTcInStoreReceiver::new( + let test_srv_receiver = MpscTcReceiver::new( TcReceiverId::PusTest as ChannelId, "PUS_17_TC_RECV", pus_test_rx, ); - let pus17_handler = PusService17TestHandler::new( + let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( Box::new(test_srv_receiver), Box::new(test_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), - ); + 
EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), + )); let mut pus_17_wrapper = Service17CustomWrapper { pus17_handler, test_srv_event_sender, @@ -195,7 +205,7 @@ fn main() { shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let sched_srv_receiver = MpscTcInStoreReceiver::new( + let sched_srv_receiver = MpscTcReceiver::new( TcReceiverId::PusSched as ChannelId, "PUS_11_TC_RECV", pus_sched_rx, @@ -203,16 +213,18 @@ fn main() { let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusService11SchedHandler::new( - Box::new(sched_srv_receiver), - Box::new(sched_srv_tm_sender), - PUS_APID, - verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), - tc_store.pool.clone(), + PusServiceHelper::new( + Box::new(sched_srv_receiver), + Box::new(sched_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), + ), scheduler, ); let mut pus_11_wrapper = Pus11Wrapper { pus_11_handler, + sched_tc_pool, tc_source_wrapper, }; @@ -222,17 +234,19 @@ fn main() { shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let event_srv_receiver = MpscTcInStoreReceiver::new( + let event_srv_receiver = MpscTcReceiver::new( TcReceiverId::PusEvent as ChannelId, "PUS_5_TC_RECV", pus_event_rx, ); let pus_5_handler = PusService5EventHandler::new( - Box::new(event_srv_receiver), - Box::new(event_srv_tm_sender), - PUS_APID, - verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + PusServiceHelper::new( + Box::new(event_srv_receiver), + Box::new(event_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), + ), event_request_tx, ); let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler }; @@ -243,7 +257,7 @@ fn main() { shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let action_srv_receiver = MpscTcInStoreReceiver::new( + let action_srv_receiver = MpscTcReceiver::new( TcReceiverId::PusAction as ChannelId, "PUS_8_TC_RECV", pus_action_rx, @@ -253,7 +267,7 @@ fn main() { Box::new(action_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), request_map.clone(), ); let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; @@ -265,13 +279,13 @@ fn main() { tm_funnel_tx.clone(), ); let hk_srv_receiver = - MpscTcInStoreReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); + MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); let pus_3_handler = PusService3HkHandler::new( Box::new(hk_srv_receiver), Box::new(hk_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), request_map, ); let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 366539e..3b8cc8d 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -4,8 +4,8 @@ use satrs_core::pus::verification::{ FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken, }; use satrs_core::pus::{ - EcssTcInMemConverter, EcssTcInStoreConverter, EcssTcReceiver, EcssTmSender, - PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, + EcssTcInMemConverter, 
EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender, + PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper, }; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; @@ -14,7 +14,7 @@ use std::collections::HashMap; use std::sync::mpsc::Sender; pub struct PusService8ActionHandler { - psb: PusServiceHandler, + service_helper: PusServiceHelper, request_handlers: HashMap>, } @@ -28,7 +28,7 @@ impl PusService8ActionHandler>, ) -> Self { Self { - psb: PusServiceHandler::new( + service_helper: PusServiceHelper::new( tc_receiver, tm_sender, tm_apid, @@ -47,7 +47,7 @@ impl PusService8ActionHandler Result<(), PusPacketHandlingError> { let user_data = tc.user_data(); if user_data.len() < 8 { - self.psb + self.service_helper .common .verification_handler .borrow_mut() @@ -77,7 +77,7 @@ impl PusService8ActionHandler PusService8ActionHandler Result { - let possible_packet = self.psb.retrieve_and_accept_next_packet()?; + let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { return Ok(PusPacketHandlerResult::Empty); } let ecss_tc_and_token = possible_packet.unwrap(); - self.psb + self.service_helper .tc_in_mem_converter - .cache_ecss_tc_in_memory(&ecss_tc_and_token)?; - let tc = PusTcReader::new(self.psb.tc_in_mem_converter.tc_slice_raw())?.0; + .cache_ecss_tc_in_memory(&ecss_tc_and_token.tc_in_memory)?; + let tc = PusTcReader::new(self.service_helper.tc_in_mem_converter.tc_slice_raw())?.0; let subservice = tc.subservice(); let mut partial_error = None; let time_stamp = PusServiceBase::get_current_timestamp(&mut partial_error); @@ -116,7 +116,7 @@ impl PusService8ActionHandler { let fail_data = [subservice]; - self.psb + self.service_helper .common .verification_handler .get_mut() @@ -142,7 +142,7 @@ impl PusService8ActionHandler, + pub(crate) pus_8_handler: PusService8ActionHandler, } impl Pus8Wrapper { diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index 3dc9dc1..08aa786 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -1,9 +1,9 @@ use log::{error, warn}; use satrs_core::pus::event_srv::PusService5EventHandler; -use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult}; +use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; pub struct Pus5Wrapper { - pub pus_5_handler: PusService5EventHandler, + pub pus_5_handler: PusService5EventHandler, } impl Pus5Wrapper { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index 94b21b2..ef373c4 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -3,8 +3,8 @@ use log::{error, warn}; use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; use satrs_core::pus::{ - EcssTcInMemConverter, EcssTcInStoreConverter, EcssTcReceiver, EcssTmSender, - PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, + EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender, + PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper, }; use satrs_core::spacepackets::ecss::{hk, PusPacket}; use satrs_example::{hk_err, tmtc_err, TargetIdWithApid}; @@ -12,7 +12,7 @@ use std::collections::HashMap; use std::sync::mpsc::Sender; pub struct PusService3HkHandler { - psb: PusServiceHandler, + psb: PusServiceHelper, request_handlers: HashMap>, } @@ -26,7 +26,7 @@ impl 
PusService3HkHandler>, ) -> Self { Self { - psb: PusServiceHandler::new( + psb: PusServiceHelper::new( tc_receiver, tm_sender, tm_apid, @@ -46,7 +46,7 @@ impl PusService3HkHandler PusService3HkHandler, + pub(crate) pus_3_handler: PusService3HkHandler, } impl Pus3Wrapper { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 36c622e..75f3494 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,50 +1,54 @@ use crate::tmtc::PusTcSource; use log::{error, info, warn}; -use satrs_core::pus::scheduler::TcInfo; +use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool}; +use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; -use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult}; +use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; pub struct Pus11Wrapper { - pub pus_11_handler: PusService11SchedHandler, + pub pus_11_handler: PusService11SchedHandler, + pub sched_tc_pool: StaticMemoryPool, pub tc_source_wrapper: PusTcSource, } impl Pus11Wrapper { pub fn release_tcs(&mut self) { - let releaser = |enabled: bool, info: &TcInfo| -> bool { + let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { if enabled { + // Transfer TC from scheduler TC pool to shared TC pool. + let released_tc_addr = self + .tc_source_wrapper + .tc_store + .pool + .write() + .expect("locking pool failed") + .add(tc) + .expect("adding TC to shared pool failed"); + self.tc_source_wrapper .tc_source - .send(info.addr()) + .send(released_tc_addr) .expect("sending TC to TC source failed"); } true }; - let mut pool = self - .tc_source_wrapper - .tc_store - .pool - .write() - .expect("error locking pool"); - self.pus_11_handler .scheduler_mut() .update_time_from_now() .unwrap(); - if let Ok(released_tcs) = self + let released_tcs = self .pus_11_handler .scheduler_mut() - .release_telecommands(releaser, pool.as_mut()) - { - if released_tcs > 0 { - info!("{released_tcs} TC(s) released from scheduler"); - } + .release_telecommands(releaser, &mut self.sched_tc_pool) + .expect("releasing TCs failed"); + if released_tcs > 0 { + info!("{released_tcs} TC(s) released from scheduler"); } } pub fn handle_next_packet(&mut self) -> bool { - match self.pus_11_handler.handle_one_tc() { + match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 3b08391..a52d111 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -7,12 +7,12 @@ use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::spacepackets::time::TimeWriter; -use satrs_core::{events::EventU32, pus::EcssTcInStoreConverter}; +use satrs_core::{events::EventU32, pus::EcssTcInSharedStoreConverter}; use satrs_example::{tmtc_err, TEST_EVENT}; use std::sync::mpsc::Sender; pub struct Service17CustomWrapper { - pub pus17_handler: PusService17TestHandler, + pub pus17_handler: PusService17TestHandler, pub test_srv_event_sender: Sender<(EventU32, Option)>, } @@ -38,9 +38,13 @@ impl Service17CustomWrapper { warn!("PUS17: Subservice {subservice} not implemented") } PusPacketHandlerResult::CustomSubservice(subservice, token) => { - let (tc, _) = - 
PusTcReader::new(self.pus17_handler.psb.tc_in_mem_converter.tc_slice_raw()) - .unwrap(); + let (tc, _) = PusTcReader::new( + self.pus17_handler + .service_helper + .tc_in_mem_converter + .tc_slice_raw(), + ) + .unwrap(); let time_stamper = TimeProvider::from_now_with_u16_days().unwrap(); let mut stamp_buf: [u8; 7] = [0; 7]; time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); @@ -51,14 +55,14 @@ impl Service17CustomWrapper { .expect("Sending test event failed"); let start_token = self .pus17_handler - .psb + .service_helper .common .verification_handler .get_mut() .start_success(token, Some(&stamp_buf)) .expect("Error sending start success"); self.pus17_handler - .psb + .service_helper .common .verification_handler .get_mut() @@ -67,7 +71,7 @@ impl Service17CustomWrapper { } else { let fail_data = [tc.subservice()]; self.pus17_handler - .psb + .service_helper .common .verification_handler .get_mut() diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 40b2b6b..b4a584d 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -5,7 +5,7 @@ use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; -use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; +use satrs_core::pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr, StoreError}; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::tmtc::tm_helper::SharedTmStore; @@ -41,7 +41,7 @@ pub enum MpscStoreAndSendError { #[derive(Clone)] pub struct TcStore { - pub pool: SharedPool, + pub pool: SharedStaticMemoryPool, } impl TcStore { diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs index e3ca9f6..3853fd3 100644 --- a/satrs-example/src/udp.rs +++ b/satrs-example/src/udp.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, sync::mpsc::Receiver}; use log::{info, warn}; use satrs_core::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, - pool::{SharedPool, StoreAddr}, + pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr}, tmtc::CcsdsError, }; @@ -12,7 +12,7 @@ use crate::tmtc::MpscStoreAndSendError; pub struct UdpTmtcServer { pub udp_tc_server: UdpTcServer>, pub tm_rx: Receiver, - pub tm_store: SharedPool, + pub tm_store: SharedStaticMemoryPool, } impl UdpTmtcServer { pub fn periodic_operation(&mut self) {