From b5efc0081cb2d1f8ea268f2e36c759cd752004d3 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 9 Feb 2024 16:48:02 +0100 Subject: [PATCH] start refactoring the pool --- satrs-core/src/pool.rs | 208 ++++++++++++++++++--------- satrs-core/src/pus/mod.rs | 5 +- satrs-core/src/pus/scheduler.rs | 157 ++++++++++++-------- satrs-core/src/pus/scheduler_srv.rs | 6 +- satrs-core/src/pus/verification.rs | 2 +- satrs-core/src/tmtc/tm_helper.rs | 13 +- satrs-core/tests/pools.rs | 4 +- satrs-core/tests/pus_verification.rs | 3 +- satrs-example/src/pus/scheduler.rs | 4 +- satrs-example/src/tm_funnel.rs | 4 +- satrs-example/src/tmtc.rs | 2 +- satrs-example/src/udp.rs | 2 +- 12 files changed, 259 insertions(+), 151 deletions(-) diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index 0111d18..4f526e4 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -70,6 +70,7 @@ use core::fmt::{Display, Formatter}; use delegate::delegate; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use spacepackets::ByteConversionError; #[cfg(feature = "std")] use std::error::Error; @@ -151,6 +152,7 @@ pub enum StoreError { InvalidStoreId(StoreIdError, Option), /// Valid subpool and packet index, but no data is stored at the given address DataDoesNotExist(StoreAddr), + ByteConversionError(spacepackets::ByteConversionError), /// Internal or configuration errors InternalError(u32), } @@ -173,10 +175,19 @@ impl Display for StoreError { StoreError::InternalError(e) => { write!(f, "internal error: {e}") } + StoreError::ByteConversionError(e) => { + write!(f, "store error: {e}") + } } } } +impl From for StoreError { + fn from(value: ByteConversionError) -> Self { + Self::ByteConversionError(value) + } +} + #[cfg(feature = "std")] impl Error for StoreError { fn source(&self) -> Option<&(dyn Error + 'static)> { @@ -189,7 +200,7 @@ impl Error for StoreError { /// Generic trait for pool providers where the data can be modified and read in-place. This /// generally means that a shared pool structure has to be wrapped inside a lock structure. -pub trait PoolProviderMemInPlace { +pub trait PoolProvider { /// Add new data to the pool. The provider should attempt to reserve a memory block with the /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can /// be used to access the data stored in the pool @@ -198,29 +209,39 @@ pub trait PoolProviderMemInPlace { /// The provider should attempt to reserve a free memory block with the appropriate size and /// then return a mutable reference to it. Yields a [StoreAddr] which can be used to access /// the data stored in the pool - fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>; + fn free_element( + &mut self, + len: usize, + writer: &mut W, + ) -> Result; /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference /// to it - fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>; + fn update( + &mut self, + addr: &StoreAddr, + updater: &mut U, + ) -> Result<(), StoreError>; /// Read data by yielding a read-only reference given a [StoreAddr] - fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>; + fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result; /// Delete data inside the pool given a [StoreAddr] fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; fn has_element_at(&self, addr: &StoreAddr) -> Result; /// Retrieve the length of the data at the given store address. 
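
For orientation, a minimal sketch of how the refactored `PoolProvider` trait is meant to be used after this change, based on the signatures and tests in this patch: reads now copy into a caller-supplied buffer instead of borrowing a slice from the pool, and undersized buffers surface through the new `ByteConversionError` variant. The function name and pool dimensions below are illustrative only.

use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreError};

fn pool_roundtrip() -> Result<(), StoreError> {
    // Illustrative configuration: ten 32-byte blocks and five 64-byte blocks.
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
    let addr = pool.add(&[1, 2, 3, 4])?;

    // len_of_data is now a required trait method backed by the pool's size list.
    assert_eq!(pool.len_of_data(&addr)?, 4);

    // read() copies into the caller-supplied buffer and returns the copied length.
    let mut buf = [0u8; 32];
    let read_len = pool.read(&addr, &mut buf)?;
    assert_eq!(&buf[..read_len], &[1, 2, 3, 4]);

    // An undersized buffer is rejected via the new ByteConversionError variant.
    assert!(pool.read(&addr, &mut [0u8; 2]).is_err());
    pool.delete(addr)
}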
- fn len_of_data(&self, addr: &StoreAddr) -> Result { + fn len_of_data(&self, addr: &StoreAddr) -> Result; + + /*{ if !self.has_element_at(addr)? { return Err(StoreError::DataDoesNotExist(*addr)); } Ok(self.read(addr)?.len()) - } + }*/ } -pub trait PoolProviderMemInPlaceWithGuards: PoolProviderMemInPlace { +pub trait PoolProviderWithGuards: PoolProvider { /// This function behaves like [PoolProviderMemInPlace::read], but consumes the provided address /// and returns a RAII conformant guard object. /// @@ -242,7 +263,7 @@ pub trait PoolProviderMemInPlaceWithGuards: PoolProviderMemInPlace { fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; } -pub struct PoolGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> { +pub struct PoolGuard<'a, MemProvider: PoolProvider + ?Sized> { pool: &'a mut MemProvider, pub addr: StoreAddr, no_deletion: bool, @@ -250,7 +271,7 @@ pub struct PoolGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> { } /// This helper object -impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> { +impl<'a, MemProvider: PoolProvider> PoolGuard<'a, MemProvider> { pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self { Self { pool, @@ -260,8 +281,8 @@ impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> { } } - pub fn read(&self) -> Result<&[u8], StoreError> { - self.pool.read(&self.addr) + pub fn read(&self, buf: &mut [u8]) -> Result { + self.pool.read(&self.addr, buf) } /// Releasing the pool guard will disable the automatic deletion of the data when the guard @@ -271,7 +292,7 @@ impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> { } } -impl Drop for PoolGuard<'_, MemProvider> { +impl Drop for PoolGuard<'_, MemProvider> { fn drop(&mut self) { if !self.no_deletion { if let Err(e) = self.pool.delete(self.addr) { @@ -281,24 +302,24 @@ impl Drop for PoolGuard<'_, MemPro } } -pub struct PoolRwGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> { +pub struct PoolRwGuard<'a, MemProvider: PoolProvider + ?Sized> { guard: PoolGuard<'a, MemProvider>, } -impl<'a, MemProvider: PoolProviderMemInPlace> PoolRwGuard<'a, MemProvider> { +impl<'a, MemProvider: PoolProvider> PoolRwGuard<'a, MemProvider> { pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self { Self { guard: PoolGuard::new(pool, addr), } } - pub fn modify(&mut self) -> Result<&mut [u8], StoreError> { - self.guard.pool.modify(&self.guard.addr) + pub fn update(&mut self, updater: &mut U) -> Result<(), StoreError> { + self.guard.pool.update(&self.guard.addr, updater) } delegate!( to self.guard { - pub fn read(&self) -> Result<&[u8], StoreError>; + pub fn read(&self, buf: &mut [u8]) -> Result; /// Releasing the pool guard will disable the automatic deletion of the data when the guard /// is dropped. 
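
A short sketch of the guard flow under the new API, assuming a `StaticMemoryPool` as the backing provider: the RAII guards keep their drop-time deletion semantics, but reads go through a caller buffer and in-place modification goes through an updater closure. Function name and buffer sizes are made up for illustration.

use satrs_core::pool::{
    PoolGuard, PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig,
};

fn guard_roundtrip() {
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(4, 16)]));
    let addr = pool.add(&[0xAA; 16]).expect("adding data failed");

    {
        // The RW guard now exposes update() with an updater closure instead of
        // handing out a mutable slice into the pool.
        let mut rw_guard = pool.modify_with_guard(addr);
        rw_guard
            .update(&mut |buf: &mut [u8]| buf[0] = 0x42)
            .expect("update failed");
        // release() disables the automatic deletion when the guard is dropped.
        rw_guard.release();
    }

    // The read-only guard copies into a caller buffer and returns the length.
    let mut buf = [0u8; 16];
    let guard = PoolGuard::new(&mut pool, addr);
    assert_eq!(guard.read(&mut buf).expect("read failed"), 16);
    assert_eq!(buf[0], 0x42);
    // The guard is dropped here without release(), so the entry is deleted now.
}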
pub fn release(&mut self); @@ -308,13 +329,11 @@ impl<'a, MemProvider: PoolProviderMemInPlace> PoolRwGuard<'a, MemProvider> { #[cfg(feature = "alloc")] mod alloc_mod { - use super::{ - PoolGuard, PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, PoolRwGuard, - StaticPoolAddr, - }; + use super::{PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticPoolAddr}; use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError}; use alloc::vec; use alloc::vec::Vec; + use spacepackets::ByteConversionError; #[cfg(feature = "std")] use std::sync::{Arc, RwLock}; @@ -476,7 +495,7 @@ mod alloc_mod { } } - impl PoolProviderMemInPlace for StaticMemoryPool { + impl PoolProvider for StaticMemoryPool { fn add(&mut self, data: &[u8]) -> Result { let data_len = data.len(); if data_len > POOL_MAX_SIZE { @@ -487,7 +506,11 @@ mod alloc_mod { Ok(addr.into()) } - fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> { + fn free_element( + &mut self, + len: usize, + writer: &mut W, + ) -> Result { if len > POOL_MAX_SIZE { return Err(StoreError::DataTooLarge(len)); } @@ -495,25 +518,40 @@ mod alloc_mod { let raw_pos = self.raw_pos(&addr).unwrap(); let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len]; - Ok((addr.into(), block)) + writer(block); + Ok(addr.into()) } - fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> { + fn update( + &mut self, + addr: &StoreAddr, + updater: &mut U, + ) -> Result<(), StoreError> { let addr = StaticPoolAddr::from(*addr); let curr_size = self.addr_check(&addr)?; let raw_pos = self.raw_pos(&addr).unwrap(); let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap() [raw_pos..raw_pos + curr_size]; - Ok(block) + updater(block); + Ok(()) } - fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { + fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result { let addr = StaticPoolAddr::from(*addr); let curr_size = self.addr_check(&addr)?; + if buf.len() < curr_size { + return Err(ByteConversionError::ToSliceTooSmall { + found: buf.len(), + expected: curr_size, + } + .into()); + } let raw_pos = self.raw_pos(&addr).unwrap(); let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size]; - Ok(block) + //block.copy_from_slice(&src); + buf[..curr_size].copy_from_slice(block); + Ok(curr_size) } fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> { @@ -540,9 +578,21 @@ mod alloc_mod { } Ok(true) } + + fn len_of_data(&self, addr: &StoreAddr) -> Result { + let addr = StaticPoolAddr::from(*addr); + self.validate_addr(&addr)?; + let pool_idx = addr.pool_idx as usize; + let size_list = self.sizes_lists.get(pool_idx).unwrap(); + let size = size_list[addr.packet_idx as usize]; + Ok(match size { + STORE_FREE => 0, + _ => size, + }) + } } - impl PoolProviderMemInPlaceWithGuards for StaticMemoryPool { + impl PoolProviderWithGuards for StaticMemoryPool { fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { PoolRwGuard::new(self, addr) } @@ -556,9 +606,8 @@ mod alloc_mod { #[cfg(test)] mod tests { use crate::pool::{ - PoolGuard, PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, PoolRwGuard, - StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, - POOL_MAX_SIZE, + PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticMemoryPool, + StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, POOL_MAX_SIZE, }; use std::vec; @@ -594,13 +643,14 @@ mod tests { for (i, val) in 
test_buf.iter_mut().enumerate() { *val = i as u8; } + let mut other_buf: [u8; 16] = [0; 16]; let addr = local_pool.add(&test_buf).expect("Adding data failed"); // Read back data and verify correctness - let res = local_pool.read(&addr); + let res = local_pool.read(&addr, &mut other_buf); assert!(res.is_ok()); - let buf_read_back = res.unwrap(); - assert_eq!(buf_read_back.len(), 16); - for (i, &val) in buf_read_back.iter().enumerate() { + let read_len = res.unwrap(); + assert_eq!(read_len, 16); + for (i, &val) in other_buf.iter().enumerate() { assert_eq!(val, i as u8); } } @@ -610,8 +660,10 @@ mod tests { let mut local_pool = basic_small_pool(); let test_buf: [u8; 12] = [0; 12]; let addr = local_pool.add(&test_buf).expect("Adding data failed"); - let res = local_pool.read(&addr).expect("Read back failed"); - assert_eq!(res.len(), 12); + let res = local_pool + .read(&addr, &mut [0; 12]) + .expect("Read back failed"); + assert_eq!(res, 12); } #[test] @@ -622,10 +674,13 @@ mod tests { // Delete the data let res = local_pool.delete(addr); assert!(res.is_ok()); + let mut writer = |buf: &mut [u8]| { + assert_eq!(buf.len(), 12); + }; // Verify that the slot is free by trying to get a reference to it - let res = local_pool.free_element(12); + let res = local_pool.free_element(12, &mut writer); assert!(res.is_ok()); - let (addr, buf_ref) = res.unwrap(); + let addr = res.unwrap(); assert_eq!( addr, u64::from(StaticPoolAddr { @@ -633,7 +688,6 @@ mod tests { packet_idx: 0 }) ); - assert_eq!(buf_ref.len(), 12); } #[test] @@ -647,29 +701,34 @@ mod tests { { // Verify that the slot is free by trying to get a reference to it - let res = local_pool.modify(&addr).expect("Modifying data failed"); - res[0] = 0; - res[1] = 0x42; + let res = local_pool + .update(&addr, &mut |buf: &mut [u8]| { + buf[0] = 0; + buf[1] = 0x42; + }) + .expect("Modifying data failed"); } - let res = local_pool.read(&addr).expect("Reading back data failed"); - assert_eq!(res[0], 0); - assert_eq!(res[1], 0x42); - assert_eq!(res[2], 2); - assert_eq!(res[3], 3); + let res = local_pool + .read(&addr, &mut test_buf) + .expect("Reading back data failed"); + assert_eq!(test_buf[0], 0); + assert_eq!(test_buf[1], 0x42); + assert_eq!(test_buf[2], 2); + assert_eq!(test_buf[3], 3); } #[test] fn test_consecutive_reservation() { let mut local_pool = basic_small_pool(); // Reserve two smaller blocks consecutively and verify that the third reservation fails - let res = local_pool.free_element(8); + let res = local_pool.free_element(8, &mut |_| {}); assert!(res.is_ok()); - let (addr0, _) = res.unwrap(); - let res = local_pool.free_element(8); + let addr0 = res.unwrap(); + let res = local_pool.free_element(8, &mut |_| {}); assert!(res.is_ok()); - let (addr1, _) = res.unwrap(); - let res = local_pool.free_element(8); + let addr1 = res.unwrap(); + let res = local_pool.free_element(8, &mut |_| {}); assert!(res.is_err()); let err = res.unwrap_err(); assert_eq!(err, StoreError::StoreFull(1)); @@ -689,6 +748,7 @@ mod tests { pool_idx: 0, } .into(), + &mut [], ); assert!(res.is_err()); assert!(matches!( @@ -720,7 +780,7 @@ mod tests { packet_idx: 0, } .into(); - let res = local_pool.read(&addr); + let res = local_pool.read(&addr, &mut []); assert!(res.is_err()); let err = res.unwrap_err(); assert!(matches!( @@ -737,7 +797,7 @@ mod tests { packet_idx: 1, }; assert_eq!(addr.raw(), 0x00020001); - let res = local_pool.read(&addr.into()); + let res = local_pool.read(&addr.into(), &mut []); assert!(res.is_err()); let err = res.unwrap_err(); assert!(matches!( @@ 
-759,7 +819,7 @@ mod tests { #[test] fn test_data_too_large_1() { let mut local_pool = basic_small_pool(); - let res = local_pool.free_element(POOL_MAX_SIZE + 1); + let res = local_pool.free_element(POOL_MAX_SIZE + 1, &mut |_| {}); assert!(res.is_err()); assert_eq!( res.unwrap_err(), @@ -771,7 +831,7 @@ mod tests { fn test_free_element_too_large() { let mut local_pool = basic_small_pool(); // Try to request a slot which is too large - let res = local_pool.free_element(20); + let res = local_pool.free_element(20, &mut |_| {}); assert!(res.is_err()); assert_eq!(res.unwrap_err(), StoreError::DataTooLarge(20)); } @@ -813,7 +873,7 @@ mod tests { let test_buf: [u8; 16] = [0; 16]; let addr = local_pool.add(&test_buf).expect("Adding data failed"); let mut rw_guard = PoolRwGuard::new(&mut local_pool, addr); - let _ = rw_guard.modify().expect("modify failed"); + let _ = rw_guard.update(&mut |_| {}).expect("modify failed"); drop(rw_guard); assert!(!local_pool.has_element_at(&addr).expect("Invalid address")); } @@ -824,7 +884,7 @@ mod tests { let test_buf: [u8; 16] = [0; 16]; let addr = local_pool.add(&test_buf).expect("Adding data failed"); let mut rw_guard = local_pool.modify_with_guard(addr); - let _ = rw_guard.modify().expect("modify failed"); + let _ = rw_guard.update(&mut |_| {}).expect("modify failed"); drop(rw_guard); assert!(!local_pool.has_element_at(&addr).expect("Invalid address")); } @@ -840,13 +900,25 @@ mod tests { let addr1 = local_pool.add(&test_buf_1).expect("Adding data failed"); let addr2 = local_pool.add(&test_buf_2).expect("Adding data failed"); let addr3 = local_pool.add(&test_buf_3).expect("Adding data failed"); - let tm0_raw = local_pool.modify(&addr0).expect("Modifying data failed"); - assert_eq!(tm0_raw, test_buf_0); - let tm1_raw = local_pool.modify(&addr1).expect("Modifying data failed"); - assert_eq!(tm1_raw, test_buf_1); - let tm2_raw = local_pool.modify(&addr2).expect("Modifying data failed"); - assert_eq!(tm2_raw, test_buf_2); - let tm3_raw = local_pool.modify(&addr3).expect("Modifying data failed"); - assert_eq!(tm3_raw, test_buf_3); + local_pool + .update(&addr0, &mut |buf| { + assert_eq!(buf, test_buf_0); + }) + .expect("Modifying data failed"); + local_pool + .update(&addr1, &mut |buf| { + assert_eq!(buf, test_buf_1); + }) + .expect("Modifying data failed"); + local_pool + .update(&addr2, &mut |buf| { + assert_eq!(buf, test_buf_2); + }) + .expect("Modifying data failed"); + local_pool + .update(&addr3, &mut |buf| { + assert_eq!(buf, test_buf_3); + }) + .expect("Modifying data failed"); } } diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 64a78ce..0adaf7d 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -390,7 +390,7 @@ mod alloc_mod { #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod std_mod { - use crate::pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr}; + use crate::pool::{PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr}; use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; @@ -947,8 +947,7 @@ pub mod tests { use spacepackets::CcsdsPacket; use crate::pool::{ - PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig, - StoreAddr, + PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig, StoreAddr, }; use crate::pus::verification::RequestId; use crate::tmtc::tm_helper::SharedTmPool; diff --git a/satrs-core/src/pus/scheduler.rs b/satrs-core/src/pus/scheduler.rs 
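
The reworked tests above exercise the closure-based reservation path. Roughly, a caller now fills the reserved block through a writer closure instead of receiving a mutable slice, as in this sketch (function name and sizes are illustrative):

use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreError};

fn reserve_and_fill() -> Result<(), StoreError> {
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(4, 8)]));

    // free_element() reserves a block of the requested length and immediately
    // hands it to the writer closure; only the StoreAddr is returned.
    let addr = pool.free_element(8, &mut |buf: &mut [u8]| {
        buf.copy_from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
    })?;

    // update() revisits the stored block through an updater closure.
    pool.update(&addr, &mut |buf: &mut [u8]| buf[0] = 0xFF)?;

    let mut read_buf = [0u8; 8];
    pool.read(&addr, &mut read_buf)?;
    assert_eq!(read_buf, [0xFF, 1, 2, 3, 4, 5, 6, 7]);
    Ok(())
}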
index 549c8cd..7258f08 100644 --- a/satrs-core/src/pus/scheduler.rs +++ b/satrs-core/src/pus/scheduler.rs @@ -16,7 +16,7 @@ use spacepackets::{ByteConversionError, CcsdsPacket}; #[cfg(feature = "std")] use std::error::Error; -use crate::pool::{PoolProviderMemInPlace, StoreError}; +use crate::pool::{PoolProvider, StoreError}; #[cfg(feature = "alloc")] pub use alloc_mod::*; @@ -241,10 +241,7 @@ impl Error for ScheduleError { pub trait PusSchedulerInterface { type TimeProvider: CcsdsTimeProvider + TimeReader; - fn reset( - &mut self, - store: &mut (impl PoolProviderMemInPlace + ?Sized), - ) -> Result<(), StoreError>; + fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError>; fn is_enabled(&self) -> bool; @@ -267,7 +264,7 @@ pub trait PusSchedulerInterface { fn insert_wrapped_tc( &mut self, pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader), - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { if PusPacket::service(pus_tc) != 11 { return Err(ScheduleError::WrongService(PusPacket::service(pus_tc))); @@ -293,7 +290,7 @@ pub trait PusSchedulerInterface { &mut self, time_stamp: UnixTimestamp, tc: &[u8], - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { let check_tc = PusTcReader::new(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { @@ -343,7 +340,7 @@ pub fn generate_insert_telecommand_app_data( #[cfg(feature = "alloc")] pub mod alloc_mod { use super::*; - use crate::pool::{PoolProviderMemInPlace, StoreAddr, StoreError}; + use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use alloc::collections::BTreeMap; use alloc::vec; @@ -400,6 +397,7 @@ pub mod alloc_mod { #[derive(Debug)] pub struct PusScheduler { tc_map: BTreeMap>, + tc_buf: Vec, pub(crate) current_time: UnixTimestamp, time_margin: Duration, enabled: bool, @@ -413,9 +411,16 @@ pub mod alloc_mod { /// * `time_margin` - This time margin is used when inserting new telecommands into the /// schedule. If the release time of a new telecommand is earlier than the time margin /// added to the current time, it will not be inserted into the schedule. - pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self { + /// * `tc_buf_size` - Buffer for temporary storage of telecommand packets. This buffer + /// should be large enough to accomodate the largest expected TC packets. + pub fn new( + init_current_time: UnixTimestamp, + time_margin: Duration, + tc_buf_size: usize, + ) -> Self { PusScheduler { tc_map: Default::default(), + tc_buf: vec![0; tc_buf_size], current_time: init_current_time, time_margin, enabled: true, @@ -425,8 +430,15 @@ pub mod alloc_mod { /// Like [Self::new], but sets the `init_current_time` parameter to the current system time. 
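
Because `PusScheduler` now owns a scratch buffer for reading telecommands back out of the pool, its constructors take an additional `tc_buf_size` argument. A minimal construction sketch follows; the 256-byte size and the function name are only examples, and the buffer has to cover the largest expected TC.

use core::time::Duration;
use satrs_core::pus::scheduler::{PusScheduler, PusSchedulerInterface};
use satrs_core::spacepackets::time::UnixTimestamp;

fn make_scheduler() -> PusScheduler {
    let scheduler = PusScheduler::new(
        UnixTimestamp::new_only_seconds(0),
        Duration::from_secs(5),
        256, // new: size of the internal TC read-back buffer
    );
    assert!(scheduler.is_enabled());
    // Insertion and release still go through an external pool, now typed as
    // `impl PoolProvider`, e.g. scheduler.insert_unwrapped_tc(ts, tc, &mut pool).
    scheduler
}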
#[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] - pub fn new_with_current_init_time(time_margin: Duration) -> Result { - Ok(Self::new(UnixTimestamp::from_now()?, time_margin)) + pub fn new_with_current_init_time( + time_margin: Duration, + tc_buf_size: usize, + ) -> Result { + Ok(Self::new( + UnixTimestamp::from_now()?, + time_margin, + tc_buf_size, + )) } pub fn num_scheduled_telecommands(&self) -> u64 { @@ -476,7 +488,7 @@ pub mod alloc_mod { &mut self, time_stamp: UnixTimestamp, tc: &[u8], - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { let check_tc = PusTcReader::new(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { @@ -499,7 +511,7 @@ pub mod alloc_mod { pub fn insert_wrapped_tc_cds_short( &mut self, pus_tc: &PusTc, - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { self.insert_wrapped_tc::(pus_tc, pool) } @@ -509,7 +521,7 @@ pub mod alloc_mod { pub fn insert_wrapped_tc_cds_long( &mut self, pus_tc: &PusTc, - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { self.insert_wrapped_tc::>(pus_tc, pool) } @@ -525,7 +537,7 @@ pub mod alloc_mod { pub fn delete_by_time_filter( &mut self, time_window: TimeWindow, - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { let range = self.retrieve_by_time_filter(time_window); let mut del_packets = 0; @@ -555,7 +567,7 @@ pub mod alloc_mod { /// the last deletion will be supplied in addition to the number of deleted commands. pub fn delete_all( &mut self, - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { self.delete_by_time_filter(TimeWindow::::new_select_all(), pool) } @@ -613,7 +625,7 @@ pub mod alloc_mod { pub fn delete_by_request_id_and_from_pool( &mut self, req_id: &RequestId, - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> Result { if let DeletionResult::WithStoreDeletion(v) = self.delete_by_request_id_internal_with_store_deletion(req_id, pool) @@ -645,7 +657,7 @@ pub mod alloc_mod { fn delete_by_request_id_internal_with_store_deletion( &mut self, req_id: &RequestId, - pool: &mut (impl PoolProviderMemInPlace + ?Sized), + pool: &mut (impl PoolProvider + ?Sized), ) -> DeletionResult { let mut idx_found = None; for time_bucket in &mut self.tc_map { @@ -688,15 +700,17 @@ pub mod alloc_mod { pub fn release_telecommands bool>( &mut self, mut releaser: R, - tc_store: &mut (impl PoolProviderMemInPlace + ?Sized), + tc_store: &mut (impl PoolProvider + ?Sized), ) -> Result { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = 0; let mut store_error = Ok(()); for tc in tcs_to_release { for info in tc.1 { - let tc = tc_store.read(&info.addr).map_err(|e| (released_tcs, e))?; - let should_delete = releaser(self.enabled, info, tc); + let tc = tc_store + .read(&info.addr, &mut self.tc_buf) + .map_err(|e| (released_tcs, e))?; + let should_delete = releaser(self.enabled, info, &self.tc_buf); released_tcs += 1; if should_delete { let res = tc_store.delete(info.addr); @@ -721,16 +735,16 @@ pub mod alloc_mod { pub fn release_telecommands_no_deletion( &mut self, mut releaser: R, - tc_store: &(impl PoolProviderMemInPlace + ?Sized), + tc_store: &(impl PoolProvider + ?Sized), ) -> Result, (Vec, StoreError)> { let tcs_to_release = 
self.telecommands_to_release(); let mut released_tcs = Vec::new(); for tc in tcs_to_release { for info in tc.1 { let tc = tc_store - .read(&info.addr) + .read(&info.addr, &mut self.tc_buf) .map_err(|e| (released_tcs.clone(), e))?; - releaser(self.is_enabled(), info, tc); + releaser(self.is_enabled(), info, &self.tc_buf); released_tcs.push(*info); } } @@ -753,10 +767,7 @@ pub mod alloc_mod { /// The holding store for the telecommands needs to be passed so all the stored telecommands /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error /// will be returned but the method will still try to delete all the commands in the schedule. - fn reset( - &mut self, - store: &mut (impl PoolProviderMemInPlace + ?Sized), - ) -> Result<(), StoreError> { + fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; let mut deletion_ok = Ok(()); for tc_lists in &mut self.tc_map { @@ -814,8 +825,7 @@ pub mod alloc_mod { mod tests { use super::*; use crate::pool::{ - PoolProviderMemInPlace, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, - StoreError, + PoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, StoreError, }; use alloc::collections::btree_map::Range; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; @@ -900,8 +910,11 @@ mod tests { #[test] fn test_enable_api() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 256, + ); assert!(scheduler.is_enabled()); scheduler.disable(); assert!(!scheduler.is_enabled()); @@ -912,8 +925,11 @@ mod tests { #[test] fn test_reset() { let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 64, + ); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); @@ -955,8 +971,11 @@ mod tests { #[test] fn insert_multi_with_same_time() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); scheduler .insert_unwrapped_and_stored_tc( @@ -1015,8 +1034,11 @@ mod tests { #[test] fn test_time_update() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); let time = UnixTimestamp::new(1, 2).unwrap(); scheduler.update_time(time); assert_eq!(scheduler.current_time(), &time); @@ -1064,8 +1086,11 @@ mod tests { #[test] fn test_release_telecommands() { let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); @@ -1129,8 +1154,11 @@ mod tests { #[test] fn release_multi_with_same_time() { let mut pool = 
StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); @@ -1186,8 +1214,11 @@ mod tests { #[test] fn release_with_scheduler_disabled() { let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); scheduler.disable(); @@ -1250,8 +1281,11 @@ mod tests { #[test] fn insert_unwrapped_tc() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; @@ -1267,8 +1301,9 @@ mod tests { assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); - let data = pool.read(&tc_info_0.addr()).unwrap(); - let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); + let mut read_buf: [u8; 64] = [0; 64]; + let data = pool.read(&tc_info_0.addr(), &mut read_buf).unwrap(); + let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -1289,15 +1324,18 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .unwrap(); - let data = pool.read(&addr_vec[0]).unwrap(); - let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); + let data = pool.read(&addr_vec[0], &mut read_buf).unwrap(); + let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] fn insert_wrapped_tc() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); @@ -1313,8 +1351,8 @@ mod tests { assert!(pool.has_element_at(&info.addr).unwrap()); - let data = pool.read(&info.addr).unwrap(); - let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data"); + let data = pool.read(&info.addr, &mut buf).unwrap(); + let check_tc = PusTcReader::new(&buf).expect("incorrect Pus tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -1335,15 +1373,18 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .unwrap(); - let data = pool.read(&addr_vec[0]).unwrap(); - let check_tc = PusTcReader::new(data).expect("incorrect PUS tc raw data"); + let data = pool.read(&addr_vec[0], &mut buf).unwrap(); + let check_tc = PusTcReader::new(&buf).expect("incorrect PUS tc raw data"); assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] fn insert_wrong_service() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut scheduler = PusScheduler::new( + 
UnixTimestamp::new_only_seconds(0), + Duration::from_secs(5), + 128, + ); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)])); diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 2225c03..3e9ab32 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -1,6 +1,6 @@ use super::scheduler::PusSchedulerInterface; use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper}; -use crate::pool::PoolProviderMemInPlace; +use crate::pool::PoolProvider; use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError}; use alloc::string::ToString; use spacepackets::ecss::{scheduling, PusPacket}; @@ -42,7 +42,7 @@ impl pub fn handle_one_tc( &mut self, - sched_tc_pool: &mut (impl PoolProviderMemInPlace + ?Sized), + sched_tc_pool: &mut (impl PoolProvider + ?Sized), ) -> Result { let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { @@ -237,7 +237,7 @@ mod tests { fn reset( &mut self, - _store: &mut (impl crate::pool::PoolProviderMemInPlace + ?Sized), + _store: &mut (impl crate::pool::PoolProvider + ?Sized), ) -> Result<(), crate::pool::StoreError> { self.reset_count += 1; Ok(()) diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index ca7c272..1874411 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -1325,7 +1325,7 @@ mod std_mod { #[cfg(test)] mod tests { - use crate::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig}; + use crate::pool::{PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index c8a09a3..e13bd26 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -8,9 +8,7 @@ pub use std_mod::*; #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{ - PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr, - }; + use crate::pool::{PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr}; use crate::pus::EcssTmtcError; use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::WritablePusPacket; @@ -37,10 +35,11 @@ pub mod std_mod { pub fn add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result { let mut pg = self.0.write().map_err(|_| EcssTmtcError::StoreLock)?; - let (addr, buf) = pg.free_element(pus_tm.len_written())?; - pus_tm - .write_to_bytes(buf) - .expect("writing PUS TM to store failed"); + let addr = pg.free_element(pus_tm.len_written(), &mut |buf| { + pus_tm + .write_to_bytes(buf) + .expect("writing PUS TM to store failed"); + })?; Ok(addr) } } diff --git a/satrs-core/tests/pools.rs b/satrs-core/tests/pools.rs index cce2114..17cd4eb 100644 --- a/satrs-core/tests/pools.rs +++ b/satrs-core/tests/pools.rs @@ -1,6 +1,4 @@ -use satrs_core::pool::{ - PoolGuard, PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig, StoreAddr, -}; +use satrs_core::pool::{PoolGuard, PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreAddr}; use std::ops::DerefMut; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 557a8b2..f8f6a1c 100644 --- a/satrs-core/tests/pus_verification.rs +++ 
b/satrs-core/tests/pus_verification.rs @@ -2,8 +2,7 @@ pub mod crossbeam_test { use hashbrown::HashMap; use satrs_core::pool::{ - PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, StaticMemoryPool, - StaticPoolConfig, + PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig, }; use satrs_core::pus::verification::{ FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 0842904..f3ea5e5 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -2,7 +2,7 @@ use std::sync::mpsc; use std::time::Duration; use log::{error, info, warn}; -use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr}; +use satrs_core::pool::{PoolProvider, StaticMemoryPool, StoreAddr}; use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::verification::VerificationReporterWithSender; @@ -70,7 +70,7 @@ impl Pus11Wrapper { let released_tcs = self .pus_11_handler .scheduler_mut() - .release_telecommands(releaser, &mut self.sched_tc_pool) + .release_telecommands(&mut self.tc_releaser.release, &mut self.sched_tc_pool) .expect("releasing TCs failed"); if released_tcs > 0 { info!("{released_tcs} TC(s) released from scheduler"); diff --git a/satrs-example/src/tm_funnel.rs b/satrs-example/src/tm_funnel.rs index 9069d72..3c4f4ea 100644 --- a/satrs-example/src/tm_funnel.rs +++ b/satrs-example/src/tm_funnel.rs @@ -4,7 +4,7 @@ use std::{ }; use satrs_core::{ - pool::{PoolProviderMemInPlace, StoreAddr}, + pool::{PoolProvider, StoreAddr}, seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, spacepackets::{ ecss::{tm::PusTmZeroCopyWriter, PusPacket}, @@ -97,7 +97,7 @@ impl TmFunnelStatic { let shared_pool = self.shared_tm_store.clone_backing_pool(); let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); let tm_raw = pool_guard - .modify(&addr) + .update(&addr) .expect("Reading TM from pool failed"); let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN) .expect("Creating TM zero copy writer failed"); diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index d173ac7..e0048c7 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -5,7 +5,7 @@ use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; -use satrs_core::pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr, StoreError}; +use satrs_core::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError}; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::tmtc::ReceivesCcsdsTc; diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs index 11f84b2..c2a8eea 100644 --- a/satrs-example/src/udp.rs +++ b/satrs-example/src/udp.rs @@ -6,7 +6,7 @@ use std::{ use log::{info, warn}; use satrs_core::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, - pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr}, + pool::{PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr}, tmtc::CcsdsError, };
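
To illustrate the downstream effect of the writer-closure API, here is a rough sketch of how a PUS TM packet now ends up in a shared pool, mirroring the `SharedTmPool::add_pus_tm` change in this patch. The free-standing helper function is hypothetical; the locking and error mapping follow the patched code.

use satrs_core::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr};
use satrs_core::pus::EcssTmtcError;
use satrs_core::spacepackets::ecss::tm::PusTmCreator;
use satrs_core::spacepackets::ecss::WritablePusPacket;

fn store_tm(
    shared_pool: &SharedStaticMemoryPool,
    pus_tm: &PusTmCreator,
) -> Result<StoreAddr, EcssTmtcError> {
    let mut pool = shared_pool.write().map_err(|_| EcssTmtcError::StoreLock)?;
    // The packet is serialized directly into the reserved block via the writer closure.
    let addr = pool.free_element(pus_tm.len_written(), &mut |buf| {
        pus_tm
            .write_to_bytes(buf)
            .expect("writing PUS TM to store failed");
    })?;
    Ok(addr)
}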