diff --git a/satrs-core/src/pus/scheduler.rs b/satrs-core/src/pus/scheduler.rs index 182f08e..d610adf 100644 --- a/satrs-core/src/pus/scheduler.rs +++ b/satrs-core/src/pus/scheduler.rs @@ -610,16 +610,17 @@ pub mod alloc_mod { /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete - /// the telecommands from the holding store after calling the release closure, if the scheduler - /// is disabled. + /// the telecommands from the holding store after calling the release closure if the user + /// returns [true] from the release closure. /// /// # Arguments /// /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and - /// the second argument is the telecommand information also containing the store address. - /// This closure should return whether the command should be deleted if the scheduler is - /// disabled to prevent memory leaks. - /// * `store` - The holding store of the telecommands. + /// the second argument is the telecommand information also containing the store + /// address. This closure should return whether the command should be deleted. Please + /// note that returning false might lead to memory leaks if the TC is not cleared from + /// the store in some other way. + /// * `tc_store` - The holding store of the telecommands. pub fn release_telecommands bool>( &mut self, mut releaser: R, @@ -633,7 +634,7 @@ pub mod alloc_mod { let tc = tc_store.read(&info.addr).map_err(|e| (released_tcs, e))?; let should_delete = releaser(self.enabled, info, tc); released_tcs += 1; - if should_delete && !self.is_enabled() { + if should_delete { let res = tc_store.delete(info.addr); if res.is_err() { store_error = res; @@ -647,6 +648,32 @@ pub mod alloc_mod { .map_err(|e| (released_tcs, e)) } + /// This utility method is similar to [Self::release_telecommands] but will not perform + /// store deletions and thus does not require a mutable reference of the TC store. + /// + /// It will returns a [Vec] of [TcInfo]s to transfer the list of released + /// telecommands to the user. The user should take care of deleting those telecommands + /// from the holding store to prevent memory leaks. + pub fn release_telecommands_no_deletion( + &mut self, + mut releaser: R, + tc_store: &(impl PoolProviderMemInPlace + ?Sized), + ) -> Result, (Vec, StoreError)> { + let tcs_to_release = self.telecommands_to_release(); + let mut released_tcs = Vec::new(); + for tc in tcs_to_release { + for info in tc.1 { + let tc = tc_store + .read(&info.addr) + .map_err(|e| (released_tcs.clone(), e))?; + releaser(self.is_enabled(), info, tc); + released_tcs.push(*info); + } + } + self.tc_map.retain(|k, _| k > &self.current_time); + Ok(released_tcs) + } + /// Retrieve all telecommands which should be release based on the current time. pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { self.tc_map.range(..=self.current_time) diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 5148aff..7ff9c83 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -15,28 +15,18 @@ use spacepackets::time::cds::TimeProvider; /// telecommands when applicable. pub struct PusService11SchedHandler< TcInMemConverter: EcssTcInMemConverter, - MemPool: PoolProviderMemInPlace, Scheduler: PusSchedulerInterface, > { pub service_helper: PusServiceHelper, - pub sched_tc_pool: MemPool, scheduler: Scheduler, } -impl< - TcInMemConverter: EcssTcInMemConverter, - MemPool: PoolProviderMemInPlace, - Scheduler: PusSchedulerInterface, - > PusService11SchedHandler +impl + PusService11SchedHandler { - pub fn new( - service_helper: PusServiceHelper, - sched_tc_pool: MemPool, - scheduler: Scheduler, - ) -> Self { + pub fn new(service_helper: PusServiceHelper, scheduler: Scheduler) -> Self { Self { service_helper, - sched_tc_pool, scheduler, } } @@ -49,7 +39,10 @@ impl< &self.scheduler } - pub fn handle_one_tc(&mut self) -> Result { + pub fn handle_one_tc( + &mut self, + sched_tc_pool: &mut (impl PoolProviderMemInPlace + ?Sized), + ) -> Result { let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { return Ok(PusPacketHandlerResult::Empty); @@ -124,7 +117,7 @@ impl< // let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); self.scheduler - .reset(&mut self.sched_tc_pool) + .reset(sched_tc_pool) .expect("Error resetting TC Pool"); self.service_helper @@ -145,7 +138,7 @@ impl< // let mut pool = self.sched_tc_pool.write().expect("locking pool failed"); self.scheduler - .insert_wrapped_tc::(&tc, &mut self.sched_tc_pool) + .insert_wrapped_tc::(&tc, sched_tc_pool) .expect("insertion of activity into pool failed"); self.service_helper @@ -175,7 +168,6 @@ impl< #[cfg(test)] mod tests { use crate::pool::StaticMemoryPool; - use crate::pus::scheduler::RequestId as RequestIdSched; use crate::{ events::EventU32, pus::{ diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 8e7a123..ed1ef32 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -89,7 +89,7 @@ fn main() { let tc_store = TcStore { pool: Arc::new(RwLock::new(tc_pool)), }; - let tc_sched_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ + let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (30, 32), (15, 64), (15, 128), @@ -220,11 +220,11 @@ fn main() { verif_reporter.clone(), EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), ), - tc_sched_pool, scheduler, ); let mut pus_11_wrapper = Pus11Wrapper { pus_11_handler, + sched_tc_pool, tc_source_wrapper, }; diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index ea5cbdd..75f3494 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,30 +1,30 @@ use crate::tmtc::PusTcSource; use log::{error, info, warn}; use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool}; -use satrs_core::pus::scheduler::{PusScheduler, PusSchedulerInterface, TcInfo}; +use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; pub struct Pus11Wrapper { - pub pus_11_handler: - PusService11SchedHandler, + pub pus_11_handler: PusService11SchedHandler, + pub sched_tc_pool: StaticMemoryPool, pub tc_source_wrapper: PusTcSource, } impl Pus11Wrapper { pub fn release_tcs(&mut self) { let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { - // Transfer TC from scheduler TC pool to shared TC pool. - let released_tc_addr = self - .tc_source_wrapper - .tc_store - .pool - .write() - .expect("locking pool failed") - .add(tc) - .expect("adding TC to shared pool failed"); - if enabled { + // Transfer TC from scheduler TC pool to shared TC pool. + let released_tc_addr = self + .tc_source_wrapper + .tc_store + .pool + .write() + .expect("locking pool failed") + .add(tc) + .expect("adding TC to shared pool failed"); + self.tc_source_wrapper .tc_source .send(released_tc_addr) @@ -37,40 +37,18 @@ impl Pus11Wrapper { .scheduler_mut() .update_time_from_now() .unwrap(); - let sched_is_enabled = self.pus_11_handler.scheduler().is_enabled(); - - // We have to implement some boilerplate ourself, because the borrow checker falls over - // multiple borrows of the same object. - let tcs_to_release = self.pus_11_handler.scheduler().telecommands_to_release(); - let mut released_tcs = 0; - let mut tcs_to_delete = Vec::new(); - for tc in tcs_to_release { - for info in tc.1 { - let tc = self - .pus_11_handler - .sched_tc_pool - .read(&info.addr()) - .expect("reading pool failed"); - let should_delete = releaser(sched_is_enabled, info, tc); - released_tcs += 1; - if should_delete && !sched_is_enabled { - tcs_to_delete.push(info.addr()); - } - } - } - for addr in tcs_to_delete { - self.pus_11_handler - .sched_tc_pool - .delete(addr) - .expect("deleting TC from pool failed"); - } + let released_tcs = self + .pus_11_handler + .scheduler_mut() + .release_telecommands(releaser, &mut self.sched_tc_pool) + .expect("releasing TCs failed"); if released_tcs > 0 { info!("{released_tcs} TC(s) released from scheduler"); } } pub fn handle_next_packet(&mut self) -> bool { - match self.pus_11_handler.handle_one_tc() { + match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {