diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs
index 4f526e4..928285f 100644
--- a/satrs-core/src/pool.rs
+++ b/satrs-core/src/pool.rs
@@ -3,11 +3,12 @@
 //! # Example for the [StaticMemoryPool]
 //!
 //! ```
-//! use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
+//! use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig};
 //!
 //! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
 //! let pool_cfg = StaticPoolConfig::new(vec![(4, 4), (2, 8), (1, 16)]);
 //! let mut local_pool = StaticMemoryPool::new(pool_cfg);
+//! let mut read_buf: [u8; 16] = [0; 16];
 //! let mut addr;
 //! {
 //!     // Add new data to the pool
@@ -20,25 +21,25 @@
 //!
 //! {
 //!     // Read the store data back
-//!     let res = local_pool.read(&addr);
+//!     let res = local_pool.read(&addr, &mut read_buf);
 //!     assert!(res.is_ok());
-//!     let buf_read_back = res.unwrap();
-//!     assert_eq!(buf_read_back.len(), 4);
-//!     assert_eq!(buf_read_back[0], 42);
+//!     let read_bytes = res.unwrap();
+//!     assert_eq!(read_bytes, 4);
+//!     assert_eq!(read_buf[0], 42);
 //!     // Modify the stored data
-//!     let res = local_pool.modify(&addr);
+//!     let res = local_pool.update(&addr, |buf| {
+//!         buf[0] = 12;
+//!     });
 //!     assert!(res.is_ok());
-//!     let buf_read_back = res.unwrap();
-//!     buf_read_back[0] = 12;
 //! }
 //!
 //! {
 //!     // Read the modified data back
-//!     let res = local_pool.read(&addr);
+//!     let res = local_pool.read(&addr, &mut read_buf);
 //!     assert!(res.is_ok());
-//!     let buf_read_back = res.unwrap();
-//!     assert_eq!(buf_read_back.len(), 4);
-//!     assert_eq!(buf_read_back[0], 12);
+//!     let read_bytes = res.unwrap();
+//!     assert_eq!(read_bytes, 4);
+//!     assert_eq!(read_buf[0], 12);
 //! }
 //!
 //! // Delete the stored data
@@ -46,21 +47,21 @@
 //!
 //! // Get a free element in the pool with an appropriate size
 //! {
-//!     let res = local_pool.free_element(12);
+//!     let res = local_pool.free_element(12, |buf| {
+//!         buf[0] = 7;
+//!     });
 //!     assert!(res.is_ok());
-//!     let (tmp, mut_buf) = res.unwrap();
-//!     addr = tmp;
-//!     mut_buf[0] = 7;
+//!     addr = res.unwrap();
 //! }
 //!
 //! // Read back the data
 //! {
 //!     // Read the store data back
-//!     let res = local_pool.read(&addr);
+//!     let res = local_pool.read(&addr, &mut read_buf);
 //!     assert!(res.is_ok());
-//!     let buf_read_back = res.unwrap();
-//!     assert_eq!(buf_read_back.len(), 12);
-//!     assert_eq!(buf_read_back[0], 7);
+//!     let read_bytes = res.unwrap();
+//!     assert_eq!(read_bytes, 12);
+//!     assert_eq!(read_buf[0], 7);
 //! }
 //! ```
 #[cfg(feature = "alloc")]
@@ -153,6 +154,7 @@ pub enum StoreError {
     /// Valid subpool and packet index, but no data is stored at the given address
     DataDoesNotExist(StoreAddr),
     ByteConversionError(spacepackets::ByteConversionError),
+    LockError,
     /// Internal or configuration errors
     InternalError(u32),
 }
@@ -178,6 +180,9 @@ impl Display for StoreError {
             StoreError::ByteConversionError(e) => {
                 write!(f, "store error: {e}")
             }
+            StoreError::LockError => {
+                write!(f, "lock error")
+            }
         }
     }
 }
@@ -212,7 +217,7 @@ pub trait PoolProvider {
     fn free_element<W: FnMut(&mut [u8])>(
         &mut self,
         len: usize,
-        writer: &mut W,
+        writer: W,
     ) -> Result<StoreAddr, StoreError>;

     /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference
@@ -220,7 +225,7 @@ pub trait PoolProvider {
     /// to it
     fn update<U: FnMut(&mut [u8])>(
         &mut self,
         addr: &StoreAddr,
-        updater: &mut U,
+        updater: U,
     ) -> Result<(), StoreError>;

     /// Read data by yielding a read-only reference given a [StoreAddr]
@@ -233,12 +238,12 @@ pub trait PoolProvider {
     /// Retrieve the length of the data at the given store address.
     fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError>;
-    /*{
-        if !self.has_element_at(addr)? {
-            return Err(StoreError::DataDoesNotExist(*addr));
-        }
-        Ok(self.read(addr)?.len())
-    }*/
+    #[cfg(feature = "alloc")]
+    fn read_as_vec(&self, addr: &StoreAddr) -> Result<alloc::vec::Vec<u8>, StoreError> {
+        let mut vec = alloc::vec![0; self.len_of_data(addr)?];
+        self.read(addr, &mut vec)?;
+        Ok(vec)
+    }
 }

 pub trait PoolProviderWithGuards: PoolProvider {
@@ -285,6 +290,11 @@ impl<'a, MemProvider: PoolProvider> PoolGuard<'a, MemProvider> {
         self.pool.read(&self.addr, buf)
     }

+    #[cfg(feature = "alloc")]
+    pub fn read_as_vec(&self) -> Result<alloc::vec::Vec<u8>, StoreError> {
+        self.pool.read_as_vec(&self.addr)
+    }
+
     /// Releasing the pool guard will disable the automatic deletion of the data when the guard
     /// is dropped.
     pub fn release(&mut self) {
@@ -509,7 +519,7 @@ mod alloc_mod {
         fn free_element<W: FnMut(&mut [u8])>(
             &mut self,
             len: usize,
-            writer: &mut W,
+            mut writer: W,
         ) -> Result<StoreAddr, StoreError> {
             if len > POOL_MAX_SIZE {
                 return Err(StoreError::DataTooLarge(len));
             }
@@ -525,7 +535,7 @@ mod alloc_mod {
         fn update<U: FnMut(&mut [u8])>(
             &mut self,
             addr: &StoreAddr,
-            updater: &mut U,
+            mut updater: U,
         ) -> Result<(), StoreError> {
             let addr = StaticPoolAddr::from(*addr);
             let curr_size = self.addr_check(&addr)?;
@@ -701,7 +711,7 @@ mod tests {
         {
             // Verify that the slot is free by trying to get a reference to it
-            let res = local_pool
+            local_pool
                 .update(&addr, &mut |buf: &mut [u8]| {
                     buf[0] = 0;
                     buf[1] = 0x42;
                 })
                 .expect("Modifying data failed");
         }
@@ -709,7 +719,7 @@ mod tests {
-        let res = local_pool
+        local_pool
             .read(&addr, &mut test_buf)
             .expect("Reading back data failed");
         assert_eq!(test_buf[0], 0);
@@ -722,13 +732,13 @@ mod tests {
     fn test_consecutive_reservation() {
         let mut local_pool = basic_small_pool();
         // Reserve two smaller blocks consecutively and verify that the third reservation fails
-        let res = local_pool.free_element(8, &mut |_| {});
+        let res = local_pool.free_element(8, |_| {});
         assert!(res.is_ok());
         let addr0 = res.unwrap();
-        let res = local_pool.free_element(8, &mut |_| {});
+        let res = local_pool.free_element(8, |_| {});
         assert!(res.is_ok());
         let addr1 = res.unwrap();
-        let res = local_pool.free_element(8, &mut |_| {});
+        let res = local_pool.free_element(8, |_| {});
         assert!(res.is_err());
         let err = res.unwrap_err();
         assert_eq!(err, StoreError::StoreFull(1));
@@ -819,7 +829,7 @@ mod tests {
     #[test]
     fn test_data_too_large_1() {
         let mut local_pool = basic_small_pool();
-        let res = local_pool.free_element(POOL_MAX_SIZE + 1, &mut |_| {});
+        let res = local_pool.free_element(POOL_MAX_SIZE + 1, |_| {});
         assert!(res.is_err());
         assert_eq!(
             res.unwrap_err(),
@@ -831,7 +841,7 @@ mod tests {
     fn test_free_element_too_large() {
         let mut local_pool = basic_small_pool();
         // Try to request a slot which is too large
-        let res = local_pool.free_element(20, &mut |_| {});
+        let res = local_pool.free_element(20, |_| {});
         assert!(res.is_err());
         assert_eq!(res.unwrap_err(), StoreError::DataTooLarge(20));
     }
@@ -873,7 +883,7 @@ mod tests {
         let test_buf: [u8; 16] = [0; 16];
         let addr = local_pool.add(&test_buf).expect("Adding data failed");
         let mut rw_guard = PoolRwGuard::new(&mut local_pool, addr);
-        let _ = rw_guard.update(&mut |_| {}).expect("modify failed");
+        rw_guard.update(&mut |_| {}).expect("modify failed");
         drop(rw_guard);
         assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
     }
@@ -884,7 +894,7 @@ mod tests {
         let test_buf: [u8; 16] = [0; 16];
         let addr = local_pool.add(&test_buf).expect("Adding data failed");
         let mut rw_guard = local_pool.modify_with_guard(addr);
-        let _ = rw_guard.update(&mut |_| {}).expect("modify failed");
+        rw_guard.update(&mut |_| {}).expect("modify failed");
         drop(rw_guard);
         assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
     }
@@ -901,22 +911,22 @@ mod tests {
         let addr2 = local_pool.add(&test_buf_2).expect("Adding data failed");
         let addr3 = local_pool.add(&test_buf_3).expect("Adding data failed");
         local_pool
-            .update(&addr0, &mut |buf| {
+            .update(&addr0, |buf| {
                 assert_eq!(buf, test_buf_0);
             })
             .expect("Modifying data failed");
         local_pool
-            .update(&addr1, &mut |buf| {
+            .update(&addr1, |buf| {
                 assert_eq!(buf, test_buf_1);
            })
             .expect("Modifying data failed");
         local_pool
-            .update(&addr2, &mut |buf| {
+            .update(&addr2, |buf| {
                 assert_eq!(buf, test_buf_2);
             })
             .expect("Modifying data failed");
         local_pool
-            .update(&addr3, &mut |buf| {
+            .update(&addr3, |buf| {
                 assert_eq!(buf, test_buf_3);
             })
             .expect("Modifying data failed");
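Editor's note: taken together, the reworked `PoolProvider` trait replaces all reference-returning accessors with caller-provided buffers and writer/updater closures. A minimal round-trip sketch of the resulting call patterns, using only the APIs visible in this diff (`main` returning `Result` is just for brevity):

```rust
use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreError};

fn main() -> Result<(), StoreError> {
    // One subpool with four 16-byte buckets.
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(4, 16)]));

    // Reserve a block and fill it in place through the writer closure.
    let addr = pool.free_element(8, |buf| buf[0..4].copy_from_slice(&[1, 2, 3, 4]))?;

    // Reads fill a caller-provided buffer and return the number of bytes read
    // instead of handing out a reference into the pool.
    let mut scratch: [u8; 16] = [0; 16];
    let read_len = pool.read(&addr, &mut scratch)?;
    assert_eq!(read_len, 8);

    // In-place modification goes through the updater closure.
    pool.update(&addr, |buf| buf[0] = 42)?;

    // Allocating convenience API, available with the `alloc` feature.
    assert_eq!(pool.read_as_vec(&addr)?[0], 42);

    pool.delete(addr)?;
    Ok(())
}
```

Because the closure parameters are `FnMut`, a `&mut closure` also satisfies the bound, which is why the older `&mut |_| {}` call style in some tests still compiles unchanged.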
diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs
index 0adaf7d..5a21d95 100644
--- a/satrs-core/src/pus/mod.rs
+++ b/satrs-core/src/pus/mod.rs
@@ -390,7 +390,7 @@ mod alloc_mod {
 #[cfg(feature = "std")]
 #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
 pub mod std_mod {
-    use crate::pool::{PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr};
+    use crate::pool::{PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr};
     use crate::pus::verification::{
         StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
     };
@@ -789,12 +789,15 @@ pub mod std_mod {
                 .shared_tc_store
                 .write()
                 .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?;
-            let tc_guard = tc_pool.read_with_guard(addr);
-            let tc_raw = tc_guard.read().unwrap();
-            if tc_raw.len() > self.pus_buf.len() {
-                return Err(PusPacketHandlingError::PusPacketTooLarge(tc_raw.len()));
+            let tc_size = tc_pool
+                .len_of_data(&addr)
+                .map_err(|e| PusPacketHandlingError::EcssTmtc(EcssTmtcError::Store(e)))?;
+            if tc_size > self.pus_buf.len() {
+                return Err(PusPacketHandlingError::PusPacketTooLarge(tc_size));
             }
-            self.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
+            let tc_guard = tc_pool.read_with_guard(addr);
+            // TODO: Proper error handling.
+            tc_guard.read(&mut self.pus_buf[0..tc_size]).unwrap();
             Ok(())
         }
     }
@@ -1077,8 +1080,8 @@ pub mod tests {
             assert!(next_msg.is_ok());
             let tm_addr = next_msg.unwrap();
             let tm_pool = self.tm_pool.0.read().unwrap();
-            let tm_raw = tm_pool.read(&tm_addr).unwrap();
-            self.tm_buf[0..tm_raw.len()].copy_from_slice(tm_raw);
+            let tm_raw = tm_pool.read_as_vec(&tm_addr).unwrap();
+            self.tm_buf[0..tm_raw.len()].copy_from_slice(&tm_raw);
             PusTmReader::new(&self.tm_buf, 7).unwrap().0
         }
@@ -1095,8 +1098,8 @@ pub mod tests {
             assert!(next_msg.is_ok());
             let tm_addr = next_msg.unwrap();
             let tm_pool = self.tm_pool.0.read().unwrap();
-            let tm_raw = tm_pool.read(&tm_addr).unwrap();
-            let tm = PusTmReader::new(tm_raw, 7).unwrap().0;
+            let tm_raw = tm_pool.read_as_vec(&tm_addr).unwrap();
+            let tm = PusTmReader::new(&tm_raw, 7).unwrap().0;
             assert_eq!(PusPacket::service(&tm), 1);
             assert_eq!(PusPacket::subservice(&tm), subservice);
             assert_eq!(tm.apid(), TEST_APID);
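Editor's note: the handler change above queries `len_of_data` first, so an oversized packet is rejected before any guard is taken or any copy happens. A hedged sketch of that shape as a free function (`read_checked` is a hypothetical helper for illustration, not part of this diff):

```rust
use satrs_core::pool::{PoolProvider, StoreAddr, StoreError};

// Hypothetical helper mirroring the size check in the handler above: verify
// the stored packet fits before reading it into the caller buffer.
fn read_checked(
    pool: &impl PoolProvider,
    addr: &StoreAddr,
    buf: &mut [u8],
) -> Result<usize, StoreError> {
    let tc_size = pool.len_of_data(addr)?;
    if tc_size > buf.len() {
        // The real handler maps this case to PusPacketHandlingError::PusPacketTooLarge.
        return Err(StoreError::DataTooLarge(tc_size));
    }
    pool.read(addr, &mut buf[0..tc_size])
}
```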
#[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] - pub fn new_with_current_init_time( - time_margin: Duration, - tc_buf_size: usize, - ) -> Result { - Ok(Self::new( - UnixTimestamp::from_now()?, - time_margin, - tc_buf_size, - )) + pub fn new_with_current_init_time(time_margin: Duration) -> Result { + Ok(Self::new(UnixTimestamp::from_now()?, time_margin)) } pub fn num_scheduled_telecommands(&self) -> u64 { @@ -687,7 +674,8 @@ pub mod alloc_mod { /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete /// the telecommands from the holding store after calling the release closure if the user - /// returns [true] from the release closure. + /// returns [true] from the release closure. A buffer must be provided to hold the + /// telecommands for the release process. /// /// # Arguments /// @@ -697,20 +685,53 @@ pub mod alloc_mod { /// note that returning false might lead to memory leaks if the TC is not cleared from /// the store in some other way. /// * `tc_store` - The holding store of the telecommands. + /// * `tc_buf` - Buffer to hold each telecommand being released. + pub fn release_telecommands_with_buffer bool>( + &mut self, + releaser: R, + tc_store: &mut (impl PoolProvider + ?Sized), + tc_buf: &mut [u8], + ) -> Result { + self.release_telecommands_internal(releaser, tc_store, Some(tc_buf)) + } + + /// This functions is almost identical to [release_telecommands] but does not require + /// a user provided TC buffer because it will always use the [PoolProvider::read_as_vec] + /// API to read the TC packets. However, this will also perform frequent allocations + /// for all telecommands being released. pub fn release_telecommands bool>( + &mut self, + releaser: R, + tc_store: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.release_telecommands_internal(releaser, tc_store, None) + } + + fn release_telecommands_internal bool>( &mut self, mut releaser: R, tc_store: &mut (impl PoolProvider + ?Sized), + mut tc_buf: Option<&mut [u8]>, ) -> Result { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = 0; let mut store_error = Ok(()); for tc in tcs_to_release { for info in tc.1 { - let tc = tc_store - .read(&info.addr, &mut self.tc_buf) - .map_err(|e| (released_tcs, e))?; - let should_delete = releaser(self.enabled, info, &self.tc_buf); + let should_delete = match tc_buf.as_mut() { + Some(buf) => { + tc_store + .read(&info.addr, buf) + .map_err(|e| (released_tcs, e))?; + releaser(self.enabled, info, buf) + } + None => { + let tc = tc_store + .read_as_vec(&info.addr) + .map_err(|e| (released_tcs, e))?; + releaser(self.enabled, info, &tc) + } + }; released_tcs += 1; if should_delete { let res = tc_store.delete(info.addr); @@ -736,15 +757,16 @@ pub mod alloc_mod { &mut self, mut releaser: R, tc_store: &(impl PoolProvider + ?Sized), + tc_buf: &mut [u8], ) -> Result, (Vec, StoreError)> { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = Vec::new(); for tc in tcs_to_release { for info in tc.1 { - let tc = tc_store - .read(&info.addr, &mut self.tc_buf) + tc_store + .read(&info.addr, tc_buf) .map_err(|e| (released_tcs.clone(), e))?; - releaser(self.is_enabled(), info, &self.tc_buf); + releaser(self.is_enabled(), info, tc_buf); released_tcs.push(*info); } } @@ -910,11 +932,8 @@ mod tests { #[test] fn test_enable_api() { - let mut scheduler = PusScheduler::new( - UnixTimestamp::new_only_seconds(0), - 
@@ -925,11 +944,8 @@ mod tests {
     #[test]
     fn test_reset() {
         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            64,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         let mut buf: [u8; 32] = [0; 32];
         let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
@@ -971,11 +987,8 @@ mod tests {
     #[test]
     fn insert_multi_with_same_time() {
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         scheduler
             .insert_unwrapped_and_stored_tc(
@@ -1034,11 +1047,8 @@ mod tests {
     #[test]
     fn test_time_update() {
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let time = UnixTimestamp::new(1, 2).unwrap();
         scheduler.update_time(time);
         assert_eq!(scheduler.current_time(), &time);
@@ -1086,11 +1096,8 @@ mod tests {
     #[test]
     fn test_release_telecommands() {
         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         let mut buf: [u8; 32] = [0; 32];
         let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
@@ -1113,8 +1120,9 @@ mod tests {

         // test 1: too early, no tcs
         scheduler.update_time(UnixTimestamp::new_only_seconds(99));
+        let mut tc_buf: [u8; 128] = [0; 128];
         scheduler
-            .release_telecommands(&mut test_closure_1, &mut pool)
+            .release_telecommands_with_buffer(&mut test_closure_1, &mut pool, &mut tc_buf)
             .expect("deletion failed");

         // test 2: exact time stamp of tc, releases 1 tc
@@ -1136,7 +1144,7 @@ mod tests {

         scheduler.update_time(UnixTimestamp::new_only_seconds(206));
         released = scheduler
-            .release_telecommands(&mut test_closure_2, &mut pool)
+            .release_telecommands_with_buffer(&mut test_closure_2, &mut pool, &mut tc_buf)
             .expect("deletion failed");
         assert_eq!(released, 1);
         // TC is deleted.
@@ -1154,11 +1162,8 @@ mod tests {
     #[test]
     fn release_multi_with_same_time() {
         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         let mut buf: [u8; 32] = [0; 32];
         let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
@@ -1185,9 +1190,10 @@ mod tests {

         // test 1: too early, no tcs
         scheduler.update_time(UnixTimestamp::new_only_seconds(99));
+        let mut tc_buf: [u8; 128] = [0; 128];

         let mut released = scheduler
-            .release_telecommands(&mut test_closure, &mut pool)
+            .release_telecommands_with_buffer(&mut test_closure, &mut pool, &mut tc_buf)
             .expect("deletion failed");
         assert_eq!(released, 0);
@@ -1214,11 +1220,8 @@ mod tests {
     #[test]
     fn release_with_scheduler_disabled() {
         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         scheduler.disable();

@@ -1240,11 +1243,13 @@ mod tests {
             true
         };

+        let mut tc_buf: [u8; 128] = [0; 128];
+
         // test 1: too early, no tcs
         scheduler.update_time(UnixTimestamp::new_only_seconds(99));

         scheduler
-            .release_telecommands(&mut test_closure_1, &mut pool)
+            .release_telecommands_with_buffer(&mut test_closure_1, &mut pool, &mut tc_buf)
             .expect("deletion failed");

         // test 2: exact time stamp of tc, releases 1 tc
@@ -1281,11 +1286,8 @@ mod tests {
     #[test]
     fn insert_unwrapped_tc() {
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));

         let mut buf: [u8; 32] = [0; 32];
@@ -1302,7 +1304,7 @@ mod tests {
         assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());

         let mut read_buf: [u8; 64] = [0; 64];
-        let data = pool.read(&tc_info_0.addr(), &mut read_buf).unwrap();
+        pool.read(&tc_info_0.addr(), &mut read_buf).unwrap();
         let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data");
         assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));

@@ -1324,18 +1326,16 @@ mod tests {
             .release_telecommands(&mut test_closure, &mut pool)
             .unwrap();

-        let data = pool.read(&addr_vec[0], &mut read_buf).unwrap();
+        let read_len = pool.read(&addr_vec[0], &mut read_buf).unwrap();
         let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data");
+        assert_eq!(read_len, check_tc.1);
         assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
     }

     #[test]
     fn insert_wrapped_tc() {
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
@@ -1351,8 +1351,9 @@ mod tests {

         assert!(pool.has_element_at(&info.addr).unwrap());

-        let data = pool.read(&info.addr, &mut buf).unwrap();
+        let read_len = pool.read(&info.addr, &mut buf).unwrap();
         let check_tc = PusTcReader::new(&buf).expect("incorrect Pus tc raw data");
+        assert_eq!(read_len, check_tc.1);
         assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));

         assert_eq!(scheduler.num_scheduled_telecommands(), 1);
@@ -1369,22 +1370,22 @@ mod tests {
             false
         };

+        let mut tc_buf: [u8; 64] = [0; 64];
+
         scheduler
-            .release_telecommands(&mut test_closure, &mut pool)
+            .release_telecommands_with_buffer(&mut test_closure, &mut pool, &mut tc_buf)
             .unwrap();

-        let data = pool.read(&addr_vec[0], &mut buf).unwrap();
+        let read_len = pool.read(&addr_vec[0], &mut buf).unwrap();
         let check_tc = PusTcReader::new(&buf).expect("incorrect PUS tc raw data");
+        assert_eq!(read_len, check_tc.1);
         assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
     }

     #[test]
     fn insert_wrong_service() {
-        let mut scheduler = PusScheduler::new(
-            UnixTimestamp::new_only_seconds(0),
-            Duration::from_secs(5),
-            128,
-        );
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

         let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
@@ -1944,8 +1945,9 @@ mod tests {

         scheduler.update_time(UnixTimestamp::new_only_seconds(205));

+        let mut tc_buf: [u8; 64] = [0; 64];
         let tc_info_vec = scheduler
-            .release_telecommands_no_deletion(&mut test_closure_1, &pool)
+            .release_telecommands_no_deletion(&mut test_closure_1, &pool, &mut tc_buf)
             .expect("deletion failed");
         assert_eq!(tc_info_vec[0], tc_info_0);
         assert_eq!(tc_info_vec[1], tc_info_1);
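Editor's note: callers migrating to `release_telecommands_with_buffer` follow the pattern used throughout these tests: one reusable buffer per release pass instead of a per-TC allocation. A sketch under the assumption that `PusScheduler` and `TcInfo` are importable from `satrs_core::pus::scheduler` and `UnixTimestamp` from `spacepackets` (the import paths are assumptions, not confirmed by this diff):

```rust
use core::time::Duration;
use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig};
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
use spacepackets::time::UnixTimestamp;

fn main() {
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32)]));
    let mut scheduler =
        PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
    // ... telecommands would be inserted here ...
    scheduler.update_time(UnixTimestamp::new_only_seconds(100));

    // One reusable buffer for the whole release pass; it must be large enough
    // for the biggest scheduled TC.
    let mut tc_buf: [u8; 128] = [0; 128];
    let released = scheduler
        .release_telecommands_with_buffer(
            |enabled: bool, _info: &TcInfo, tc: &[u8]| {
                // Forward or execute `tc` here; returning true deletes it from the pool.
                enabled && !tc.is_empty()
            },
            &mut pool,
            &mut tc_buf,
        )
        .expect("releasing TCs failed");
    // Nothing was scheduled in this sketch, so nothing is released.
    assert_eq!(released, 0);
}
```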
diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs
index 1874411..36514f1 100644
--- a/satrs-core/src/pus/verification.rs
+++ b/satrs-core/src/pus/verification.rs
@@ -15,7 +15,7 @@
 //! ```
 //! use std::sync::{Arc, mpsc, RwLock};
 //! use std::time::Duration;
-//! use satrs_core::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig};
+//! use satrs_core::pool::{PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};
 //! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
 //! use satrs_core::seq_count::SeqCountProviderSimple;
 //! use satrs_core::pus::MpscTmInSharedPoolSender;
@@ -56,9 +56,7 @@
 //! {
 //!     let mut rg = tm_store.write().expect("Error locking shared pool");
 //!     let store_guard = rg.read_with_guard(addr);
-//!     let slice = store_guard.read().expect("Error reading TM slice");
-//!     tm_len = slice.len();
-//!     tm_buf[0..tm_len].copy_from_slice(slice);
+//!     tm_len = store_guard.read(&mut tm_buf).expect("Error reading TM slice");
 //! }
 //! let (pus_tm, _) = PusTmReader::new(&tm_buf[0..tm_len], 7)
 //!     .expect("Error reading verification TM");
@@ -1547,7 +1545,7 @@ mod tests {
         let mut sender = TestSender::default();
         let fail_code = EcssEnumU16::new(2);
         let fail_params = FailParams::new(Some(stamp_buf.as_slice()), &fail_code, None);
-        b.vr.acceptance_failure(tok, &mut sender, fail_params)
+        b.vr.acceptance_failure(tok, &sender, fail_params)
             .expect("Sending acceptance success failed");
         acceptance_fail_check(&mut sender, tok.req_id, stamp_buf);
     }
@@ -1682,12 +1680,10 @@ mod tests {
         );
         let accepted_token =
-            b.vr.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP))
+            b.vr.acceptance_success(tok, &sender, Some(&EMPTY_STAMP))
                 .expect("Sending acceptance success failed");
-        let empty =
-            b.vr.start_failure(accepted_token, &mut sender, fail_params)
-                .expect("Start failure failure");
-        assert_eq!(empty, ());
+        b.vr.start_failure(accepted_token, &mut sender, fail_params)
+            .expect("Start failure failure");
         start_fail_check(&mut sender, tok.req_id, fail_data_raw);
     }
@@ -1779,23 +1775,23 @@ mod tests {
         let mut sender = TestSender::default();
         let accepted_token = b
             .rep()
-            .acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP))
+            .acceptance_success(tok, &sender, Some(&EMPTY_STAMP))
             .expect("Sending acceptance success failed");
         let started_token = b
             .rep()
-            .start_success(accepted_token, &mut sender, Some(&[0, 1, 0, 1, 0, 1, 0]))
+            .start_success(accepted_token, &sender, Some(&[0, 1, 0, 1, 0, 1, 0]))
             .expect("Sending start success failed");
         b.rep()
             .step_success(
                 &started_token,
-                &mut sender,
+                &sender,
                 Some(&EMPTY_STAMP),
                 EcssEnumU8::new(0),
             )
             .expect("Sending step 0 success failed");
         b.vr.step_success(
             &started_token,
-            &mut sender,
+            &sender,
             Some(&EMPTY_STAMP),
             EcssEnumU8::new(1),
         )
@@ -2176,9 +2172,9 @@ mod tests {
         {
             let mut rg = shared_tm_pool.write().expect("Error locking shared pool");
             let store_guard = rg.read_with_guard(addr);
-            let slice = store_guard.read().expect("Error reading TM slice");
-            tm_len = slice.len();
-            tm_buf[0..tm_len].copy_from_slice(slice);
+            tm_len = store_guard
+                .read(&mut tm_buf)
+                .expect("Error reading TM slice");
         }
         let (pus_tm, _) =
             PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");
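Editor's note: the guard-based readback now follows the same caller-buffer convention; the doc example and the tests above reduce to this pattern (a minimal sketch using only APIs shown in this diff):

```rust
use satrs_core::pool::{PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};

fn main() {
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(1, 64)]));
    let addr = pool.add(&[0; 32]).expect("adding data failed");

    let mut tm_buf: [u8; 64] = [0; 64];
    let tm_len;
    {
        // The guard deletes the entry when dropped unless release() is called;
        // read() returns the number of bytes copied into tm_buf.
        let guard = pool.read_with_guard(addr);
        tm_len = guard.read(&mut tm_buf).expect("reading TM failed");
    }
    assert_eq!(tm_len, 32);
    // The guard was dropped, so the entry is gone.
    assert!(!pool.has_element_at(&addr).expect("invalid address"));
}
```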
.expect("Error reading verification TM"); @@ -1547,7 +1545,7 @@ mod tests { let mut sender = TestSender::default(); let fail_code = EcssEnumU16::new(2); let fail_params = FailParams::new(Some(stamp_buf.as_slice()), &fail_code, None); - b.vr.acceptance_failure(tok, &mut sender, fail_params) + b.vr.acceptance_failure(tok, &sender, fail_params) .expect("Sending acceptance success failed"); acceptance_fail_check(&mut sender, tok.req_id, stamp_buf); } @@ -1682,12 +1680,10 @@ mod tests { ); let accepted_token = - b.vr.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP)) + b.vr.acceptance_success(tok, &sender, Some(&EMPTY_STAMP)) .expect("Sending acceptance success failed"); - let empty = - b.vr.start_failure(accepted_token, &mut sender, fail_params) - .expect("Start failure failure"); - assert_eq!(empty, ()); + b.vr.start_failure(accepted_token, &mut sender, fail_params) + .expect("Start failure failure"); start_fail_check(&mut sender, tok.req_id, fail_data_raw); } @@ -1779,23 +1775,23 @@ mod tests { let mut sender = TestSender::default(); let accepted_token = b .rep() - .acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP)) + .acceptance_success(tok, &sender, Some(&EMPTY_STAMP)) .expect("Sending acceptance success failed"); let started_token = b .rep() - .start_success(accepted_token, &mut sender, Some(&[0, 1, 0, 1, 0, 1, 0])) + .start_success(accepted_token, &sender, Some(&[0, 1, 0, 1, 0, 1, 0])) .expect("Sending start success failed"); b.rep() .step_success( &started_token, - &mut sender, + &sender, Some(&EMPTY_STAMP), EcssEnumU8::new(0), ) .expect("Sending step 0 success failed"); b.vr.step_success( &started_token, - &mut sender, + &sender, Some(&EMPTY_STAMP), EcssEnumU8::new(1), ) @@ -2176,9 +2172,9 @@ mod tests { { let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); let store_guard = rg.read_with_guard(addr); - let slice = store_guard.read().expect("Error reading TM slice"); - tm_len = slice.len(); - tm_buf[0..tm_len].copy_from_slice(slice); + tm_len = store_guard + .read(&mut tm_buf) + .expect("Error reading TM slice"); } let (pus_tm, _) = PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM"); diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index e13bd26..192d574 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -35,7 +35,7 @@ pub mod std_mod { pub fn add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result { let mut pg = self.0.write().map_err(|_| EcssTmtcError::StoreLock)?; - let addr = pg.free_element(pus_tm.len_written(), &mut |buf| { + let addr = pg.free_element(pus_tm.len_written(), |buf| { pus_tm .write_to_bytes(buf) .expect("writing PUS TM to store failed"); diff --git a/satrs-core/tests/pools.rs b/satrs-core/tests/pools.rs index 17cd4eb..a091203 100644 --- a/satrs-core/tests/pools.rs +++ b/satrs-core/tests/pools.rs @@ -25,8 +25,10 @@ fn threaded_usage() { addr = rx.recv().expect("Receiving store address failed"); let mut pool_access = shared_clone.write().unwrap(); let pg = PoolGuard::new(pool_access.deref_mut(), addr); - let read_res = pg.read().expect("Reading failed"); - assert_eq!(read_res, DUMMY_DATA); + let mut read_buf: [u8; 4] = [0; 4]; + let read_bytes = pg.read(&mut read_buf).expect("Reading failed"); + assert_eq!(read_buf, DUMMY_DATA); + assert_eq!(read_bytes, 4); } let pool_access = shared_clone.read().unwrap(); assert!(!pool_access.has_element_at(&addr).expect("Invalid address")); diff --git a/satrs-core/tests/pus_verification.rs 
diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs
index f8f6a1c..f299272 100644
--- a/satrs-core/tests/pus_verification.rs
+++ b/satrs-core/tests/pus_verification.rs
@@ -58,15 +58,21 @@ pub mod crossbeam_test {
             let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
             let pus_tc_0 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
             req_id_0 = RequestId::new(&pus_tc_0);
-            let (addr, buf) = tc_guard.free_element(pus_tc_0.len_written()).unwrap();
-            pus_tc_0.write_to_bytes(buf).unwrap();
+            let addr = tc_guard
+                .free_element(pus_tc_0.len_written(), |buf| {
+                    pus_tc_0.write_to_bytes(buf).unwrap();
+                })
+                .unwrap();
             tx_tc_0.send(addr).unwrap();

             let mut sph = SpHeader::tc_unseg(TEST_APID, 1, 0).unwrap();
             let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
             let pus_tc_1 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
             req_id_1 = RequestId::new(&pus_tc_1);
-            let (addr, buf) = tc_guard.free_element(pus_tc_0.len_written()).unwrap();
-            pus_tc_1.write_to_bytes(buf).unwrap();
+            let addr = tc_guard
+                .free_element(pus_tc_1.len_written(), |buf| {
+                    pus_tc_1.write_to_bytes(buf).unwrap();
+                })
+                .unwrap();
             tx_tc_1.send(addr).unwrap();
         }
         let verif_sender_0 = thread::spawn(move || {
@@ -78,9 +84,7 @@ pub mod crossbeam_test {
             {
                 let mut tc_guard = shared_tc_pool_0.write().unwrap();
                 let pg = tc_guard.read_with_guard(tc_addr);
-                let buf = pg.read().unwrap();
-                tc_len = buf.len();
-                tc_buf[0..tc_len].copy_from_slice(buf);
+                tc_len = pg.read(&mut tc_buf).unwrap();
             }
             let (_tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
@@ -116,9 +120,7 @@ pub mod crossbeam_test {
             {
                 let mut tc_guard = shared_tc_pool_1.write().unwrap();
                 let pg = tc_guard.read_with_guard(tc_addr);
-                let buf = pg.read().unwrap();
-                tc_len = buf.len();
-                tc_buf[0..tc_len].copy_from_slice(buf);
+                tc_len = pg.read(&mut tc_buf).unwrap();
             }
             let (tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
             let token = reporter_with_sender_1.add_tc(&tc);
@@ -148,9 +150,9 @@ pub mod crossbeam_test {
             {
                 let mut rg = shared_tm_store.write().expect("Error locking shared pool");
                 let store_guard = rg.read_with_guard(verif_addr);
-                let slice = store_guard.read().expect("Error reading TM slice");
-                tm_len = slice.len();
-                tm_buf[0..tm_len].copy_from_slice(slice);
+                tm_len = store_guard
+                    .read(&mut tm_buf)
+                    .expect("Error reading TM slice");
             }
             let (pus_tm, _) =
                 PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");
diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs
index f3ea5e5..8ffe8df 100644
--- a/satrs-example/src/pus/scheduler.rs
+++ b/satrs-example/src/pus/scheduler.rs
@@ -54,6 +54,7 @@ impl TcReleaser for mpsc::Sender<Vec<u8>> {

 pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> {
     pub pus_11_handler: PusService11SchedHandler<TcInMemConverter>,
     pub sched_tc_pool: StaticMemoryPool,
+    pub releaser_buf: [u8; 4096],
     pub tc_releaser: Box<dyn TcReleaser>,
 }
@@ -70,7 +71,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
         let released_tcs = self
             .pus_11_handler
             .scheduler_mut()
-            .release_telecommands(&mut self.tc_releaser.release, &mut self.sched_tc_pool)
+            .release_telecommands_with_buffer(
+                releaser,
+                &mut self.sched_tc_pool,
+                &mut self.releaser_buf,
+            )
             .expect("releasing TCs failed");
         if released_tcs > 0 {
             info!("{released_tcs} TC(s) released from scheduler");
@@ -136,6 +141,7 @@
     Pus11Wrapper {
         pus_11_handler,
         sched_tc_pool,
+        releaser_buf: [0; 4096],
         tc_releaser: Box::new(tc_releaser),
     }
 }
@@ -172,6 +178,7 @@
     Pus11Wrapper {
         pus_11_handler,
         sched_tc_pool,
+        releaser_buf: [0; 4096],
         tc_releaser: Box::new(tc_source_sender),
     }
 }
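Editor's note: the `TcReleaser` object boxed into `Pus11Wrapper` only needs a `release` hook matching the scheduler's releaser closure. The trait shape below is inferred from the `FnMut(bool, &TcInfo, &[u8]) -> bool` closure type and from the `mpsc::Sender<Vec<u8>>` implementation above; the real definition lives in satrs-example, so treat the exact signature as an assumption:

```rust
use satrs_core::pus::scheduler::TcInfo;

// Assumed trait shape, inferred from the releaser closure signature.
pub trait TcReleaser {
    fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}

// Hypothetical releaser that logs released TCs and always deletes them
// from the scheduler pool.
pub struct LoggingReleaser;

impl TcReleaser for LoggingReleaser {
    fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
        if enabled {
            println!("releasing {}-byte TC", tc.len());
        }
        // Returning true removes the TC from the pool either way.
        true
    }
}
```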
diff --git a/satrs-example/src/tm_funnel.rs b/satrs-example/src/tm_funnel.rs
index 3c4f4ea..4f88297 100644
--- a/satrs-example/src/tm_funnel.rs
+++ b/satrs-example/src/tm_funnel.rs
@@ -3,6 +3,7 @@ use std::{
     sync::mpsc::{Receiver, Sender},
 };

+use log::info;
 use satrs_core::{
     pool::{PoolProvider, StoreAddr},
     seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
@@ -63,9 +64,14 @@ impl TmFunnelCommon {
             *entry += 1;
         }

+        Self::packet_printout(&zero_copy_writer);
         // This operation has to come last!
         zero_copy_writer.finish();
     }
+
+    fn packet_printout(tm: &PusTmZeroCopyWriter) {
+        info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
+    }
 }

 pub struct TmFunnelStatic {
@@ -96,16 +102,21 @@ impl TmFunnelStatic {
         // the CRC.
         let shared_pool = self.shared_tm_store.clone_backing_pool();
         let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
-        let tm_raw = pool_guard
-            .update(&addr)
+        let mut tm_copy = Vec::new();
+        pool_guard
+            .update(&addr, |buf| {
+                let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
+                    .expect("Creating TM zero copy writer failed");
+                self.common.apply_packet_processing(zero_copy_writer);
+                tm_copy = buf.to_vec()
+            })
             .expect("Reading TM from pool failed");
-        let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN)
-            .expect("Creating TM zero copy writer failed");
-        self.common.apply_packet_processing(zero_copy_writer);
         self.tm_server_tx
             .send(addr)
             .expect("Sending TM to server failed");
-        self.common.sync_tm_tcp_source.add_tm(tm_raw);
+        // We could also do this step in the update closure, but I'd rather avoid that, as it
+        // could lead to nested locking.
+        self.common.sync_tm_tcp_source.add_tm(&tm_copy);
     }
 }
diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs
index e0048c7..7ab891c 100644
--- a/satrs-example/src/tmtc.rs
+++ b/satrs-example/src/tmtc.rs
@@ -28,8 +28,9 @@ pub struct SharedTcPool {
 impl SharedTcPool {
     pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
         let mut pg = self.pool.write().expect("error locking TC store");
-        let (addr, buf) = pg.free_element(pus_tc.len_packed())?;
-        buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
+        let addr = pg.free_element(pus_tc.len_packed(), |buf| {
+            buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
+        })?;
         Ok(addr)
     }
 }
@@ -125,8 +126,8 @@ impl TcSourceTaskStatic {
             .pool
             .read()
             .expect("locking tc pool failed");
-        let data = pool.read(&addr).expect("reading pool failed");
-        self.tc_buf[0..data.len()].copy_from_slice(data);
+        pool.read(&addr, &mut self.tc_buf)
+            .expect("reading pool failed");
         drop(pool);
         match PusTcReader::new(&self.tc_buf) {
             Ok((pus_tc, _)) => {
diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs
index c2a8eea..9e3faa8 100644
--- a/satrs-example/src/udp.rs
+++ b/satrs-example/src/udp.rs
@@ -29,20 +29,13 @@ impl UdpTmHandler for StaticUdpTmHandler {
         }
         let mut store_lock = store_lock.unwrap();
         let pg = store_lock.read_with_guard(addr);
-        let read_res = pg.read();
+        let read_res = pg.read_as_vec();
         if read_res.is_err() {
             warn!("Error reading TM pool data");
             continue;
         }
         let buf = read_res.unwrap();
-        if buf.len() > 9 {
-            let service = buf[7];
-            let subservice = buf[8];
-            info!("Sending PUS TM[{service},{subservice}]")
-        } else {
-            info!("Sending PUS TM");
-        }
-        let result = socket.send_to(buf, recv_addr);
+        let result = socket.send_to(&buf, recv_addr);
         if let Err(e) = result {
             warn!("Sending TM with UDP socket failed: {e}")
         }
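Editor's note: the `TmFunnelStatic` change demonstrates how to get data out of an `update` closure when it is needed after the pool lock is dropped: mutate in place, then copy into a local `Vec`. A condensed sketch of that pattern:

```rust
use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig};

fn main() {
    let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(1, 32)]));
    let addr = pool.add(&[0; 8]).expect("adding data failed");

    // Mutate the stored packet in place and keep a copy for consumers that
    // must be fed after the pool lock is released (avoiding nested locking).
    let mut tm_copy = Vec::new();
    pool.update(&addr, |buf| {
        buf[0] = 0xAB; // stand-in for the zero-copy writer post-processing
        tm_copy = buf.to_vec();
    })
    .expect("updating data failed");

    assert_eq!(tm_copy[0], 0xAB);
}
```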