diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs
index 3e644d3..ebbec26 100644
--- a/satrs-core/src/pool.rs
+++ b/satrs-core/src/pool.rs
@@ -13,11 +13,11 @@
 //! # Example
 //!
 //! ```
-//! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider};
+//! use satrs_core::pool::{StaticMemoryPool, PoolCfg, PoolProvider};
 //!
 //! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
 //! let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]);
-//! let mut local_pool = LocalPool::new(pool_cfg);
+//! let mut local_pool = StaticMemoryPool::new(pool_cfg);
 //! let mut addr;
 //! {
 //!     // Add new data to the pool
@@ -83,16 +83,17 @@ use serde::{Deserialize, Serialize};
 use std::error::Error;
 
 type NumBlocks = u16;
+pub type StoreAddr = u64;
 
 /// Simple address type used for transactions with the local pool.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub struct StoreAddr {
+pub struct StaticPoolAddr {
     pub(crate) pool_idx: u16,
     pub(crate) packet_idx: NumBlocks,
 }
 
-impl StoreAddr {
+impl StaticPoolAddr {
     pub const INVALID_ADDR: u32 = 0xFFFFFFFF;
 
     pub fn raw(&self) -> u32 {
@@ -100,7 +101,22 @@ impl StoreAddr {
     }
 }
 
-impl Display for StoreAddr {
+impl From<StaticPoolAddr> for StoreAddr {
+    fn from(value: StaticPoolAddr) -> Self {
+        ((value.pool_idx as u64) << 16) | value.packet_idx as u64
+    }
+}
+
+impl From<StoreAddr> for StaticPoolAddr {
+    fn from(value: StoreAddr) -> Self {
+        Self {
+            pool_idx: ((value >> 16) & 0xffff) as u16,
+            packet_idx: (value & 0xffff) as u16,
+        }
+    }
+}
+
+impl Display for StaticPoolAddr {
     fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
         write!(
             f,
@@ -182,6 +198,7 @@ impl Error for StoreError {
 
 #[cfg(feature = "alloc")]
 mod alloc_mod {
+    use super::StaticPoolAddr;
     use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError};
     use alloc::boxed::Box;
    use alloc::vec;
@@ -229,7 +246,7 @@ mod alloc_mod {
     }
 
     pub struct PoolGuard<'a> {
-        pool: &'a mut LocalPool,
+        pool: &'a mut StaticMemoryPool,
         pub addr: StoreAddr,
         no_deletion: bool,
         deletion_failed_error: Option<StoreError>,
     }
 
     /// This helper object
     impl<'a> PoolGuard<'a> {
-        pub fn new(pool: &'a mut LocalPool, addr: StoreAddr) -> Self {
+        pub fn new(pool: &'a mut StaticMemoryPool, addr: StoreAddr) -> Self {
             Self {
                 pool,
                 addr,
@@ -272,7 +289,7 @@ mod alloc_mod {
     }
 
     impl<'a> PoolRwGuard<'a> {
-        pub fn new(pool: &'a mut LocalPool, addr: StoreAddr) -> Self {
+        pub fn new(pool: &'a mut StaticMemoryPool, addr: StoreAddr) -> Self {
             Self {
                 guard: PoolGuard::new(pool, addr),
             }
@@ -345,18 +362,18 @@ mod alloc_mod {
 
     /// Pool implementation providing sub-pools with fixed size memory blocks. More details in
     /// the [module documentation][crate::pool]
-    pub struct LocalPool {
+    pub struct StaticMemoryPool {
         pool_cfg: PoolCfg,
         pool: Vec<Vec<u8>>,
         sizes_lists: Vec<Vec<usize>>,
     }
 
-    impl LocalPool {
+    impl StaticMemoryPool {
         /// Create a new local pool from the [given configuration][PoolCfg]. This function will sanitize
         /// the given configuration as well.
-        pub fn new(mut cfg: PoolCfg) -> LocalPool {
+        pub fn new(mut cfg: PoolCfg) -> StaticMemoryPool {
             let subpools_num = cfg.sanitize();
-            let mut local_pool = LocalPool {
+            let mut local_pool = StaticMemoryPool {
                 pool_cfg: cfg,
                 pool: Vec::with_capacity(subpools_num),
                 sizes_lists: Vec::with_capacity(subpools_num),
@@ -372,39 +389,39 @@ mod alloc_mod {
             local_pool
         }
 
-        fn addr_check(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
+        fn addr_check(&self, addr: &StaticPoolAddr) -> Result<usize, StoreError> {
             self.validate_addr(addr)?;
             let pool_idx = addr.pool_idx as usize;
             let size_list = self.sizes_lists.get(pool_idx).unwrap();
             let curr_size = size_list[addr.packet_idx as usize];
             if curr_size == STORE_FREE {
-                return Err(StoreError::DataDoesNotExist(*addr));
+                return Err(StoreError::DataDoesNotExist(StoreAddr::from(*addr)));
             }
             Ok(curr_size)
         }
 
-        fn validate_addr(&self, addr: &StoreAddr) -> Result<(), StoreError> {
+        fn validate_addr(&self, addr: &StaticPoolAddr) -> Result<(), StoreError> {
             let pool_idx = addr.pool_idx as usize;
             if pool_idx >= self.pool_cfg.cfg.len() {
                 return Err(StoreError::InvalidStoreId(
                     StoreIdError::InvalidSubpool(addr.pool_idx),
-                    Some(*addr),
+                    Some(StoreAddr::from(*addr)),
                 ));
             }
             if addr.packet_idx >= self.pool_cfg.cfg[addr.pool_idx as usize].0 {
                 return Err(StoreError::InvalidStoreId(
                     StoreIdError::InvalidPacketIdx(addr.packet_idx),
-                    Some(*addr),
+                    Some(StoreAddr::from(*addr)),
                 ));
             }
             Ok(())
         }
 
-        fn reserve(&mut self, data_len: usize) -> Result<StoreAddr, StoreError> {
+        fn reserve(&mut self, data_len: usize) -> Result<StaticPoolAddr, StoreError> {
             let subpool_idx = self.find_subpool(data_len, 0)?;
             let (slot, size_slot_ref) = self.find_empty(subpool_idx)?;
             *size_slot_ref = data_len;
-            Ok(StoreAddr {
+            Ok(StaticPoolAddr {
                 pool_idx: subpool_idx,
                 packet_idx: slot,
             })
@@ -422,7 +439,7 @@ mod alloc_mod {
             Err(StoreError::DataTooLarge(req_size))
         }
 
-        fn write(&mut self, addr: &StoreAddr, data: &[u8]) -> Result<(), StoreError> {
+        fn write(&mut self, addr: &StaticPoolAddr, data: &[u8]) -> Result<(), StoreError> {
             let packet_pos = self.raw_pos(addr).ok_or(StoreError::InternalError(0))?;
             let subpool = self
                 .pool
@@ -449,13 +466,13 @@ mod alloc_mod {
             Err(StoreError::StoreFull(subpool))
         }
 
-        fn raw_pos(&self, addr: &StoreAddr) -> Option<usize> {
+        fn raw_pos(&self, addr: &StaticPoolAddr) -> Option<usize> {
             let (_, size) = self.pool_cfg.cfg.get(addr.pool_idx as usize)?;
             Some(addr.packet_idx as usize * size)
         }
     }
 
-    impl PoolProvider for LocalPool {
+    impl PoolProvider for StaticMemoryPool {
         fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
             let data_len = data.len();
             if data_len > POOL_MAX_SIZE {
@@ -463,7 +480,7 @@ mod alloc_mod {
             }
             let addr = self.reserve(data_len)?;
             self.write(&addr, data)?;
-            Ok(addr)
+            Ok(addr.into())
         }
 
         fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> {
@@ -474,12 +491,13 @@ mod alloc_mod {
             let raw_pos = self.raw_pos(&addr).unwrap();
             let block =
                 &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len];
-            Ok((addr, block))
+            Ok((addr.into(), block))
         }
 
         fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> {
-            let curr_size = self.addr_check(addr)?;
-            let raw_pos = self.raw_pos(addr).unwrap();
+            let addr = StaticPoolAddr::from(*addr);
+            let curr_size = self.addr_check(&addr)?;
+            let raw_pos = self.raw_pos(&addr).unwrap();
             let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()
                 [raw_pos..raw_pos + curr_size];
             Ok(block)
@@ -490,8 +508,9 @@ mod alloc_mod {
         }
 
         fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> {
-            let curr_size = self.addr_check(addr)?;
-            let raw_pos = self.raw_pos(addr).unwrap();
+            let addr = StaticPoolAddr::from(*addr);
+            let curr_size = self.addr_check(&addr)?;
+            let raw_pos = self.raw_pos(&addr).unwrap();
             let block =
                 &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size];
             Ok(block)
@@ -502,6 +521,7 @@ mod alloc_mod {
         }
 
         fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> {
+            let addr = StaticPoolAddr::from(addr);
             self.addr_check(&addr)?;
             let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1;
             let raw_pos = self.raw_pos(&addr).unwrap();
@@ -514,7 +534,8 @@ mod alloc_mod {
         }
 
         fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError> {
-            self.validate_addr(addr)?;
+            let addr = StaticPoolAddr::from(*addr);
+            self.validate_addr(&addr)?;
             let pool_idx = addr.pool_idx as usize;
             let size_list = self.sizes_lists.get(pool_idx).unwrap();
             let curr_size = size_list[addr.packet_idx as usize];
@@ -529,15 +550,15 @@
 #[cfg(test)]
 mod tests {
     use crate::pool::{
-        LocalPool, PoolCfg, PoolGuard, PoolProvider, PoolRwGuard, StoreAddr, StoreError,
-        StoreIdError, POOL_MAX_SIZE,
+        PoolCfg, PoolGuard, PoolProvider, PoolRwGuard, StaticMemoryPool, StaticPoolAddr, StoreAddr,
+        StoreError, StoreIdError, POOL_MAX_SIZE,
     };
     use std::vec;
 
-    fn basic_small_pool() -> LocalPool {
+    fn basic_small_pool() -> StaticMemoryPool {
         // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
         let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]);
-        LocalPool::new(pool_cfg)
+        StaticMemoryPool::new(pool_cfg)
     }
 
     #[test]
@@ -600,10 +621,11 @@ mod tests {
         let (addr, buf_ref) = res.unwrap();
         assert_eq!(
             addr,
-            StoreAddr {
+            StaticPoolAddr {
                 pool_idx: 2,
                 packet_idx: 0
             }
+            .into()
         );
         assert_eq!(buf_ref.len(), 12);
     }
@@ -655,10 +677,13 @@ mod tests {
     fn test_read_does_not_exist() {
         let local_pool = basic_small_pool();
         // Try to access data which does not exist
-        let res = local_pool.read(&StoreAddr {
-            packet_idx: 0,
-            pool_idx: 0,
-        });
+        let res = local_pool.read(
+            &StaticPoolAddr {
+                packet_idx: 0,
+                pool_idx: 0,
+            }
+            .into(),
+        );
         assert!(res.is_err());
         assert!(matches!(
             res.unwrap_err(),
@@ -684,10 +709,11 @@ mod tests {
     #[test]
     fn test_invalid_pool_idx() {
         let local_pool = basic_small_pool();
-        let addr = StoreAddr {
+        let addr = StaticPoolAddr {
             pool_idx: 3,
             packet_idx: 0,
-        };
+        }
+        .into();
         let res = local_pool.read(&addr);
         assert!(res.is_err());
         let err = res.unwrap_err();
@@ -700,12 +726,12 @@ mod tests {
     #[test]
     fn test_invalid_packet_idx() {
         let local_pool = basic_small_pool();
-        let addr = StoreAddr {
+        let addr = StaticPoolAddr {
             pool_idx: 2,
             packet_idx: 1,
         };
         assert_eq!(addr.raw(), 0x00020001);
-        let res = local_pool.read(&addr);
+        let res = local_pool.read(&addr.into());
         assert!(res.is_err());
         let err = res.unwrap_err();
         assert!(matches!(
diff --git a/satrs-core/src/pus/event_srv.rs b/satrs-core/src/pus/event_srv.rs
index c467c30..3099830 100644
--- a/satrs-core/src/pus/event_srv.rs
+++ b/satrs-core/src/pus/event_srv.rs
@@ -152,7 +152,7 @@ mod tests {
             let (common, srv_handler) = PusServiceHandlerWithStoreCommon::new();
             Self {
                 common,
-                handler: PusService5EventHandler::new(srv_handler, event_request_tx)
+                handler: PusService5EventHandler::new(srv_handler, event_request_tx),
             }
         }
     }
diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs
index d833891..415d231 100644
--- a/satrs-core/src/pus/mod.rs
+++ b/satrs-core/src/pus/mod.rs
@@ -924,7 +924,7 @@ pub mod tests {
     use spacepackets::ecss::{PusPacket, WritablePusPacket};
     use spacepackets::CcsdsPacket;
 
-    use crate::pool::{LocalPool, PoolCfg, SharedPool, StoreAddr};
+    use crate::pool::{PoolCfg, SharedPool, StaticMemoryPool, StoreAddr};
     use crate::pus::verification::RequestId;
     use crate::tmtc::tm_helper::SharedTmStore;
 
@@ -988,8 +988,8 @@ pub mod tests {
         /// The PUS service handler is instantiated with a [EcssTcInStoreConverter].
         pub fn new() -> (Self, PusServiceHelper<EcssTcInStoreConverter>) {
             let pool_cfg = PoolCfg::new(vec![(16, 16), (8, 32), (4, 64)]);
-            let tc_pool = LocalPool::new(pool_cfg.clone());
-            let tm_pool = LocalPool::new(pool_cfg);
+            let tc_pool = StaticMemoryPool::new(pool_cfg.clone());
+            let tm_pool = StaticMemoryPool::new(pool_cfg);
             let shared_tc_pool = SharedPool::new(RwLock::new(Box::new(tc_pool)));
             let shared_tm_pool = SharedTmStore::new(Box::new(tm_pool));
             let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel();
diff --git a/satrs-core/src/pus/scheduler.rs b/satrs-core/src/pus/scheduler.rs
index 61883ee..70adaaa 100644
--- a/satrs-core/src/pus/scheduler.rs
+++ b/satrs-core/src/pus/scheduler.rs
@@ -7,12 +7,13 @@ use core::fmt::Debug;
 use serde::{Deserialize, Serialize};
 use spacepackets::ecss::scheduling::TimeWindowType;
 use spacepackets::ecss::tc::{GenericPusTcSecondaryHeader, IsPusTelecommand};
-use spacepackets::time::CcsdsTimeProvider;
+use spacepackets::ecss::PusPacket;
+use spacepackets::time::{CcsdsTimeProvider, TimeReader, UnixTimestamp};
 use spacepackets::CcsdsPacket;
 #[cfg(feature = "std")]
 use std::error::Error;
 
-use crate::pool::StoreAddr;
+use crate::pool::{PoolProvider, StoreError};
 #[cfg(feature = "alloc")]
 pub use alloc_mod::*;
 
@@ -56,17 +57,19 @@ impl RequestId {
     }
 }
 
+pub type AddrInStore = u64;
+
 /// This is the format stored internally by the TC scheduler for each scheduled telecommand.
-/// It consists of the address of that telecommand in the TC pool and a request ID.
+/// It consists of a generic address for that telecommand in the TC pool and a request ID.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct TcInfo {
-    addr: StoreAddr,
+    addr: AddrInStore,
     request_id: RequestId,
 }
 
 impl TcInfo {
-    pub fn addr(&self) -> StoreAddr {
+    pub fn addr(&self) -> AddrInStore {
         self.addr
     }
 
@@ -74,7 +77,7 @@ impl TcInfo {
         self.request_id
     }
 
-    pub fn new(addr: StoreAddr, request_id: RequestId) -> Self {
+    pub fn new(addr: u64, request_id: RequestId) -> Self {
         TcInfo { addr, request_id }
     }
 }
@@ -133,6 +136,43 @@ impl TimeWindow {
     }
 }
 
+pub trait PusSchedulerInterface {
+    type TimeProvider: CcsdsTimeProvider + TimeReader;
+
+    fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError>;
+
+    fn is_enabled(&self) -> bool;
+
+    fn enable(&mut self);
+
+    /// A disabled scheduler should still delete commands where the execution time has been reached
+    /// but should not release them to be executed.
+    fn disable(&mut self);
+
+    fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
+        &mut self,
+        pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader),
+        pool: &mut (impl PoolProvider + ?Sized),
+    ) -> Result<TcInfo, ScheduleError>;
+
+    /// Insert a telecommand which was already unwrapped from the outer Service 11 packet and stored
+    /// inside the telecommand packet pool.
+    fn insert_unwrapped_and_stored_tc(
+        &mut self,
+        time_stamp: UnixTimestamp,
+        info: TcInfo,
+    ) -> Result<(), ScheduleError>;
+
+    /// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still
+    /// needs to be stored inside the telecommand pool.
+    fn insert_unwrapped_tc(
+        &mut self,
+        time_stamp: UnixTimestamp,
+        tc: &[u8],
+        pool: &mut (impl PoolProvider + ?Sized),
+    ) -> Result<TcInfo, ScheduleError>;
+}
+
 #[cfg(feature = "alloc")]
 pub mod alloc_mod {
     use super::*;
@@ -292,44 +332,6 @@ pub mod alloc_mod {
             num_entries
         }
 
-        pub fn is_enabled(&self) -> bool {
-            self.enabled
-        }
-
-        pub fn enable(&mut self) {
-            self.enabled = true;
-        }
-
-        /// A disabled scheduler should still delete commands where the execution time has been reached
-        /// but should not release them to be executed.
-        pub fn disable(&mut self) {
-            self.enabled = false;
-        }
-
-        /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4.
-        /// Be careful with this command as it will delete all the commands in the schedule.
-        ///
-        /// The holding store for the telecommands needs to be passed so all the stored telecommands
-        /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
-        /// will be returned but the method will still try to delete all the commands in the schedule.
-        pub fn reset(
-            &mut self,
-            store: &mut (impl PoolProvider + ?Sized),
-        ) -> Result<(), StoreError> {
-            self.enabled = false;
-            let mut deletion_ok = Ok(());
-            for tc_lists in &mut self.tc_map {
-                for tc in tc_lists.1 {
-                    let res = store.delete(tc.addr);
-                    if res.is_err() {
-                        deletion_ok = res;
-                    }
-                }
-            }
-            self.tc_map.clear();
-            deletion_ok
-        }
-
         pub fn update_time(&mut self, current_time: UnixTimestamp) {
             self.current_time = current_time;
         }
@@ -387,29 +389,6 @@ pub mod alloc_mod {
             }
         }
 
-        /// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp
-        /// provider needs to be supplied via a generic.
-        pub fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
-            &mut self,
-            pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader),
-            pool: &mut (impl PoolProvider + ?Sized),
-        ) -> Result<TcInfo, ScheduleError> {
-            if PusPacket::service(pus_tc) != 11 {
-                return Err(ScheduleError::WrongService);
-            }
-            if PusPacket::subservice(pus_tc) != 4 {
-                return Err(ScheduleError::WrongSubservice);
-            }
-            if pus_tc.user_data().is_empty() {
-                return Err(ScheduleError::TcDataEmpty);
-            }
-            let user_data = pus_tc.user_data();
-            let stamp: TimeStamp = TimeReader::from_bytes(user_data)?;
-            let unix_stamp = stamp.unix_stamp();
-            let stamp_len = stamp.len_as_bytes();
-            self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool)
-        }
-
         /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS
         /// short timestamp with 16-bit length of days field.
         pub fn insert_wrapped_tc_cds_short(
@@ -618,12 +597,123 @@ pub mod alloc_mod {
                 .map_err(|e| (released_tcs, e))
         }
     }
+
+    impl PusSchedulerInterface for PusScheduler {
+        type TimeProvider = cds::TimeProvider;
+
+        /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4.
+        /// Be careful with this command as it will delete all the commands in the schedule.
+        ///
+        /// The holding store for the telecommands needs to be passed so all the stored telecommands
+        /// can be deleted to avoid a memory leak. If at least one deletion operation fails, the error
+        /// will be returned but the method will still try to delete all the commands in the schedule.
+        fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> {
+            self.enabled = false;
+            let mut deletion_ok = Ok(());
+            for tc_lists in &mut self.tc_map {
+                for tc in tc_lists.1 {
+                    let res = store.delete(tc.addr);
+                    if res.is_err() {
+                        deletion_ok = res;
+                    }
+                }
+            }
+            self.tc_map.clear();
+            deletion_ok
+        }
+
+        fn is_enabled(&self) -> bool {
+            self.enabled
+        }
+
+        fn enable(&mut self) {
+            self.enabled = true;
+        }
+
+        /// A disabled scheduler should still delete commands where the execution time has been reached
+        /// but should not release them to be executed.
+        fn disable(&mut self) {
+            self.enabled = false;
+        }
+
+        /// Insert a telecommand based on the fully wrapped time-tagged telecommand. The timestamp
+        /// provider needs to be supplied via a generic.
+        fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
+            &mut self,
+            pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader),
+            pool: &mut (impl PoolProvider + ?Sized),
+        ) -> Result<TcInfo, ScheduleError> {
+            if PusPacket::service(pus_tc) != 11 {
+                return Err(ScheduleError::WrongService);
+            }
+            if PusPacket::subservice(pus_tc) != 4 {
+                return Err(ScheduleError::WrongSubservice);
+            }
+            if pus_tc.user_data().is_empty() {
+                return Err(ScheduleError::TcDataEmpty);
+            }
+            let user_data = pus_tc.user_data();
+            let stamp: Self::TimeProvider = TimeReader::from_bytes(user_data)?;
+            let unix_stamp = stamp.unix_stamp();
+            let stamp_len = stamp.len_as_bytes();
+            self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool)
+        }
+
+        fn insert_unwrapped_and_stored_tc(
+            &mut self,
+            time_stamp: UnixTimestamp,
+            info: TcInfo,
+        ) -> Result<(), ScheduleError> {
+            if time_stamp < self.current_time + self.time_margin {
+                return Err(ScheduleError::ReleaseTimeInTimeMargin(
+                    self.current_time,
+                    self.time_margin,
+                    time_stamp,
+                ));
+            }
+            match self.tc_map.entry(time_stamp) {
+                Entry::Vacant(e) => {
+                    e.insert(vec![info]);
+                }
+                Entry::Occupied(mut v) => {
+                    v.get_mut().push(info);
+                }
+            }
+            Ok(())
+        }
+
+        /// Insert a telecommand which was already unwrapped from the outer Service 11 packet but still
+        /// needs to be stored inside the telecommand pool.
+        fn insert_unwrapped_tc(
+            &mut self,
+            time_stamp: UnixTimestamp,
+            tc: &[u8],
+            pool: &mut (impl PoolProvider + ?Sized),
+        ) -> Result<TcInfo, ScheduleError> {
+            let check_tc = PusTcReader::new(tc)?;
+            if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
+                return Err(ScheduleError::NestedScheduledTc);
+            }
+            let req_id = RequestId::from_tc(&check_tc.0);
+
+            match pool.add(tc) {
+                Ok(addr) => {
+                    let info = TcInfo::new(addr, req_id);
+                    self.insert_unwrapped_and_stored_tc(time_stamp, info)?;
+                    Ok(info)
+                }
+                Err(err) => Err(err.into()),
+            }
+        }
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError};
+    use crate::pool::{
+        PoolCfg, PoolProvider, StaticMemoryPool, StaticPoolAddr, StoreAddr, StoreError,
+    };
     use alloc::collections::btree_map::Range;
     use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
     use spacepackets::ecss::WritablePusPacket;
@@ -694,7 +784,7 @@ mod tests {
     }
 
     fn ping_tc_to_store(
-        pool: &mut LocalPool,
+        pool: &mut StaticMemoryPool,
         buf: &mut [u8],
         seq_count: u16,
         app_data: Option<&'static [u8]>,
@@ -718,7 +808,7 @@ mod tests {
 
     #[test]
     fn reset() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
@@ -769,10 +859,10 @@ mod tests {
             .insert_unwrapped_and_stored_tc(
                 UnixTimestamp::new_only_seconds(100),
                 TcInfo::new(
-                    StoreAddr {
+                    StoreAddr::from(StaticPoolAddr {
                         pool_idx: 0,
                         packet_idx: 1,
-                    },
+                    }),
                     RequestId {
                         seq_count: 1,
                         apid: 0,
@@ -786,10 +876,10 @@ mod tests {
             .insert_unwrapped_and_stored_tc(
                 UnixTimestamp::new_only_seconds(100),
                 TcInfo::new(
-                    StoreAddr {
+                    StoreAddr::from(StaticPoolAddr {
                         pool_idx: 0,
                         packet_idx: 2,
-                    },
+                    }),
                     RequestId {
                         seq_count: 2,
                         apid: 1,
@@ -803,10 +893,11 @@ mod tests {
             .insert_unwrapped_and_stored_tc(
                 UnixTimestamp::new_only_seconds(300),
                 TcInfo::new(
-                    StoreAddr {
+                    StaticPoolAddr {
                         pool_idx: 0,
                         packet_idx: 2,
-                    },
+                    }
+                    .into(),
                     RequestId {
                         source_id: 10,
                         seq_count: 20,
@@ -869,7 +960,7 @@ mod tests {
     }
     #[test]
     fn release_basic() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
@@ -932,7 +1023,7 @@ mod tests {
 
     #[test]
     fn release_multi_with_same_time() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
@@ -989,7 +1080,7 @@ mod tests {
 
     #[test]
     fn release_with_scheduler_disabled() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
@@ -1057,7 +1148,7 @@ mod tests {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut buf: [u8; 32] = [0; 32];
         let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
 
@@ -1103,7 +1194,7 @@ mod tests {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut buf: [u8; 32] = [0; 32];
         let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf);
@@ -1149,7 +1240,7 @@ mod tests {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut buf: [u8; 32] = [0; 32];
         let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf);
@@ -1170,7 +1261,7 @@ mod tests {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut buf: [u8; 32] = [0; 32];
         let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf);
@@ -1190,7 +1281,7 @@ mod tests {
     fn insert_wrapped_tc_faulty_app_data() {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let tc = invalid_time_tagged_cmd();
         let insert_res = scheduler.insert_wrapped_tc::<cds::TimeProvider>(&tc, &mut pool);
         assert!(insert_res.is_err());
@@ -1205,7 +1296,7 @@ mod tests {
     fn insert_doubly_wrapped_time_tagged_cmd() {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut buf: [u8; 64] = [0; 64];
         let tc = double_wrapped_time_tagged_tc(UnixTimestamp::new_only_seconds(50), &mut buf);
         let insert_res = scheduler.insert_wrapped_tc::<cds::TimeProvider>(&tc, &mut pool);
@@ -1241,7 +1332,7 @@ mod tests {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
 
         let mut buf: [u8; 32] = [0; 32];
 
@@ -1261,7 +1352,7 @@ mod tests {
 
     #[test]
     fn test_store_error_propagation_release() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let mut buf: [u8; 32] = [0; 32];
@@ -1295,7 +1386,7 @@ mod tests {
 
     #[test]
     fn test_store_error_propagation_reset() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let mut buf: [u8; 32] = [0; 32];
@@ -1319,7 +1410,7 @@ mod tests {
 
     #[test]
     fn test_delete_by_req_id_simple_retrieve_addr() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let mut buf: [u8; 32] = [0; 32];
@@ -1338,7 +1429,7 @@ mod tests {
 
     #[test]
     fn test_delete_by_req_id_simple_delete_all() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let mut buf: [u8; 32] = [0; 32];
@@ -1357,7 +1448,7 @@ mod tests {
 
     #[test]
     fn test_delete_by_req_id_complex() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let mut buf: [u8; 32] = [0; 32];
@@ -1404,7 +1495,7 @@ mod tests {
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
 
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(1, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(1, 64)]));
         let mut buf: [u8; 32] = [0; 32];
 
         // Store is full after this.
@@ -1424,7 +1515,7 @@ mod tests {
     }
 
     fn insert_command_with_release_time(
-        pool: &mut LocalPool,
+        pool: &mut StaticMemoryPool,
         scheduler: &mut PusScheduler,
         seq_count: u16,
         release_secs: u64,
@@ -1443,7 +1534,7 @@ mod tests {
 
     #[test]
     fn test_time_window_retrieval_select_all() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1474,7 +1565,7 @@ mod tests {
 
     #[test]
     fn test_time_window_retrieval_select_from_stamp() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1505,7 +1596,7 @@ mod tests {
 
     #[test]
     fn test_time_window_retrieval_select_to_time() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1536,7 +1627,7 @@ mod tests {
 
     #[test]
     fn test_time_window_retrieval_select_from_time_to_time() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1571,7 +1662,7 @@ mod tests {
 
     #[test]
     fn test_deletion_all() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1598,7 +1689,7 @@ mod tests {
 
     #[test]
     fn test_deletion_from_start_time() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1619,7 +1710,7 @@ mod tests {
 
     #[test]
     fn test_deletion_to_end_time() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
@@ -1641,7 +1732,7 @@ mod tests {
 
     #[test]
     fn test_deletion_from_start_time_to_end_time() {
-        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut pool = StaticMemoryPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
         let mut scheduler =
             PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let cmd_out_of_range_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs
index b5a7d13..71927b9 100644
--- a/satrs-core/src/pus/scheduler_srv.rs
+++ b/satrs-core/src/pus/scheduler_srv.rs
@@ -1,6 +1,6 @@
+use super::scheduler::PusSchedulerInterface;
 use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper};
 use crate::pool::SharedPool;
-use crate::pus::scheduler::PusScheduler;
 use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError};
 use spacepackets::ecss::{scheduling, PusPacket};
 use spacepackets::time::cds::TimeProvider;
@@ -13,17 +13,22 @@ use spacepackets::time::cds::TimeProvider;
 /// telecommands inside the scheduler. The user can retrieve the wrapped scheduler via the
 /// [Self::scheduler] and [Self::scheduler_mut] function and then use the scheduler API to release
 /// telecommands when applicable.
-pub struct PusService11SchedHandler<TcInMemConverter: EcssTcInMemConverter> {
+pub struct PusService11SchedHandler<
+    TcInMemConverter: EcssTcInMemConverter,
+    Scheduler: PusSchedulerInterface,
+> {
     pub service_helper: PusServiceHelper<TcInMemConverter>,
     shared_tc_store: SharedPool,
-    scheduler: PusScheduler,
+    scheduler: Scheduler,
 }
 
-impl<TcInMemConverter: EcssTcInMemConverter> PusService11SchedHandler<TcInMemConverter> {
+impl<TcInMemConverter: EcssTcInMemConverter, Scheduler: PusSchedulerInterface>
+    PusService11SchedHandler<TcInMemConverter, Scheduler>
+{
     pub fn new(
         service_helper: PusServiceHelper<TcInMemConverter>,
         shared_tc_store: SharedPool,
-        scheduler: PusScheduler,
+        scheduler: Scheduler,
     ) -> Self {
         Self {
             service_helper,
@@ -32,11 +37,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> PusService11SchedHandler<TcInMemConverter> {
-    pub fn scheduler_mut(&mut self) -> &mut PusScheduler {
+    pub fn scheduler_mut(&mut self) -> &mut Scheduler {
         &mut self.scheduler
     }
 
-    pub fn scheduler(&self) -> &PusScheduler {
+    pub fn scheduler(&self) -> &Scheduler {
         &self.scheduler
     }
 
@@ -50,11 +55,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> PusService11SchedHandler<TcInMemConverter> {
                 // Treat unhandled standard subservices as custom subservices for now.
                 return Ok(PusPacketHandlerResult::CustomSubservice(
-                    tc.subservice(),
+                    subservice,
                     ecss_tc_and_token.token,
                 ));
             }
@@ -169,9 +174,9 @@ mod tests {
         events::EventU32,
         pool::SharedPool,
         pus::{
-            scheduler::{PusScheduler, RequestId},
+            scheduler::PusScheduler,
             tests::{PusServiceHandlerWithStoreCommon, PusTestHarness},
-            verification::{TcStateAccepted, VerificationToken},
+            verification::{RequestId, TcStateAccepted, VerificationToken},
             EcssTcInStoreConverter, PusPacketHandlerResult, PusPacketHandlingError,
         },
     };
@@ -184,7 +189,7 @@ mod tests {
 
     struct Pus11HandlerWithStoreTester {
         common: PusServiceHandlerWithStoreCommon,
-        handler: PusService11SchedHandler<EcssTcInStoreConverter>,
+        handler: PusService11SchedHandler<EcssTcInStoreConverter, PusScheduler>,
     }
 
     impl Pus11HandlerWithStoreTester {
@@ -211,4 +216,7 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn test_scheduling_tc() {}
 }
diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs
index fb548db..a469e2d 100644
--- a/satrs-core/src/pus/test.rs
+++ b/satrs-core/src/pus/test.rs
@@ -153,7 +153,7 @@ mod tests {
             let (common, srv_handler) = PusServiceHandlerWithVecCommon::new();
             Self {
                 common,
-                handler: PusService17TestHandler::new(srv_handler)
+                handler: PusService17TestHandler::new(srv_handler),
             }
         }
     }
diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs
index 77261fe..2a8e4ab 100644
--- a/satrs-core/src/pus/verification.rs
+++ b/satrs-core/src/pus/verification.rs
@@ -15,7 +15,7 @@
 //! ```
 //! use std::sync::{Arc, mpsc, RwLock};
 //! use std::time::Duration;
-//! use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
+//! use satrs_core::pool::{StaticMemoryPool, PoolCfg, PoolProvider, SharedPool};
 //! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
 //! use satrs_core::seq_count::SeqCountProviderSimple;
 //! use satrs_core::pus::MpscTmInStoreSender;
@@ -29,7 +29,7 @@
 //! const TEST_APID: u16 = 0x02;
 //!
 //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
-//! let tm_pool = LocalPool::new(pool_cfg.clone());
+//! let tm_pool = StaticMemoryPool::new(pool_cfg.clone());
 //! let shared_tm_store = SharedTmStore::new(Box::new(tm_pool));
 //! let tm_store = shared_tm_store.clone_backing_pool();
 //! let (verif_tx, verif_rx) = mpsc::channel();
@@ -1325,7 +1325,7 @@ mod std_mod {
 
 #[cfg(test)]
 mod tests {
-    use crate::pool::{LocalPool, PoolCfg};
+    use crate::pool::{PoolCfg, StaticMemoryPool};
     use crate::pus::tests::CommonTmInfo;
     use crate::pus::verification::{
         EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone,
@@ -1487,7 +1487,7 @@ mod tests {
 
     #[test]
     fn test_mpsc_verif_send_sync() {
-        let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)]));
+        let pool = StaticMemoryPool::new(PoolCfg::new(vec![(8, 8)]));
         let tm_store = Box::new(pool);
         let shared_tm_store = SharedTmStore::new(tm_store);
         let (tx, _) = mpsc::channel();
@@ -2155,7 +2155,7 @@ mod tests {
     // TODO: maybe a bit more extensive testing, all I have time for right now
     fn test_seq_count_increment() {
         let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
-        let tm_pool = Box::new(LocalPool::new(pool_cfg.clone()));
+        let tm_pool = Box::new(StaticMemoryPool::new(pool_cfg.clone()));
         let shared_tm_store = SharedTmStore::new(tm_pool);
         let shared_tm_pool = shared_tm_store.clone_backing_pool();
         let (verif_tx, verif_rx) = mpsc::channel();
diff --git a/satrs-core/tests/pools.rs b/satrs-core/tests/pools.rs
index 375d624..6a20a0d 100644
--- a/satrs-core/tests/pools.rs
+++ b/satrs-core/tests/pools.rs
@@ -1,4 +1,4 @@
-use satrs_core::pool::{LocalPool, PoolCfg, PoolGuard, PoolProvider, StoreAddr};
+use satrs_core::pool::{PoolCfg, PoolGuard, PoolProvider, StaticMemoryPool, StoreAddr};
 use std::ops::DerefMut;
 use std::sync::mpsc;
 use std::sync::mpsc::{Receiver, Sender};
@@ -10,7 +10,7 @@ const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3];
 #[test]
 fn threaded_usage() {
     let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]);
-    let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
+    let shared_pool = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
     let shared_clone = shared_pool.clone();
     let (tx, rx): (Sender<StoreAddr>, Receiver<StoreAddr>) = mpsc::channel();
     let jh0 = thread::spawn(move || {
diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs
index 569a46d..6efced4 100644
--- a/satrs-example/src/main.rs
+++ b/satrs-example/src/main.rs
@@ -29,7 +29,7 @@ use satrs_core::event_man::{
 };
 use satrs_core::events::EventU32;
 use satrs_core::hk::HkRequest;
-use satrs_core::pool::{LocalPool, PoolCfg};
+use satrs_core::pool::{PoolCfg, StaticMemoryPool};
 use satrs_core::pus::event_man::{
     DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
     PusEventDispatcher,
@@ -68,7 +68,7 @@ use std::time::Duration;
 fn main() {
     setup_logger().expect("setting up logging with fern failed");
     println!("Running OBSW example");
-    let tm_pool = LocalPool::new(PoolCfg::new(vec![
+    let tm_pool = StaticMemoryPool::new(PoolCfg::new(vec![
         (30, 32),
         (15, 64),
         (15, 128),
@@ -78,7 +78,7 @@ fn main() {
     ]));
     let shared_tm_store = SharedTmStore::new(Box::new(tm_pool));
     let tm_store_event = shared_tm_store.clone();
-    let tc_pool = LocalPool::new(PoolCfg::new(vec![
+    let tc_pool = StaticMemoryPool::new(PoolCfg::new(vec![
         (30, 32),
         (15, 64),
         (15, 128),
diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs
index 36c622e..7d679cf 100644
--- a/satrs-example/src/pus/scheduler.rs
+++ b/satrs-example/src/pus/scheduler.rs
@@ -1,11 +1,11 @@
 use crate::tmtc::PusTcSource;
 use log::{error, info, warn};
-use satrs_core::pus::scheduler::TcInfo;
+use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
 use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
 use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult};
 
 pub struct Pus11Wrapper {
-    pub pus_11_handler: PusService11SchedHandler<EcssTcInStoreConverter>,
+    pub pus_11_handler: PusService11SchedHandler<EcssTcInStoreConverter, PusScheduler>,
     pub tc_source_wrapper: PusTcSource,
 }
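
For reference, a minimal, self-contained sketch of the address packing scheme this patch introduces. The type and field names mirror the patch, but the `main` driver and the assertions are illustrative only and not part of the satrs-core API:

```rust
// Sketch of the new two-level addressing: the generic `StoreAddr` is a plain
// u64, while `StaticPoolAddr` keeps the pool-specific (pool_idx, packet_idx)
// pair. The `From` impls below mirror the packing used in pool.rs.
type StoreAddr = u64;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct StaticPoolAddr {
    pool_idx: u16,
    packet_idx: u16,
}

impl From<StaticPoolAddr> for StoreAddr {
    fn from(value: StaticPoolAddr) -> Self {
        // pool_idx occupies bits 16..32, packet_idx occupies bits 0..16.
        ((value.pool_idx as u64) << 16) | value.packet_idx as u64
    }
}

impl From<StoreAddr> for StaticPoolAddr {
    fn from(value: StoreAddr) -> Self {
        // Inverse of the packing above; full 16-bit masks recover both indices.
        Self {
            pool_idx: ((value >> 16) & 0xffff) as u16,
            packet_idx: (value & 0xffff) as u16,
        }
    }
}

fn main() {
    let pool_addr = StaticPoolAddr {
        pool_idx: 2,
        packet_idx: 1,
    };
    let generic: StoreAddr = pool_addr.into();
    assert_eq!(generic, 0x0002_0001);
    // Round-trip back to the pool-specific representation.
    assert_eq!(StaticPoolAddr::from(generic), pool_addr);
    println!("round-trip ok: {generic:#010x}");
}
```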