fixed all tests
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2024-02-09 19:21:49 +01:00
parent b5efc0081c
commit 3e5f05e634
Signed by: muellerr
GPG Key ID: A649FB78196E3849
11 changed files with 217 additions and 190 deletions

View File

@ -3,11 +3,12 @@
//! # Example for the [StaticMemoryPool]
//!
//! ```
//! use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
//! use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig};
//!
//! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
//! let pool_cfg = StaticPoolConfig::new(vec![(4, 4), (2, 8), (1, 16)]);
//! let mut local_pool = StaticMemoryPool::new(pool_cfg);
//! let mut read_buf: [u8; 16] = [0; 16];
//! let mut addr;
//! {
//! // Add new data to the pool
@ -20,25 +21,25 @@
//!
//! {
//! // Read the store data back
//! let res = local_pool.read(&addr);
//! let res = local_pool.read(&addr, &mut read_buf);
//! assert!(res.is_ok());
//! let buf_read_back = res.unwrap();
//! assert_eq!(buf_read_back.len(), 4);
//! assert_eq!(buf_read_back[0], 42);
//! let read_bytes = res.unwrap();
//! assert_eq!(read_bytes, 4);
//! assert_eq!(read_buf[0], 42);
//! // Modify the stored data
//! let res = local_pool.modify(&addr);
//! let res = local_pool.update(&addr, |buf| {
//! buf[0] = 12;
//! });
//! assert!(res.is_ok());
//! let buf_read_back = res.unwrap();
//! buf_read_back[0] = 12;
//! }
//!
//! {
//! // Read the modified data back
//! let res = local_pool.read(&addr);
//! let res = local_pool.read(&addr, &mut read_buf);
//! assert!(res.is_ok());
//! let buf_read_back = res.unwrap();
//! assert_eq!(buf_read_back.len(), 4);
//! assert_eq!(buf_read_back[0], 12);
//! let read_bytes = res.unwrap();
//! assert_eq!(read_bytes, 4);
//! assert_eq!(read_buf[0], 12);
//! }
//!
//! // Delete the stored data
@ -46,21 +47,21 @@
//!
//! // Get a free element in the pool with an appropriate size
//! {
//! let res = local_pool.free_element(12);
//! let res = local_pool.free_element(12, |buf| {
//! buf[0] = 7;
//! });
//! assert!(res.is_ok());
//! let (tmp, mut_buf) = res.unwrap();
//! addr = tmp;
//! mut_buf[0] = 7;
//! addr = res.unwrap();
//! }
//!
//! // Read back the data
//! {
//! // Read the store data back
//! let res = local_pool.read(&addr);
//! let res = local_pool.read(&addr, &mut read_buf);
//! assert!(res.is_ok());
//! let buf_read_back = res.unwrap();
//! assert_eq!(buf_read_back.len(), 12);
//! assert_eq!(buf_read_back[0], 7);
//! let read_bytes = res.unwrap();
//! assert_eq!(read_bytes, 12);
//! assert_eq!(read_buf[0], 7);
//! }
//! ```
#[cfg(feature = "alloc")]
@ -153,6 +154,7 @@ pub enum StoreError {
/// Valid subpool and packet index, but no data is stored at the given address
DataDoesNotExist(StoreAddr),
ByteConversionError(spacepackets::ByteConversionError),
LockError,
/// Internal or configuration errors
InternalError(u32),
}
@ -178,6 +180,9 @@ impl Display for StoreError {
StoreError::ByteConversionError(e) => {
write!(f, "store error: {e}")
}
StoreError::LockError => {
write!(f, "lock error")
}
}
}
}
@ -212,7 +217,7 @@ pub trait PoolProvider {
fn free_element<W: FnMut(&mut [u8])>(
&mut self,
len: usize,
writer: &mut W,
writer: W,
) -> Result<StoreAddr, StoreError>;
/// Modify data added previously using a given [StoreAddr] by yielding a mutable reference
@ -220,7 +225,7 @@ pub trait PoolProvider {
fn update<U: FnMut(&mut [u8])>(
&mut self,
addr: &StoreAddr,
updater: &mut U,
updater: U,
) -> Result<(), StoreError>;
/// Read data by yielding a read-only reference given a [StoreAddr]
@ -233,12 +238,12 @@ pub trait PoolProvider {
/// Retrieve the length of the data at the given store address.
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError>;
/*{
if !self.has_element_at(addr)? {
return Err(StoreError::DataDoesNotExist(*addr));
}
Ok(self.read(addr)?.len())
}*/
#[cfg(feature = "alloc")]
fn read_as_vec(&self, addr: &StoreAddr) -> Result<alloc::vec::Vec<u8>, StoreError> {
let mut vec = alloc::vec![0; self.len_of_data(addr)?];
self.read(addr, &mut vec)?;
Ok(vec)
}
}
pub trait PoolProviderWithGuards: PoolProvider {
@ -285,6 +290,11 @@ impl<'a, MemProvider: PoolProvider> PoolGuard<'a, MemProvider> {
self.pool.read(&self.addr, buf)
}
#[cfg(feature = "alloc")]
pub fn read_as_vec(&self) -> Result<alloc::vec::Vec<u8>, StoreError> {
self.pool.read_as_vec(&self.addr)
}
/// Releasing the pool guard will disable the automatic deletion of the data when the guard
/// is dropped.
pub fn release(&mut self) {
@ -509,7 +519,7 @@ mod alloc_mod {
fn free_element<W: FnMut(&mut [u8])>(
&mut self,
len: usize,
writer: &mut W,
mut writer: W,
) -> Result<StoreAddr, StoreError> {
if len > POOL_MAX_SIZE {
return Err(StoreError::DataTooLarge(len));
@ -525,7 +535,7 @@ mod alloc_mod {
fn update<U: FnMut(&mut [u8])>(
&mut self,
addr: &StoreAddr,
updater: &mut U,
mut updater: U,
) -> Result<(), StoreError> {
let addr = StaticPoolAddr::from(*addr);
let curr_size = self.addr_check(&addr)?;
@ -701,7 +711,7 @@ mod tests {
{
// Verify that the slot is free by trying to get a reference to it
let res = local_pool
local_pool
.update(&addr, &mut |buf: &mut [u8]| {
buf[0] = 0;
buf[1] = 0x42;
@ -709,7 +719,7 @@ mod tests {
.expect("Modifying data failed");
}
let res = local_pool
local_pool
.read(&addr, &mut test_buf)
.expect("Reading back data failed");
assert_eq!(test_buf[0], 0);
@ -722,13 +732,13 @@ mod tests {
fn test_consecutive_reservation() {
let mut local_pool = basic_small_pool();
// Reserve two smaller blocks consecutively and verify that the third reservation fails
let res = local_pool.free_element(8, &mut |_| {});
let res = local_pool.free_element(8, |_| {});
assert!(res.is_ok());
let addr0 = res.unwrap();
let res = local_pool.free_element(8, &mut |_| {});
let res = local_pool.free_element(8, |_| {});
assert!(res.is_ok());
let addr1 = res.unwrap();
let res = local_pool.free_element(8, &mut |_| {});
let res = local_pool.free_element(8, |_| {});
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err, StoreError::StoreFull(1));
@ -819,7 +829,7 @@ mod tests {
#[test]
fn test_data_too_large_1() {
let mut local_pool = basic_small_pool();
let res = local_pool.free_element(POOL_MAX_SIZE + 1, &mut |_| {});
let res = local_pool.free_element(POOL_MAX_SIZE + 1, |_| {});
assert!(res.is_err());
assert_eq!(
res.unwrap_err(),
@ -831,7 +841,7 @@ mod tests {
fn test_free_element_too_large() {
let mut local_pool = basic_small_pool();
// Try to request a slot which is too large
let res = local_pool.free_element(20, &mut |_| {});
let res = local_pool.free_element(20, |_| {});
assert!(res.is_err());
assert_eq!(res.unwrap_err(), StoreError::DataTooLarge(20));
}
@ -873,7 +883,7 @@ mod tests {
let test_buf: [u8; 16] = [0; 16];
let addr = local_pool.add(&test_buf).expect("Adding data failed");
let mut rw_guard = PoolRwGuard::new(&mut local_pool, addr);
let _ = rw_guard.update(&mut |_| {}).expect("modify failed");
rw_guard.update(&mut |_| {}).expect("modify failed");
drop(rw_guard);
assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
}
@ -884,7 +894,7 @@ mod tests {
let test_buf: [u8; 16] = [0; 16];
let addr = local_pool.add(&test_buf).expect("Adding data failed");
let mut rw_guard = local_pool.modify_with_guard(addr);
let _ = rw_guard.update(&mut |_| {}).expect("modify failed");
rw_guard.update(&mut |_| {}).expect("modify failed");
drop(rw_guard);
assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
}
@ -901,22 +911,22 @@ mod tests {
let addr2 = local_pool.add(&test_buf_2).expect("Adding data failed");
let addr3 = local_pool.add(&test_buf_3).expect("Adding data failed");
local_pool
.update(&addr0, &mut |buf| {
.update(&addr0, |buf| {
assert_eq!(buf, test_buf_0);
})
.expect("Modifying data failed");
local_pool
.update(&addr1, &mut |buf| {
.update(&addr1, |buf| {
assert_eq!(buf, test_buf_1);
})
.expect("Modifying data failed");
local_pool
.update(&addr2, &mut |buf| {
.update(&addr2, |buf| {
assert_eq!(buf, test_buf_2);
})
.expect("Modifying data failed");
local_pool
.update(&addr3, &mut |buf| {
.update(&addr3, |buf| {
assert_eq!(buf, test_buf_3);
})
.expect("Modifying data failed");

View File

@ -390,7 +390,7 @@ mod alloc_mod {
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use crate::pool::{PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr};
use crate::pool::{PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr};
use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
};
@ -789,12 +789,15 @@ pub mod std_mod {
.shared_tc_store
.write()
.map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?;
let tc_guard = tc_pool.read_with_guard(addr);
let tc_raw = tc_guard.read().unwrap();
if tc_raw.len() > self.pus_buf.len() {
return Err(PusPacketHandlingError::PusPacketTooLarge(tc_raw.len()));
let tc_size = tc_pool
.len_of_data(&addr)
.map_err(|e| PusPacketHandlingError::EcssTmtc(EcssTmtcError::Store(e)))?;
if tc_size > self.pus_buf.len() {
return Err(PusPacketHandlingError::PusPacketTooLarge(tc_size));
}
self.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
let tc_guard = tc_pool.read_with_guard(addr);
// TODO: Proper error handling.
tc_guard.read(&mut self.pus_buf[0..tc_size]).unwrap();
Ok(())
}
}
@ -1077,8 +1080,8 @@ pub mod tests {
assert!(next_msg.is_ok());
let tm_addr = next_msg.unwrap();
let tm_pool = self.tm_pool.0.read().unwrap();
let tm_raw = tm_pool.read(&tm_addr).unwrap();
self.tm_buf[0..tm_raw.len()].copy_from_slice(tm_raw);
let tm_raw = tm_pool.read_as_vec(&tm_addr).unwrap();
self.tm_buf[0..tm_raw.len()].copy_from_slice(&tm_raw);
PusTmReader::new(&self.tm_buf, 7).unwrap().0
}
@ -1095,8 +1098,8 @@ pub mod tests {
assert!(next_msg.is_ok());
let tm_addr = next_msg.unwrap();
let tm_pool = self.tm_pool.0.read().unwrap();
let tm_raw = tm_pool.read(&tm_addr).unwrap();
let tm = PusTmReader::new(tm_raw, 7).unwrap().0;
let tm_raw = tm_pool.read_as_vec(&tm_addr).unwrap();
let tm = PusTmReader::new(&tm_raw, 7).unwrap().0;
assert_eq!(PusPacket::service(&tm), 1);
assert_eq!(PusPacket::subservice(&tm), subservice);
assert_eq!(tm.apid(), TEST_APID);

View File

@ -397,7 +397,6 @@ pub mod alloc_mod {
#[derive(Debug)]
pub struct PusScheduler {
tc_map: BTreeMap<UnixTimestamp, Vec<TcInfo>>,
tc_buf: Vec<u8>,
pub(crate) current_time: UnixTimestamp,
time_margin: Duration,
enabled: bool,
@ -413,14 +412,9 @@ pub mod alloc_mod {
/// added to the current time, it will not be inserted into the schedule.
/// * `tc_buf_size` - Buffer for temporary storage of telecommand packets. This buffer
/// should be large enough to accomodate the largest expected TC packets.
pub fn new(
init_current_time: UnixTimestamp,
time_margin: Duration,
tc_buf_size: usize,
) -> Self {
pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self {
PusScheduler {
tc_map: Default::default(),
tc_buf: vec![0; tc_buf_size],
current_time: init_current_time,
time_margin,
enabled: true,
@ -430,15 +424,8 @@ pub mod alloc_mod {
/// Like [Self::new], but sets the `init_current_time` parameter to the current system time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_with_current_init_time(
time_margin: Duration,
tc_buf_size: usize,
) -> Result<Self, SystemTimeError> {
Ok(Self::new(
UnixTimestamp::from_now()?,
time_margin,
tc_buf_size,
))
pub fn new_with_current_init_time(time_margin: Duration) -> Result<Self, SystemTimeError> {
Ok(Self::new(UnixTimestamp::from_now()?, time_margin))
}
pub fn num_scheduled_telecommands(&self) -> u64 {
@ -687,7 +674,8 @@ pub mod alloc_mod {
/// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
/// closure for each telecommand which should be released. This function will also delete
/// the telecommands from the holding store after calling the release closure if the user
/// returns [true] from the release closure.
/// returns [true] from the release closure. A buffer must be provided to hold the
/// telecommands for the release process.
///
/// # Arguments
///
@ -697,20 +685,53 @@ pub mod alloc_mod {
/// note that returning false might lead to memory leaks if the TC is not cleared from
/// the store in some other way.
/// * `tc_store` - The holding store of the telecommands.
/// * `tc_buf` - Buffer to hold each telecommand being released.
pub fn release_telecommands_with_buffer<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
&mut self,
releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
tc_buf: &mut [u8],
) -> Result<u64, (u64, StoreError)> {
self.release_telecommands_internal(releaser, tc_store, Some(tc_buf))
}
/// This functions is almost identical to [release_telecommands] but does not require
/// a user provided TC buffer because it will always use the [PoolProvider::read_as_vec]
/// API to read the TC packets. However, this will also perform frequent allocations
/// for all telecommands being released.
pub fn release_telecommands<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
&mut self,
releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
self.release_telecommands_internal(releaser, tc_store, None)
}
fn release_telecommands_internal<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
&mut self,
mut releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
mut tc_buf: Option<&mut [u8]>,
) -> Result<u64, (u64, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = 0;
let mut store_error = Ok(());
for tc in tcs_to_release {
for info in tc.1 {
let tc = tc_store
.read(&info.addr, &mut self.tc_buf)
.map_err(|e| (released_tcs, e))?;
let should_delete = releaser(self.enabled, info, &self.tc_buf);
let should_delete = match tc_buf.as_mut() {
Some(buf) => {
tc_store
.read(&info.addr, buf)
.map_err(|e| (released_tcs, e))?;
releaser(self.enabled, info, buf)
}
None => {
let tc = tc_store
.read_as_vec(&info.addr)
.map_err(|e| (released_tcs, e))?;
releaser(self.enabled, info, &tc)
}
};
released_tcs += 1;
if should_delete {
let res = tc_store.delete(info.addr);
@ -736,15 +757,16 @@ pub mod alloc_mod {
&mut self,
mut releaser: R,
tc_store: &(impl PoolProvider + ?Sized),
tc_buf: &mut [u8],
) -> Result<Vec<TcInfo>, (Vec<TcInfo>, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = Vec::new();
for tc in tcs_to_release {
for info in tc.1 {
let tc = tc_store
.read(&info.addr, &mut self.tc_buf)
tc_store
.read(&info.addr, tc_buf)
.map_err(|e| (released_tcs.clone(), e))?;
releaser(self.is_enabled(), info, &self.tc_buf);
releaser(self.is_enabled(), info, tc_buf);
released_tcs.push(*info);
}
}
@ -910,11 +932,8 @@ mod tests {
#[test]
fn test_enable_api() {
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
256,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
assert!(scheduler.is_enabled());
scheduler.disable();
assert!(!scheduler.is_enabled());
@ -925,11 +944,8 @@ mod tests {
#[test]
fn test_reset() {
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
64,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
@ -971,11 +987,8 @@ mod tests {
#[test]
fn insert_multi_with_same_time() {
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
scheduler
.insert_unwrapped_and_stored_tc(
@ -1034,11 +1047,8 @@ mod tests {
#[test]
fn test_time_update() {
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let time = UnixTimestamp::new(1, 2).unwrap();
scheduler.update_time(time);
assert_eq!(scheduler.current_time(), &time);
@ -1086,11 +1096,8 @@ mod tests {
#[test]
fn test_release_telecommands() {
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
@ -1113,8 +1120,9 @@ mod tests {
// test 1: too early, no tcs
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
let mut tc_buf: [u8; 128] = [0; 128];
scheduler
.release_telecommands(&mut test_closure_1, &mut pool)
.release_telecommands_with_buffer(&mut test_closure_1, &mut pool, &mut tc_buf)
.expect("deletion failed");
// test 2: exact time stamp of tc, releases 1 tc
@ -1136,7 +1144,7 @@ mod tests {
scheduler.update_time(UnixTimestamp::new_only_seconds(206));
released = scheduler
.release_telecommands(&mut test_closure_2, &mut pool)
.release_telecommands_with_buffer(&mut test_closure_2, &mut pool, &mut tc_buf)
.expect("deletion failed");
assert_eq!(released, 1);
// TC is deleted.
@ -1154,11 +1162,8 @@ mod tests {
#[test]
fn release_multi_with_same_time() {
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
@ -1185,9 +1190,10 @@ mod tests {
// test 1: too early, no tcs
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
let mut tc_buf: [u8; 128] = [0; 128];
let mut released = scheduler
.release_telecommands(&mut test_closure, &mut pool)
.release_telecommands_with_buffer(&mut test_closure, &mut pool, &mut tc_buf)
.expect("deletion failed");
assert_eq!(released, 0);
@ -1214,11 +1220,8 @@ mod tests {
#[test]
fn release_with_scheduler_disabled() {
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
scheduler.disable();
@ -1240,11 +1243,13 @@ mod tests {
true
};
let mut tc_buf: [u8; 128] = [0; 128];
// test 1: too early, no tcs
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
scheduler
.release_telecommands(&mut test_closure_1, &mut pool)
.release_telecommands_with_buffer(&mut test_closure_1, &mut pool, &mut tc_buf)
.expect("deletion failed");
// test 2: exact time stamp of tc, releases 1 tc
@ -1281,11 +1286,8 @@ mod tests {
#[test]
fn insert_unwrapped_tc() {
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
@ -1302,7 +1304,7 @@ mod tests {
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
let mut read_buf: [u8; 64] = [0; 64];
let data = pool.read(&tc_info_0.addr(), &mut read_buf).unwrap();
pool.read(&tc_info_0.addr(), &mut read_buf).unwrap();
let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
@ -1324,18 +1326,16 @@ mod tests {
.release_telecommands(&mut test_closure, &mut pool)
.unwrap();
let data = pool.read(&addr_vec[0], &mut read_buf).unwrap();
let read_len = pool.read(&addr_vec[0], &mut read_buf).unwrap();
let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data");
assert_eq!(read_len, check_tc.1);
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
}
#[test]
fn insert_wrapped_tc() {
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
@ -1351,8 +1351,9 @@ mod tests {
assert!(pool.has_element_at(&info.addr).unwrap());
let data = pool.read(&info.addr, &mut buf).unwrap();
let read_len = pool.read(&info.addr, &mut buf).unwrap();
let check_tc = PusTcReader::new(&buf).expect("incorrect Pus tc raw data");
assert_eq!(read_len, check_tc.1);
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
@ -1369,22 +1370,22 @@ mod tests {
false
};
let mut tc_buf: [u8; 64] = [0; 64];
scheduler
.release_telecommands(&mut test_closure, &mut pool)
.release_telecommands_with_buffer(&mut test_closure, &mut pool, &mut tc_buf)
.unwrap();
let data = pool.read(&addr_vec[0], &mut buf).unwrap();
let read_len = pool.read(&addr_vec[0], &mut buf).unwrap();
let check_tc = PusTcReader::new(&buf).expect("incorrect PUS tc raw data");
assert_eq!(read_len, check_tc.1);
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
}
#[test]
fn insert_wrong_service() {
let mut scheduler = PusScheduler::new(
UnixTimestamp::new_only_seconds(0),
Duration::from_secs(5),
128,
);
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(10, 32), (5, 64)]));
@ -1944,8 +1945,9 @@ mod tests {
scheduler.update_time(UnixTimestamp::new_only_seconds(205));
let mut tc_buf: [u8; 64] = [0; 64];
let tc_info_vec = scheduler
.release_telecommands_no_deletion(&mut test_closure_1, &pool)
.release_telecommands_no_deletion(&mut test_closure_1, &pool, &mut tc_buf)
.expect("deletion failed");
assert_eq!(tc_info_vec[0], tc_info_0);
assert_eq!(tc_info_vec[1], tc_info_1);

View File

@ -15,7 +15,7 @@
//! ```
//! use std::sync::{Arc, mpsc, RwLock};
//! use std::time::Duration;
//! use satrs_core::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig};
//! use satrs_core::pool::{PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};
//! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
//! use satrs_core::seq_count::SeqCountProviderSimple;
//! use satrs_core::pus::MpscTmInSharedPoolSender;
@ -56,9 +56,7 @@
//! {
//! let mut rg = tm_store.write().expect("Error locking shared pool");
//! let store_guard = rg.read_with_guard(addr);
//! let slice = store_guard.read().expect("Error reading TM slice");
//! tm_len = slice.len();
//! tm_buf[0..tm_len].copy_from_slice(slice);
//! tm_len = store_guard.read(&mut tm_buf).expect("Error reading TM slice");
//! }
//! let (pus_tm, _) = PusTmReader::new(&tm_buf[0..tm_len], 7)
//! .expect("Error reading verification TM");
@ -1547,7 +1545,7 @@ mod tests {
let mut sender = TestSender::default();
let fail_code = EcssEnumU16::new(2);
let fail_params = FailParams::new(Some(stamp_buf.as_slice()), &fail_code, None);
b.vr.acceptance_failure(tok, &mut sender, fail_params)
b.vr.acceptance_failure(tok, &sender, fail_params)
.expect("Sending acceptance success failed");
acceptance_fail_check(&mut sender, tok.req_id, stamp_buf);
}
@ -1682,12 +1680,10 @@ mod tests {
);
let accepted_token =
b.vr.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP))
b.vr.acceptance_success(tok, &sender, Some(&EMPTY_STAMP))
.expect("Sending acceptance success failed");
let empty =
b.vr.start_failure(accepted_token, &mut sender, fail_params)
.expect("Start failure failure");
assert_eq!(empty, ());
b.vr.start_failure(accepted_token, &mut sender, fail_params)
.expect("Start failure failure");
start_fail_check(&mut sender, tok.req_id, fail_data_raw);
}
@ -1779,23 +1775,23 @@ mod tests {
let mut sender = TestSender::default();
let accepted_token = b
.rep()
.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP))
.acceptance_success(tok, &sender, Some(&EMPTY_STAMP))
.expect("Sending acceptance success failed");
let started_token = b
.rep()
.start_success(accepted_token, &mut sender, Some(&[0, 1, 0, 1, 0, 1, 0]))
.start_success(accepted_token, &sender, Some(&[0, 1, 0, 1, 0, 1, 0]))
.expect("Sending start success failed");
b.rep()
.step_success(
&started_token,
&mut sender,
&sender,
Some(&EMPTY_STAMP),
EcssEnumU8::new(0),
)
.expect("Sending step 0 success failed");
b.vr.step_success(
&started_token,
&mut sender,
&sender,
Some(&EMPTY_STAMP),
EcssEnumU8::new(1),
)
@ -2176,9 +2172,9 @@ mod tests {
{
let mut rg = shared_tm_pool.write().expect("Error locking shared pool");
let store_guard = rg.read_with_guard(addr);
let slice = store_guard.read().expect("Error reading TM slice");
tm_len = slice.len();
tm_buf[0..tm_len].copy_from_slice(slice);
tm_len = store_guard
.read(&mut tm_buf)
.expect("Error reading TM slice");
}
let (pus_tm, _) =
PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");

View File

@ -35,7 +35,7 @@ pub mod std_mod {
pub fn add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result<StoreAddr, EcssTmtcError> {
let mut pg = self.0.write().map_err(|_| EcssTmtcError::StoreLock)?;
let addr = pg.free_element(pus_tm.len_written(), &mut |buf| {
let addr = pg.free_element(pus_tm.len_written(), |buf| {
pus_tm
.write_to_bytes(buf)
.expect("writing PUS TM to store failed");

View File

@ -25,8 +25,10 @@ fn threaded_usage() {
addr = rx.recv().expect("Receiving store address failed");
let mut pool_access = shared_clone.write().unwrap();
let pg = PoolGuard::new(pool_access.deref_mut(), addr);
let read_res = pg.read().expect("Reading failed");
assert_eq!(read_res, DUMMY_DATA);
let mut read_buf: [u8; 4] = [0; 4];
let read_bytes = pg.read(&mut read_buf).expect("Reading failed");
assert_eq!(read_buf, DUMMY_DATA);
assert_eq!(read_bytes, 4);
}
let pool_access = shared_clone.read().unwrap();
assert!(!pool_access.has_element_at(&addr).expect("Invalid address"));

View File

@ -58,15 +58,21 @@ pub mod crossbeam_test {
let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
let pus_tc_0 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
req_id_0 = RequestId::new(&pus_tc_0);
let (addr, buf) = tc_guard.free_element(pus_tc_0.len_written()).unwrap();
pus_tc_0.write_to_bytes(buf).unwrap();
let addr = tc_guard
.free_element(pus_tc_0.len_written(), |buf| {
pus_tc_0.write_to_bytes(buf).unwrap();
})
.unwrap();
tx_tc_0.send(addr).unwrap();
let mut sph = SpHeader::tc_unseg(TEST_APID, 1, 0).unwrap();
let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
let pus_tc_1 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
req_id_1 = RequestId::new(&pus_tc_1);
let (addr, buf) = tc_guard.free_element(pus_tc_0.len_written()).unwrap();
pus_tc_1.write_to_bytes(buf).unwrap();
let addr = tc_guard
.free_element(pus_tc_0.len_written(), |buf| {
pus_tc_1.write_to_bytes(buf).unwrap();
})
.unwrap();
tx_tc_1.send(addr).unwrap();
}
let verif_sender_0 = thread::spawn(move || {
@ -78,9 +84,7 @@ pub mod crossbeam_test {
{
let mut tc_guard = shared_tc_pool_0.write().unwrap();
let pg = tc_guard.read_with_guard(tc_addr);
let buf = pg.read().unwrap();
tc_len = buf.len();
tc_buf[0..tc_len].copy_from_slice(buf);
tc_len = pg.read(&mut tc_buf).unwrap();
}
let (_tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
@ -116,9 +120,7 @@ pub mod crossbeam_test {
{
let mut tc_guard = shared_tc_pool_1.write().unwrap();
let pg = tc_guard.read_with_guard(tc_addr);
let buf = pg.read().unwrap();
tc_len = buf.len();
tc_buf[0..tc_len].copy_from_slice(buf);
tc_len = pg.read(&mut tc_buf).unwrap();
}
let (tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
let token = reporter_with_sender_1.add_tc(&tc);
@ -148,9 +150,9 @@ pub mod crossbeam_test {
{
let mut rg = shared_tm_store.write().expect("Error locking shared pool");
let store_guard = rg.read_with_guard(verif_addr);
let slice = store_guard.read().expect("Error reading TM slice");
tm_len = slice.len();
tm_buf[0..tm_len].copy_from_slice(slice);
tm_len = store_guard
.read(&mut tm_buf)
.expect("Error reading TM slice");
}
let (pus_tm, _) =
PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");

View File

@ -54,6 +54,7 @@ impl TcReleaser for mpsc::Sender<Vec<u8>> {
pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus_11_handler: PusService11SchedHandler<TcInMemConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool,
pub releaser_buf: [u8; 4096],
pub tc_releaser: Box<dyn TcReleaser + Send>,
}
@ -70,7 +71,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
let released_tcs = self
.pus_11_handler
.scheduler_mut()
.release_telecommands(&mut self.tc_releaser.release, &mut self.sched_tc_pool)
.release_telecommands_with_buffer(
releaser,
&mut self.sched_tc_pool,
&mut self.releaser_buf,
)
.expect("releasing TCs failed");
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
@ -136,6 +141,7 @@ pub fn create_scheduler_service_static(
Pus11Wrapper {
pus_11_handler,
sched_tc_pool,
releaser_buf: [0; 4096],
tc_releaser: Box::new(tc_releaser),
}
}
@ -172,6 +178,7 @@ pub fn create_scheduler_service_dynamic(
Pus11Wrapper {
pus_11_handler,
sched_tc_pool,
releaser_buf: [0; 4096],
tc_releaser: Box::new(tc_source_sender),
}
}

View File

@ -3,6 +3,7 @@ use std::{
sync::mpsc::{Receiver, Sender},
};
use log::info;
use satrs_core::{
pool::{PoolProvider, StoreAddr},
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
@ -63,9 +64,14 @@ impl TmFunnelCommon {
*entry += 1;
}
Self::packet_printout(&zero_copy_writer);
// This operation has to come last!
zero_copy_writer.finish();
}
fn packet_printout(tm: &PusTmZeroCopyWriter) {
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
}
}
pub struct TmFunnelStatic {
@ -96,16 +102,21 @@ impl TmFunnelStatic {
// the CRC.
let shared_pool = self.shared_tm_store.clone_backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard
.update(&addr)
let mut tm_copy = Vec::new();
pool_guard
.update(&addr, |buf| {
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
tm_copy = buf.to_vec()
})
.expect("Reading TM from pool failed");
let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.tm_server_tx
.send(addr)
.expect("Sending TM to server failed");
self.common.sync_tm_tcp_source.add_tm(tm_raw);
// We could also do this step in the update closure, but I'd rather avoid this, could
// lead to nested locking.
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
}
}
}

View File

@ -28,8 +28,9 @@ pub struct SharedTcPool {
impl SharedTcPool {
pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
let mut pg = self.pool.write().expect("error locking TC store");
let (addr, buf) = pg.free_element(pus_tc.len_packed())?;
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
let addr = pg.free_element(pus_tc.len_packed(), |buf| {
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
})?;
Ok(addr)
}
}
@ -125,8 +126,8 @@ impl TcSourceTaskStatic {
.pool
.read()
.expect("locking tc pool failed");
let data = pool.read(&addr).expect("reading pool failed");
self.tc_buf[0..data.len()].copy_from_slice(data);
pool.read(&addr, &mut self.tc_buf)
.expect("reading pool failed");
drop(pool);
match PusTcReader::new(&self.tc_buf) {
Ok((pus_tc, _)) => {

View File

@ -29,20 +29,13 @@ impl UdpTmHandler for StaticUdpTmHandler {
}
let mut store_lock = store_lock.unwrap();
let pg = store_lock.read_with_guard(addr);
let read_res = pg.read();
let read_res = pg.read_as_vec();
if read_res.is_err() {
warn!("Error reading TM pool data");
continue;
}
let buf = read_res.unwrap();
if buf.len() > 9 {
let service = buf[7];
let subservice = buf[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = socket.send_to(buf, recv_addr);
let result = socket.send_to(&buf, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}