diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index 87afadf..4c04a9a 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -186,7 +186,7 @@ impl Error for StoreError { } } -pub trait PoolProvider { +pub trait MemPoolProvider { /// Add new data to the pool. The provider should attempt to reserve a memory block with the /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can /// be used to access the data stored in the pool @@ -219,9 +219,8 @@ pub trait PoolProvider { #[cfg(feature = "alloc")] mod alloc_mod { - use super::{PoolProvider, StaticPoolAddr}; + use super::{MemPoolProvider, StaticPoolAddr}; use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError}; - use alloc::boxed::Box; use alloc::vec; use alloc::vec::Vec; use delegate::delegate; @@ -229,9 +228,7 @@ mod alloc_mod { use std::sync::{Arc, RwLock}; #[cfg(feature = "std")] - pub type ShareablePoolProvider = Box; - #[cfg(feature = "std")] - pub type SharedPool = Arc>; + pub type SharedStaticMemoryPool = Arc>; type PoolSize = usize; const STORE_FREE: PoolSize = PoolSize::MAX; @@ -266,16 +263,16 @@ mod alloc_mod { } } - pub struct PoolGuard<'a> { - pool: &'a mut StaticMemoryPool, + pub struct PoolGuard<'a, MemProvider: MemPoolProvider> { + pool: &'a mut MemProvider, pub addr: StoreAddr, no_deletion: bool, deletion_failed_error: Option, } /// This helper object - impl<'a> PoolGuard<'a> { - pub fn new(pool: &'a mut StaticMemoryPool, addr: StoreAddr) -> Self { + impl<'a, MemProvider: MemPoolProvider> PoolGuard<'a, MemProvider> { + pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self { Self { pool, addr, @@ -295,7 +292,7 @@ mod alloc_mod { } } - impl Drop for PoolGuard<'_> { + impl Drop for PoolGuard<'_, MemProvider> { fn drop(&mut self) { if !self.no_deletion { if let Err(e) = self.pool.delete(self.addr) { @@ -305,12 +302,12 @@ mod alloc_mod { } } - pub struct PoolRwGuard<'a> { - guard: PoolGuard<'a>, + pub struct PoolRwGuard<'a, MemProvider: MemPoolProvider> { + guard: PoolGuard<'a, MemProvider>, } - impl<'a> PoolRwGuard<'a> { - pub fn new(pool: &'a mut StaticMemoryPool, addr: StoreAddr) -> Self { + impl<'a, MemProvider: MemPoolProvider> PoolRwGuard<'a, MemProvider> { + pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self { Self { guard: PoolGuard::new(pool, addr), } @@ -330,7 +327,8 @@ mod alloc_mod { ); } - pub trait PoolProviderWithGuards: PoolProvider { + pub trait MemPoolProviderWithGuards: MemPoolProvider { + type MemProvider: MemPoolProvider; /// This function behaves like [PoolProvider::read], but consumes the provided address /// and returns a RAII conformant guard object. /// @@ -339,7 +337,7 @@ mod alloc_mod { /// This can prevent memory leaks. Users can read the data and release the guard /// if the data in the store is valid for further processing. If the data is faulty, no /// manual deletion is necessary when returning from a processing function prematurely. - fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard; + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard; /// This function behaves like [PoolProvider::modify], but consumes the provided address /// and returns a RAII conformant guard object. @@ -349,7 +347,7 @@ mod alloc_mod { /// This can prevent memory leaks. Users can read (and modify) the data and release the guard /// if the data in the store is valid for further processing. If the data is faulty, no /// manual deletion is necessary when returning from a processing function prematurely. - fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; } /// Pool implementation providing sub-pools with fixed size memory blocks. @@ -474,7 +472,7 @@ mod alloc_mod { } } - impl PoolProvider for StaticMemoryPool { + impl MemPoolProvider for StaticMemoryPool { fn add(&mut self, data: &[u8]) -> Result { let data_len = data.len(); if data_len > POOL_MAX_SIZE { @@ -539,12 +537,13 @@ mod alloc_mod { Ok(true) } } - impl PoolProviderWithGuards for StaticMemoryPool { - fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { + impl MemPoolProviderWithGuards for StaticMemoryPool { + type MemProvider = StaticMemoryPool; + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { PoolRwGuard::new(self, addr) } - fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { PoolGuard::new(self, addr) } } @@ -553,7 +552,7 @@ mod alloc_mod { #[cfg(test)] mod tests { use crate::pool::{ - PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticMemoryPool, + MemPoolProvider, MemPoolProviderWithGuards, PoolGuard, PoolRwGuard, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, POOL_MAX_SIZE, }; use std::vec; diff --git a/satrs-core/src/pus/event_srv.rs b/satrs-core/src/pus/event_srv.rs index 3099830..fd5af22 100644 --- a/satrs-core/src/pus/event_srv.rs +++ b/satrs-core/src/pus/event_srv.rs @@ -132,9 +132,9 @@ mod tests { events::EventU32, pus::{ event_man::EventRequestWithToken, - tests::{PusServiceHandlerWithStoreCommon, PusTestHarness, TEST_APID}, + tests::{PusServiceHandlerWithSharedStoreCommon, PusTestHarness, TEST_APID}, verification::{TcStateAccepted, VerificationToken}, - EcssTcInStoreConverter, PusPacketHandlerResult, PusPacketHandlingError, + EcssTcInSharedStoreConverter, PusPacketHandlerResult, PusPacketHandlingError, }, }; @@ -143,13 +143,13 @@ mod tests { const TEST_EVENT_0: EventU32 = EventU32::const_new(crate::events::Severity::INFO, 5, 25); struct Pus5HandlerWithStoreTester { - common: PusServiceHandlerWithStoreCommon, - handler: PusService5EventHandler, + common: PusServiceHandlerWithSharedStoreCommon, + handler: PusService5EventHandler, } impl Pus5HandlerWithStoreTester { pub fn new(event_request_tx: Sender) -> Self { - let (common, srv_handler) = PusServiceHandlerWithStoreCommon::new(); + let (common, srv_handler) = PusServiceHandlerWithSharedStoreCommon::new(); Self { common, handler: PusService5EventHandler::new(srv_handler, event_request_tx), diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 084d4e8..0c6e24b 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -369,7 +369,7 @@ mod alloc_mod { #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod std_mod { - use crate::pool::{SharedPool, StoreAddr}; + use crate::pool::{MemPoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr}; use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; @@ -748,13 +748,13 @@ pub mod std_mod { /// Converter structure for PUS telecommands which are stored inside a [SharedPool] structure. /// This is useful if run-time allocation for these packets should be avoided. Please note /// that this structure is not able to convert TCs which are stored as a `Vec`. - pub struct EcssTcInStoreConverter { - shared_tc_store: SharedPool, + pub struct EcssTcInSharedStoreConverter { + shared_tc_store: SharedStaticMemoryPool, pus_buf: Vec, } - impl EcssTcInStoreConverter { - pub fn new(shared_tc_store: SharedPool, max_expected_tc_size: usize) -> Self { + impl EcssTcInSharedStoreConverter { + pub fn new(shared_tc_store: SharedStaticMemoryPool, max_expected_tc_size: usize) -> Self { Self { shared_tc_store, pus_buf: alloc::vec![0; max_expected_tc_size], @@ -777,7 +777,7 @@ pub mod std_mod { } } - impl EcssTcInMemConverter for EcssTcInStoreConverter { + impl EcssTcInMemConverter for EcssTcInSharedStoreConverter { fn cache_ecss_tc_in_memory( &mut self, tc_in_memory: &TcInMemory, @@ -924,7 +924,9 @@ pub mod tests { use spacepackets::ecss::{PusPacket, WritablePusPacket}; use spacepackets::CcsdsPacket; - use crate::pool::{SharedPool, StaticMemoryPool, StaticPoolConfig, StoreAddr}; + use crate::pool::{ + MemPoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig, StoreAddr, + }; use crate::pus::verification::RequestId; use crate::tmtc::tm_helper::SharedTmStore; @@ -932,7 +934,7 @@ pub mod tests { TcStateAccepted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; use super::{ - EcssTcAndToken, EcssTcInStoreConverter, EcssTcInVecConverter, MpscTcReceiver, + EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceHelper, TcInMemory, }; @@ -971,27 +973,27 @@ pub mod tests { } /// Common fields for a PUS service test harness. - pub struct PusServiceHandlerWithStoreCommon { + pub struct PusServiceHandlerWithSharedStoreCommon { pus_buf: [u8; 2048], tm_buf: [u8; 2048], - tc_pool: SharedPool, + tc_pool: SharedStaticMemoryPool, tm_pool: SharedTmStore, tc_sender: mpsc::Sender, tm_receiver: mpsc::Receiver, verification_handler: VerificationReporterWithSender, } - impl PusServiceHandlerWithStoreCommon { + impl PusServiceHandlerWithSharedStoreCommon { /// This function generates the structure in addition to the PUS service handler /// [PusServiceHandler] which might be required for a specific PUS service handler. /// /// The PUS service handler is instantiated with a [EcssTcInStoreConverter]. - pub fn new() -> (Self, PusServiceHelper) { + pub fn new() -> (Self, PusServiceHelper) { let pool_cfg = StaticPoolConfig::new(vec![(16, 16), (8, 32), (4, 64)]); let tc_pool = StaticMemoryPool::new(pool_cfg.clone()); let tm_pool = StaticMemoryPool::new(pool_cfg); - let shared_tc_pool = SharedPool::new(RwLock::new(Box::new(tc_pool))); - let shared_tm_pool = SharedTmStore::new(Box::new(tm_pool)); + let shared_tc_pool = SharedStaticMemoryPool::new(RwLock::new(tc_pool)); + let shared_tm_pool = SharedTmStore::new(tm_pool); let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel(); @@ -1003,7 +1005,8 @@ pub mod tests { let test_srv_tm_sender = MpscTmInStoreSender::new(0, "TEST_SENDER", shared_tm_pool.clone(), tm_tx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); - let in_store_converter = EcssTcInStoreConverter::new(shared_tc_pool.clone(), 2048); + let in_store_converter = + EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048); ( Self { pus_buf: [0; 2048], @@ -1044,7 +1047,7 @@ pub mod tests { let next_msg = self.tm_receiver.try_recv(); assert!(next_msg.is_ok()); let tm_addr = next_msg.unwrap(); - let tm_pool = self.tm_pool.pool.read().unwrap(); + let tm_pool = self.tm_pool.shared_pool.read().unwrap(); let tm_raw = tm_pool.read(&tm_addr).unwrap(); self.tm_buf[0..tm_raw.len()].copy_from_slice(tm_raw); PusTmReader::new(&self.tm_buf, 7).unwrap().0 @@ -1062,7 +1065,7 @@ pub mod tests { let next_msg = self.tm_receiver.try_recv(); assert!(next_msg.is_ok()); let tm_addr = next_msg.unwrap(); - let tm_pool = self.tm_pool.pool.read().unwrap(); + let tm_pool = self.tm_pool.shared_pool.read().unwrap(); let tm_raw = tm_pool.read(&tm_addr).unwrap(); let tm = PusTmReader::new(tm_raw, 7).unwrap().0; assert_eq!(PusPacket::service(&tm), 1); diff --git a/satrs-core/src/pus/scheduler.rs b/satrs-core/src/pus/scheduler.rs index 071c5e0..21879b9 100644 --- a/satrs-core/src/pus/scheduler.rs +++ b/satrs-core/src/pus/scheduler.rs @@ -14,7 +14,7 @@ use spacepackets::CcsdsPacket; #[cfg(feature = "std")] use std::error::Error; -use crate::pool::{PoolProvider, StoreError}; +use crate::pool::{MemPoolProvider, StoreError}; #[cfg(feature = "alloc")] pub use alloc_mod::*; @@ -220,7 +220,7 @@ impl Error for ScheduleError {} pub trait PusSchedulerInterface { type TimeProvider: CcsdsTimeProvider + TimeReader; - fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError>; + fn reset(&mut self, store: &mut (impl MemPoolProvider + ?Sized)) -> Result<(), StoreError>; fn is_enabled(&self) -> bool; @@ -243,7 +243,7 @@ pub trait PusSchedulerInterface { fn insert_wrapped_tc( &mut self, pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader), - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { if PusPacket::service(pus_tc) != 11 { return Err(ScheduleError::WrongService); @@ -267,7 +267,7 @@ pub trait PusSchedulerInterface { &mut self, time_stamp: UnixTimestamp, tc: &[u8], - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { let check_tc = PusTcReader::new(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { @@ -289,7 +289,7 @@ pub trait PusSchedulerInterface { #[cfg(feature = "alloc")] pub mod alloc_mod { use super::*; - use crate::pool::{PoolProvider, StoreAddr, StoreError}; + use crate::pool::{MemPoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use alloc::collections::BTreeMap; use alloc::vec; @@ -409,7 +409,7 @@ pub mod alloc_mod { &mut self, time_stamp: UnixTimestamp, tc: &[u8], - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { let check_tc = PusTcReader::new(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { @@ -432,7 +432,7 @@ pub mod alloc_mod { pub fn insert_wrapped_tc_cds_short( &mut self, pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { self.insert_wrapped_tc::(pus_tc, pool) } @@ -442,7 +442,7 @@ pub mod alloc_mod { pub fn insert_wrapped_tc_cds_long( &mut self, pus_tc: &PusTc, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { self.insert_wrapped_tc::>(pus_tc, pool) } @@ -458,7 +458,7 @@ pub mod alloc_mod { pub fn delete_by_time_filter( &mut self, time_window: TimeWindow, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { let range = self.retrieve_by_time_filter(time_window); let mut del_packets = 0; @@ -488,7 +488,7 @@ pub mod alloc_mod { /// the last deletion will be supplied in addition to the number of deleted commands. pub fn delete_all( &mut self, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { self.delete_by_time_filter(TimeWindow::::new_select_all(), pool) } @@ -535,7 +535,7 @@ pub mod alloc_mod { /// called repeatedly. pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option { if let DeletionResult::WithoutStoreDeletion(v) = - self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>) + self.delete_by_request_id_internal(req_id, None::<&mut dyn MemPoolProvider>) { return v; } @@ -546,7 +546,7 @@ pub mod alloc_mod { pub fn delete_by_request_id_and_from_pool( &mut self, req_id: &RequestId, - pool: &mut (impl PoolProvider + ?Sized), + pool: &mut (impl MemPoolProvider + ?Sized), ) -> Result { if let DeletionResult::WithStoreDeletion(v) = self.delete_by_request_id_internal(req_id, Some(pool)) @@ -559,7 +559,7 @@ pub mod alloc_mod { fn delete_by_request_id_internal( &mut self, req_id: &RequestId, - pool: Option<&mut (impl PoolProvider + ?Sized)>, + pool: Option<&mut (impl MemPoolProvider + ?Sized)>, ) -> DeletionResult { let mut idx_found = None; for time_bucket in &mut self.tc_map { @@ -585,10 +585,6 @@ pub mod alloc_mod { DeletionResult::WithStoreDeletion(Ok(false)) } } - /// Retrieve all telecommands which should be release based on the current time. - pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { - self.tc_map.range(..=self.current_time) - } #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] @@ -609,17 +605,18 @@ pub mod alloc_mod { /// This closure should return whether the command should be deleted if the scheduler is /// disabled to prevent memory leaks. /// * `store` - The holding store of the telecommands. - pub fn release_telecommands bool>( + pub fn release_telecommands bool>( &mut self, mut releaser: R, - tc_store: &mut (impl PoolProvider + ?Sized), + tc_store: &mut (impl MemPoolProvider + ?Sized), ) -> Result { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = 0; let mut store_error = Ok(()); for tc in tcs_to_release { for info in tc.1 { - let should_delete = releaser(self.enabled, info); + let tc = tc_store.read(&info.addr).map_err(|e| (released_tcs, e))?; + let should_delete = releaser(self.enabled, info, tc); released_tcs += 1; if should_delete && !self.is_enabled() { let res = tc_store.delete(info.addr); @@ -634,6 +631,11 @@ pub mod alloc_mod { .map(|_| released_tcs) .map_err(|e| (released_tcs, e)) } + + /// Retrieve all telecommands which should be release based on the current time. + pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { + self.tc_map.range(..=self.current_time) + } } impl PusSchedulerInterface for PusScheduler { @@ -645,7 +647,7 @@ pub mod alloc_mod { /// The holding store for the telecommands needs to be passed so all the stored telecommands /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error /// will be returned but the method will still try to delete all the commands in the schedule. - fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { + fn reset(&mut self, store: &mut (impl MemPoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; let mut deletion_ok = Ok(()); for tc_lists in &mut self.tc_map { @@ -703,7 +705,7 @@ pub mod alloc_mod { mod tests { use super::*; use crate::pool::{ - PoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, StoreError, + MemPoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, StoreError, }; use alloc::collections::btree_map::Range; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; @@ -968,7 +970,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo, tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -990,7 +992,7 @@ mod tests { assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); // test 3, late timestamp, release 1 overdue tc - let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo, tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i); true }; @@ -1031,7 +1033,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &TcInfo| { + let mut test_closure = |boolvar: bool, store_addr: &TcInfo, tc: &[u8]| { common_check( boolvar, &store_addr.addr, @@ -1090,7 +1092,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo, tc: &[u8]| { common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -1112,7 +1114,7 @@ mod tests { assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); // test 3, late timestamp, release 1 overdue tc - let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo, tc: &[u8]| { common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i); true }; @@ -1154,7 +1156,7 @@ mod tests { assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); let data = pool.read(&tc_info_0.addr()).unwrap(); - let check_tc = PusTcReader::new(&data).expect("incorrect Pus tc raw data"); + let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -1164,7 +1166,7 @@ mod tests { let mut addr_vec = Vec::new(); let mut i = 0; - let mut test_closure = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged addr_vec.push(tc_info.addr); @@ -1176,7 +1178,7 @@ mod tests { .unwrap(); let data = pool.read(&addr_vec[0]).unwrap(); - let check_tc = PusTcReader::new(&data).expect("incorrect Pus tc raw data"); + let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } @@ -1210,7 +1212,7 @@ mod tests { let mut addr_vec = Vec::new(); let mut i = 0; - let mut test_closure = |boolvar: bool, tc_info: &TcInfo| { + let mut test_closure = |boolvar: bool, tc_info: &TcInfo, tc: &[u8]| { common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged addr_vec.push(tc_info.addr); @@ -1357,7 +1359,7 @@ mod tests { .expect("insertion failed"); let mut i = 0; - let test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + let test_closure_1 = |boolvar: bool, tc_info: &TcInfo, _tc: &[u8]| { common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -1370,7 +1372,8 @@ mod tests { let release_res = scheduler.release_telecommands(test_closure_1, &mut pool); assert!(release_res.is_err()); let err = release_res.unwrap_err(); - assert_eq!(err.0, 1); + // TC could not even be read.. + assert_eq!(err.0, 0); match err.1 { StoreError::DataDoesNotExist(addr) => { assert_eq!(tc_info_0.addr(), addr); diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 18d4be3..132c6aa 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -1,6 +1,6 @@ use super::scheduler::PusSchedulerInterface; use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper}; -use crate::pool::SharedPool; +use crate::pool::MemPoolProvider; use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError}; use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::time::cds::TimeProvider; @@ -15,24 +15,28 @@ use spacepackets::time::cds::TimeProvider; /// telecommands when applicable. pub struct PusService11SchedHandler< TcInMemConverter: EcssTcInMemConverter, + MemPool: MemPoolProvider, Scheduler: PusSchedulerInterface, > { pub service_helper: PusServiceHelper, - shared_tc_store: SharedPool, + pub sched_tc_pool: MemPool, scheduler: Scheduler, } -impl - PusService11SchedHandler +impl< + TcInMemConverter: EcssTcInMemConverter, + MemPool: MemPoolProvider, + Scheduler: PusSchedulerInterface, + > PusService11SchedHandler { pub fn new( service_helper: PusServiceHelper, - shared_tc_store: SharedPool, + sched_tc_pool: MemPool, scheduler: Scheduler, ) -> Self { Self { service_helper, - shared_tc_store, + sched_tc_pool, scheduler, } } @@ -117,10 +121,10 @@ impl .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("Error sending start success"); - let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); + // let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); self.scheduler - .reset(pool.as_mut()) + .reset(&mut self.sched_tc_pool) .expect("Error resetting TC Pool"); self.service_helper @@ -139,9 +143,9 @@ impl .start_success(ecss_tc_and_token.token, Some(&time_stamp)) .expect("error sending start success"); - let mut pool = self.shared_tc_store.write().expect("locking pool failed"); + // let mut pool = self.sched_tc_pool.write().expect("locking pool failed"); self.scheduler - .insert_wrapped_tc::(&tc, pool.as_mut()) + .insert_wrapped_tc::(&tc, &mut self.sched_tc_pool) .expect("insertion of activity into pool failed"); self.service_helper @@ -170,15 +174,15 @@ impl #[cfg(test)] mod tests { + use crate::pool::StaticMemoryPool; use crate::pus::scheduler::RequestId as RequestIdSched; use crate::{ events::EventU32, - pool::SharedPool, pus::{ scheduler::{PusScheduler, PusSchedulerInterface, TcInfo}, - tests::{PusServiceHandlerWithStoreCommon, PusTestHarness}, + tests::{PusServiceHandlerWithSharedStoreCommon, PusTestHarness}, verification::{RequestId, TcStateAccepted, VerificationToken}, - EcssTcInStoreConverter, PusPacketHandlerResult, PusPacketHandlingError, + EcssTcInSharedStoreConverter, PusPacketHandlerResult, PusPacketHandlingError, }, }; use alloc::collections::VecDeque; @@ -193,16 +197,17 @@ mod tests { const TEST_EVENT_0: EventU32 = EventU32::const_new(crate::events::Severity::INFO, 5, 25); struct Pus11HandlerWithStoreTester { - common: PusServiceHandlerWithStoreCommon, - handler: PusService11SchedHandler, + common: PusServiceHandlerWithSharedStoreCommon, + handler: + PusService11SchedHandler, } impl Pus11HandlerWithStoreTester { - pub fn new(shared_tc_store: SharedPool, scheduler: PusScheduler) -> Self { - let (common, srv_handler) = PusServiceHandlerWithStoreCommon::new(); + pub fn new(sched_tc_pool: StaticMemoryPool, scheduler: PusScheduler) -> Self { + let (common, srv_handler) = PusServiceHandlerWithSharedStoreCommon::new(); Self { common, - handler: PusService11SchedHandler::new(srv_handler, shared_tc_store, scheduler), + handler: PusService11SchedHandler::new(srv_handler, sched_tc_pool, scheduler), } } } @@ -236,7 +241,7 @@ mod tests { fn reset( &mut self, - store: &mut (impl crate::pool::PoolProvider + ?Sized), + store: &mut (impl crate::pool::MemPoolProvider + ?Sized), ) -> Result<(), crate::pool::StoreError> { self.reset_count += 1; Ok(()) diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs index a469e2d..9ca7d27 100644 --- a/satrs-core/src/pus/test.rs +++ b/satrs-core/src/pus/test.rs @@ -92,12 +92,13 @@ impl PusService17TestHandler, + common: PusServiceHandlerWithSharedStoreCommon, + handler: PusService17TestHandler, } impl Pus17HandlerWithStoreTester { pub fn new() -> Self { - let (common, srv_handler) = PusServiceHandlerWithStoreCommon::new(); + let (common, srv_handler) = PusServiceHandlerWithSharedStoreCommon::new(); let pus_17_handler = PusService17TestHandler::new(srv_handler); Self { common, diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index f60cd16..73a85ce 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -1325,7 +1325,7 @@ mod std_mod { #[cfg(test)] mod tests { - use crate::pool::{StaticMemoryPool, StaticPoolConfig}; + use crate::pool::{MemPoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, @@ -1487,8 +1487,7 @@ mod tests { #[test] fn test_mpsc_verif_send_sync() { let pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(8, 8)])); - let tm_store = Box::new(pool); - let shared_tm_store = SharedTmStore::new(tm_store); + let shared_tm_store = SharedTmStore::new(pool); let (tx, _) = mpsc::channel(); let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store, tx); is_send(&mpsc_verif_sender); @@ -2154,7 +2153,7 @@ mod tests { // TODO: maybe a bit more extensive testing, all I have time for right now fn test_seq_count_increment() { let pool_cfg = StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let tm_pool = Box::new(StaticMemoryPool::new(pool_cfg.clone())); + let tm_pool = StaticMemoryPool::new(pool_cfg.clone()); let shared_tm_store = SharedTmStore::new(tm_pool); let shared_tm_pool = shared_tm_store.clone_backing_pool(); let (verif_tx, verif_rx) = mpsc::channel(); diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index 9743ec3..4a9bf5f 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -8,7 +8,7 @@ pub use std_mod::*; #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr}; + use crate::pool::{MemPoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr}; use crate::pus::EcssTmtcError; use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::WritablePusPacket; @@ -16,22 +16,25 @@ pub mod std_mod { #[derive(Clone)] pub struct SharedTmStore { - pub pool: SharedPool, + pub shared_pool: SharedStaticMemoryPool, } impl SharedTmStore { - pub fn new(backing_pool: ShareablePoolProvider) -> Self { + pub fn new(shared_pool: StaticMemoryPool) -> Self { Self { - pool: Arc::new(RwLock::new(backing_pool)), + shared_pool: Arc::new(RwLock::new(shared_pool)), } } - pub fn clone_backing_pool(&self) -> SharedPool { - self.pool.clone() + pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { + self.shared_pool.clone() } pub fn add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result { - let mut pg = self.pool.write().map_err(|_| EcssTmtcError::StoreLock)?; + let mut pg = self + .shared_pool + .write() + .map_err(|_| EcssTmtcError::StoreLock)?; let (addr, buf) = pg.free_element(pus_tm.len_written())?; pus_tm .write_to_bytes(buf) diff --git a/satrs-core/tests/pools.rs b/satrs-core/tests/pools.rs index 17cd4eb..7666087 100644 --- a/satrs-core/tests/pools.rs +++ b/satrs-core/tests/pools.rs @@ -1,4 +1,4 @@ -use satrs_core::pool::{PoolGuard, PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreAddr}; +use satrs_core::pool::{MemPoolProvider, PoolGuard, StaticMemoryPool, StaticPoolConfig, StoreAddr}; use std::ops::DerefMut; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 536b040..af57568 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -2,7 +2,7 @@ pub mod crossbeam_test { use hashbrown::HashMap; use satrs_core::pool::{ - PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig, + MemPoolProvider, MemPoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig, }; use satrs_core::pus::verification::{ FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 5189a05..43b5b6e 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -29,7 +29,7 @@ use satrs_core::event_man::{ }; use satrs_core::events::EventU32; use satrs_core::hk::HkRequest; -use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig}; +use satrs_core::pool::{MemPoolProvider, StaticMemoryPool, StaticPoolConfig}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, @@ -43,7 +43,7 @@ use satrs_core::pus::verification::{ TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; use satrs_core::pus::{ - EcssTcInStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper, + EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper, }; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter}; @@ -76,7 +76,7 @@ fn main() { (15, 1024), (15, 2048), ])); - let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); + let shared_tm_store = SharedTmStore::new(tm_pool); let tm_store_event = shared_tm_store.clone(); let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (30, 32), @@ -87,8 +87,16 @@ fn main() { (15, 2048), ])); let tc_store = TcStore { - pool: Arc::new(RwLock::new(Box::new(tc_pool))), + pool: Arc::new(RwLock::new(tc_pool)), }; + let tc_sched_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])); let seq_count_provider = CcsdsSimpleSeqCountProvider::new(); let mut msg_counter_map: HashMap = HashMap::new(); @@ -184,7 +192,7 @@ fn main() { Box::new(test_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), )); let mut pus_17_wrapper = Service17CustomWrapper { pus17_handler, @@ -210,9 +218,9 @@ fn main() { Box::new(sched_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), ), - tc_store.pool.clone(), + tc_sched_pool, scheduler, ); let mut pus_11_wrapper = Pus11Wrapper { @@ -237,7 +245,7 @@ fn main() { Box::new(event_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), ), event_request_tx, ); @@ -259,7 +267,7 @@ fn main() { Box::new(action_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), request_map.clone(), ); let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; @@ -277,7 +285,7 @@ fn main() { Box::new(hk_srv_tm_sender), PUS_APID, verif_reporter.clone(), - EcssTcInStoreConverter::new(tc_store.pool.clone(), 2048), + EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), request_map, ); let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 41e6c85..3b8cc8d 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -4,7 +4,7 @@ use satrs_core::pus::verification::{ FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken, }; use satrs_core::pus::{ - EcssTcInMemConverter, EcssTcInStoreConverter, EcssTcReceiver, EcssTmSender, + EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper, }; use satrs_core::spacepackets::ecss::tc::PusTcReader; @@ -142,7 +142,7 @@ impl PusService8ActionHandler, + pub(crate) pus_8_handler: PusService8ActionHandler, } impl Pus8Wrapper { diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index 3dc9dc1..08aa786 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -1,9 +1,9 @@ use log::{error, warn}; use satrs_core::pus::event_srv::PusService5EventHandler; -use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult}; +use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; pub struct Pus5Wrapper { - pub pus_5_handler: PusService5EventHandler, + pub pus_5_handler: PusService5EventHandler, } impl Pus5Wrapper { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index 088afc5..ef373c4 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -3,7 +3,7 @@ use log::{error, warn}; use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; use satrs_core::pus::{ - EcssTcInMemConverter, EcssTcInStoreConverter, EcssTcReceiver, EcssTmSender, + EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper, }; use satrs_core::spacepackets::ecss::{hk, PusPacket}; @@ -148,7 +148,7 @@ impl PusService3HkHandler, + pub(crate) pus_3_handler: PusService3HkHandler, } impl Pus3Wrapper { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 7d679cf..7676543 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,46 +1,72 @@ use crate::tmtc::PusTcSource; use log::{error, info, warn}; -use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; +use satrs_core::pool::{MemPoolProvider, StaticMemoryPool}; +use satrs_core::pus::scheduler::{PusScheduler, PusSchedulerInterface, TcInfo}; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; -use satrs_core::pus::{EcssTcInStoreConverter, PusPacketHandlerResult}; +use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; pub struct Pus11Wrapper { - pub pus_11_handler: PusService11SchedHandler, + pub pus_11_handler: + PusService11SchedHandler, pub tc_source_wrapper: PusTcSource, } impl Pus11Wrapper { pub fn release_tcs(&mut self) { - let releaser = |enabled: bool, info: &TcInfo| -> bool { + let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { + // Transfer TC from scheduler TC pool to shared TC pool. + let released_tc_addr = self + .tc_source_wrapper + .tc_store + .pool + .write() + .expect("locking pool failed") + .add(tc) + .expect("adding TC to shared pool failed"); + if enabled { self.tc_source_wrapper .tc_source - .send(info.addr()) + .send(released_tc_addr) .expect("sending TC to TC source failed"); } true }; - let mut pool = self - .tc_source_wrapper - .tc_store - .pool - .write() - .expect("error locking pool"); - self.pus_11_handler .scheduler_mut() .update_time_from_now() .unwrap(); - if let Ok(released_tcs) = self - .pus_11_handler - .scheduler_mut() - .release_telecommands(releaser, pool.as_mut()) - { - if released_tcs > 0 { - info!("{released_tcs} TC(s) released from scheduler"); + let sched_is_enabled = self.pus_11_handler.scheduler().is_enabled(); + + // We have to implement some boilerplate ourself, because the borrow checker falls over + // multiple borrows of the same object. + let tcs_to_release = self.pus_11_handler.scheduler().telecommands_to_release(); + let mut released_tcs = 0; + let mut tcs_to_delete = Vec::new(); + for tc in tcs_to_release { + for info in tc.1 { + let tc = self + .pus_11_handler + .sched_tc_pool + .read(&info.addr()) + .expect("reading pool failed"); + let should_delete = releaser(sched_is_enabled, info, tc); + released_tcs += 1; + if should_delete && !sched_is_enabled { + tcs_to_delete.push(info.addr()); + } } } + for addr in tcs_to_delete { + self.pus_11_handler + .sched_tc_pool + .delete(addr) + .expect("deleting TC from pool failed"); + } + if released_tcs > 0 { + info!("{released_tcs} TC(s) released from scheduler"); + } } pub fn handle_next_packet(&mut self) -> bool { diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index a8d9a44..a52d111 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -7,12 +7,12 @@ use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::spacepackets::time::TimeWriter; -use satrs_core::{events::EventU32, pus::EcssTcInStoreConverter}; +use satrs_core::{events::EventU32, pus::EcssTcInSharedStoreConverter}; use satrs_example::{tmtc_err, TEST_EVENT}; use std::sync::mpsc::Sender; pub struct Service17CustomWrapper { - pub pus17_handler: PusService17TestHandler, + pub pus17_handler: PusService17TestHandler, pub test_srv_event_sender: Sender<(EventU32, Option)>, } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 40b2b6b..b96d5f5 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -5,7 +5,7 @@ use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; -use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; +use satrs_core::pool::{MemPoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError}; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::tmtc::tm_helper::SharedTmStore; @@ -41,7 +41,7 @@ pub enum MpscStoreAndSendError { #[derive(Clone)] pub struct TcStore { - pub pool: SharedPool, + pub pool: SharedStaticMemoryPool, } impl TcStore { diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs index e3ca9f6..c67c0ef 100644 --- a/satrs-example/src/udp.rs +++ b/satrs-example/src/udp.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, sync::mpsc::Receiver}; use log::{info, warn}; use satrs_core::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, - pool::{SharedPool, StoreAddr}, + pool::{MemPoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr}, tmtc::CcsdsError, }; @@ -12,7 +12,7 @@ use crate::tmtc::MpscStoreAndSendError; pub struct UdpTmtcServer { pub udp_tc_server: UdpTcServer>, pub tm_rx: Receiver, - pub tm_store: SharedPool, + pub tm_store: SharedStaticMemoryPool, } impl UdpTmtcServer { pub fn periodic_operation(&mut self) {