Merge pull request 'Refactor and improve pool abstraction' (#104) from refactor-pool-impl into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #104
This commit is contained in:
commit
3c7113c231
@ -3,11 +3,12 @@
|
||||
//! # Example for the [StaticMemoryPool]
|
||||
//!
|
||||
//! ```
|
||||
//! use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
|
||||
//! use satrs_core::pool::{PoolProvider, StaticMemoryPool, StaticPoolConfig};
|
||||
//!
|
||||
//! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
|
||||
//! let pool_cfg = StaticPoolConfig::new(vec![(4, 4), (2, 8), (1, 16)]);
|
||||
//! let mut local_pool = StaticMemoryPool::new(pool_cfg);
|
||||
//! let mut read_buf: [u8; 16] = [0; 16];
|
||||
//! let mut addr;
|
||||
//! {
|
||||
//! // Add new data to the pool
|
||||
@ -20,25 +21,25 @@
|
||||
//!
|
||||
//! {
|
||||
//! // Read the store data back
|
||||
//! let res = local_pool.read(&addr);
|
||||
//! let res = local_pool.read(&addr, &mut read_buf);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! assert_eq!(buf_read_back.len(), 4);
|
||||
//! assert_eq!(buf_read_back[0], 42);
|
||||
//! let read_bytes = res.unwrap();
|
||||
//! assert_eq!(read_bytes, 4);
|
||||
//! assert_eq!(read_buf[0], 42);
|
||||
//! // Modify the stored data
|
||||
//! let res = local_pool.modify(&addr);
|
||||
//! let res = local_pool.modify(&addr, |buf| {
|
||||
//! buf[0] = 12;
|
||||
//! });
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! buf_read_back[0] = 12;
|
||||
//! }
|
||||
//!
|
||||
//! {
|
||||
//! // Read the modified data back
|
||||
//! let res = local_pool.read(&addr);
|
||||
//! let res = local_pool.read(&addr, &mut read_buf);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! assert_eq!(buf_read_back.len(), 4);
|
||||
//! assert_eq!(buf_read_back[0], 12);
|
||||
//! let read_bytes = res.unwrap();
|
||||
//! assert_eq!(read_bytes, 4);
|
||||
//! assert_eq!(read_buf[0], 12);
|
||||
//! }
|
||||
//!
|
||||
//! // Delete the stored data
|
||||
@ -46,21 +47,21 @@
|
||||
//!
|
||||
//! // Get a free element in the pool with an appropriate size
|
||||
//! {
|
||||
//! let res = local_pool.free_element(12);
|
||||
//! let res = local_pool.free_element(12, |buf| {
|
||||
//! buf[0] = 7;
|
||||
//! });
|
||||
//! assert!(res.is_ok());
|
||||
//! let (tmp, mut_buf) = res.unwrap();
|
||||
//! addr = tmp;
|
||||
//! mut_buf[0] = 7;
|
||||
//! addr = res.unwrap();
|
||||
//! }
|
||||
//!
|
||||
//! // Read back the data
|
||||
//! {
|
||||
//! // Read the store data back
|
||||
//! let res = local_pool.read(&addr);
|
||||
//! let res = local_pool.read(&addr, &mut read_buf);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! assert_eq!(buf_read_back.len(), 12);
|
||||
//! assert_eq!(buf_read_back[0], 7);
|
||||
//! let read_bytes = res.unwrap();
|
||||
//! assert_eq!(read_bytes, 12);
|
||||
//! assert_eq!(read_buf[0], 7);
|
||||
//! }
|
||||
//! ```
|
||||
#[cfg(feature = "alloc")]
|
||||
@ -70,6 +71,7 @@ use core::fmt::{Display, Formatter};
|
||||
use delegate::delegate;
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
use spacepackets::ByteConversionError;
|
||||
#[cfg(feature = "std")]
|
||||
use std::error::Error;
|
||||
|
||||
@ -151,6 +153,8 @@ pub enum StoreError {
|
||||
InvalidStoreId(StoreIdError, Option<StoreAddr>),
|
||||
/// Valid subpool and packet index, but no data is stored at the given address
|
||||
DataDoesNotExist(StoreAddr),
|
||||
ByteConversionError(spacepackets::ByteConversionError),
|
||||
LockError,
|
||||
/// Internal or configuration errors
|
||||
InternalError(u32),
|
||||
}
|
||||
@ -173,10 +177,22 @@ impl Display for StoreError {
|
||||
StoreError::InternalError(e) => {
|
||||
write!(f, "internal error: {e}")
|
||||
}
|
||||
StoreError::ByteConversionError(e) => {
|
||||
write!(f, "store error: {e}")
|
||||
}
|
||||
StoreError::LockError => {
|
||||
write!(f, "lock error")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ByteConversionError> for StoreError {
|
||||
fn from(value: ByteConversionError) -> Self {
|
||||
Self::ByteConversionError(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl Error for StoreError {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
@ -189,39 +205,54 @@ impl Error for StoreError {
|
||||
|
||||
/// Generic trait for pool providers where the data can be modified and read in-place. This
|
||||
/// generally means that a shared pool structure has to be wrapped inside a lock structure.
|
||||
pub trait PoolProviderMemInPlace {
|
||||
pub trait PoolProvider {
|
||||
/// Add new data to the pool. The provider should attempt to reserve a memory block with the
|
||||
/// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can
|
||||
/// be used to access the data stored in the pool
|
||||
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError>;
|
||||
|
||||
/// The provider should attempt to reserve a free memory block with the appropriate size and
|
||||
/// then return a mutable reference to it. Yields a [StoreAddr] which can be used to access
|
||||
/// the data stored in the pool
|
||||
fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>;
|
||||
/// The provider should attempt to reserve a free memory block with the appropriate size first.
|
||||
/// It then executes a user-provided closure and passes a mutable reference to that memory
|
||||
/// block to the closure. This allows the user to write data to the memory block.
|
||||
/// The function should yield a [StoreAddr] which can be used to access the data stored in the
|
||||
/// pool.
|
||||
fn free_element<W: FnMut(&mut [u8])>(
|
||||
&mut self,
|
||||
len: usize,
|
||||
writer: W,
|
||||
) -> Result<StoreAddr, StoreError>;
|
||||
|
||||
/// Modify data added previously using a given [StoreAddr] by yielding a mutable reference
|
||||
/// to it
|
||||
fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>;
|
||||
/// Modify data added previously using a given [StoreAddr]. The provider should use the store
|
||||
/// address to determine if a memory block exists for that address. If it does, it should
|
||||
/// call the user-provided closure and pass a mutable reference to the memory block
|
||||
/// to the closure. This allows the user to modify the memory block.
|
||||
fn modify<U: FnMut(&mut [u8])>(
|
||||
&mut self,
|
||||
addr: &StoreAddr,
|
||||
updater: U,
|
||||
) -> Result<(), StoreError>;
|
||||
|
||||
/// Read data by yielding a read-only reference given a [StoreAddr]
|
||||
fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>;
|
||||
/// The provider should copy the data from the memory block to the user-provided buffer if
|
||||
/// it exists.
|
||||
fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result<usize, StoreError>;
|
||||
|
||||
/// Delete data inside the pool given a [StoreAddr]
|
||||
/// Delete data inside the pool given a [StoreAddr].
|
||||
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>;
|
||||
fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError>;
|
||||
|
||||
/// Retrieve the length of the data at the given store address.
|
||||
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
|
||||
if !self.has_element_at(addr)? {
|
||||
return Err(StoreError::DataDoesNotExist(*addr));
|
||||
}
|
||||
Ok(self.read(addr)?.len())
|
||||
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError>;
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
fn read_as_vec(&self, addr: &StoreAddr) -> Result<alloc::vec::Vec<u8>, StoreError> {
|
||||
let mut vec = alloc::vec![0; self.len_of_data(addr)?];
|
||||
self.read(addr, &mut vec)?;
|
||||
Ok(vec)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait PoolProviderMemInPlaceWithGuards: PoolProviderMemInPlace {
|
||||
/// This function behaves like [PoolProviderMemInPlace::read], but consumes the provided address
|
||||
pub trait PoolProviderWithGuards: PoolProvider {
|
||||
/// This function behaves like [PoolProvider::read], but consumes the provided address
|
||||
/// and returns a RAII conformant guard object.
|
||||
///
|
||||
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
|
||||
@ -231,7 +262,7 @@ pub trait PoolProviderMemInPlaceWithGuards: PoolProviderMemInPlace {
|
||||
/// manual deletion is necessary when returning from a processing function prematurely.
|
||||
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self>;
|
||||
|
||||
/// This function behaves like [PoolProviderMemInPlace::modify], but consumes the provided
|
||||
/// This function behaves like [PoolProvider::modify], but consumes the provided
|
||||
/// address and returns a RAII conformant guard object.
|
||||
///
|
||||
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
|
||||
@ -242,7 +273,7 @@ pub trait PoolProviderMemInPlaceWithGuards: PoolProviderMemInPlace {
|
||||
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self>;
|
||||
}
|
||||
|
||||
pub struct PoolGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> {
|
||||
pub struct PoolGuard<'a, MemProvider: PoolProvider + ?Sized> {
|
||||
pool: &'a mut MemProvider,
|
||||
pub addr: StoreAddr,
|
||||
no_deletion: bool,
|
||||
@ -250,7 +281,7 @@ pub struct PoolGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> {
|
||||
}
|
||||
|
||||
/// This helper object
|
||||
impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> {
|
||||
impl<'a, MemProvider: PoolProvider> PoolGuard<'a, MemProvider> {
|
||||
pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
@ -260,8 +291,13 @@ impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read(&self) -> Result<&[u8], StoreError> {
|
||||
self.pool.read(&self.addr)
|
||||
pub fn read(&self, buf: &mut [u8]) -> Result<usize, StoreError> {
|
||||
self.pool.read(&self.addr, buf)
|
||||
}
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
pub fn read_as_vec(&self) -> Result<alloc::vec::Vec<u8>, StoreError> {
|
||||
self.pool.read_as_vec(&self.addr)
|
||||
}
|
||||
|
||||
/// Releasing the pool guard will disable the automatic deletion of the data when the guard
|
||||
@ -271,7 +307,7 @@ impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<MemProvider: PoolProviderMemInPlace + ?Sized> Drop for PoolGuard<'_, MemProvider> {
|
||||
impl<MemProvider: PoolProvider + ?Sized> Drop for PoolGuard<'_, MemProvider> {
|
||||
fn drop(&mut self) {
|
||||
if !self.no_deletion {
|
||||
if let Err(e) = self.pool.delete(self.addr) {
|
||||
@ -281,24 +317,24 @@ impl<MemProvider: PoolProviderMemInPlace + ?Sized> Drop for PoolGuard<'_, MemPro
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PoolRwGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> {
|
||||
pub struct PoolRwGuard<'a, MemProvider: PoolProvider + ?Sized> {
|
||||
guard: PoolGuard<'a, MemProvider>,
|
||||
}
|
||||
|
||||
impl<'a, MemProvider: PoolProviderMemInPlace> PoolRwGuard<'a, MemProvider> {
|
||||
impl<'a, MemProvider: PoolProvider> PoolRwGuard<'a, MemProvider> {
|
||||
pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self {
|
||||
Self {
|
||||
guard: PoolGuard::new(pool, addr),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn modify(&mut self) -> Result<&mut [u8], StoreError> {
|
||||
self.guard.pool.modify(&self.guard.addr)
|
||||
pub fn update<U: FnMut(&mut [u8])>(&mut self, updater: &mut U) -> Result<(), StoreError> {
|
||||
self.guard.pool.modify(&self.guard.addr, updater)
|
||||
}
|
||||
|
||||
delegate!(
|
||||
to self.guard {
|
||||
pub fn read(&self) -> Result<&[u8], StoreError>;
|
||||
pub fn read(&self, buf: &mut [u8]) -> Result<usize, StoreError>;
|
||||
/// Releasing the pool guard will disable the automatic deletion of the data when the guard
|
||||
/// is dropped.
|
||||
pub fn release(&mut self);
|
||||
@ -308,13 +344,11 @@ impl<'a, MemProvider: PoolProviderMemInPlace> PoolRwGuard<'a, MemProvider> {
|
||||
|
||||
#[cfg(feature = "alloc")]
|
||||
mod alloc_mod {
|
||||
use super::{
|
||||
PoolGuard, PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, PoolRwGuard,
|
||||
StaticPoolAddr,
|
||||
};
|
||||
use super::{PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticPoolAddr};
|
||||
use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError};
|
||||
use alloc::vec;
|
||||
use alloc::vec::Vec;
|
||||
use spacepackets::ByteConversionError;
|
||||
#[cfg(feature = "std")]
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
@ -476,7 +510,7 @@ mod alloc_mod {
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolProviderMemInPlace for StaticMemoryPool {
|
||||
impl PoolProvider for StaticMemoryPool {
|
||||
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
|
||||
let data_len = data.len();
|
||||
if data_len > POOL_MAX_SIZE {
|
||||
@ -487,7 +521,11 @@ mod alloc_mod {
|
||||
Ok(addr.into())
|
||||
}
|
||||
|
||||
fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> {
|
||||
fn free_element<W: FnMut(&mut [u8])>(
|
||||
&mut self,
|
||||
len: usize,
|
||||
mut writer: W,
|
||||
) -> Result<StoreAddr, StoreError> {
|
||||
if len > POOL_MAX_SIZE {
|
||||
return Err(StoreError::DataTooLarge(len));
|
||||
}
|
||||
@ -495,25 +533,40 @@ mod alloc_mod {
|
||||
let raw_pos = self.raw_pos(&addr).unwrap();
|
||||
let block =
|
||||
&mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len];
|
||||
Ok((addr.into(), block))
|
||||
writer(block);
|
||||
Ok(addr.into())
|
||||
}
|
||||
|
||||
fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> {
|
||||
fn modify<U: FnMut(&mut [u8])>(
|
||||
&mut self,
|
||||
addr: &StoreAddr,
|
||||
mut updater: U,
|
||||
) -> Result<(), StoreError> {
|
||||
let addr = StaticPoolAddr::from(*addr);
|
||||
let curr_size = self.addr_check(&addr)?;
|
||||
let raw_pos = self.raw_pos(&addr).unwrap();
|
||||
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()
|
||||
[raw_pos..raw_pos + curr_size];
|
||||
Ok(block)
|
||||
updater(block);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> {
|
||||
fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result<usize, StoreError> {
|
||||
let addr = StaticPoolAddr::from(*addr);
|
||||
let curr_size = self.addr_check(&addr)?;
|
||||
if buf.len() < curr_size {
|
||||
return Err(ByteConversionError::ToSliceTooSmall {
|
||||
found: buf.len(),
|
||||
expected: curr_size,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
let raw_pos = self.raw_pos(&addr).unwrap();
|
||||
let block =
|
||||
&self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size];
|
||||
Ok(block)
|
||||
//block.copy_from_slice(&src);
|
||||
buf[..curr_size].copy_from_slice(block);
|
||||
Ok(curr_size)
|
||||
}
|
||||
|
||||
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> {
|
||||
@ -540,9 +593,21 @@ mod alloc_mod {
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
|
||||
let addr = StaticPoolAddr::from(*addr);
|
||||
self.validate_addr(&addr)?;
|
||||
let pool_idx = addr.pool_idx as usize;
|
||||
let size_list = self.sizes_lists.get(pool_idx).unwrap();
|
||||
let size = size_list[addr.packet_idx as usize];
|
||||
Ok(match size {
|
||||
STORE_FREE => 0,
|
||||
_ => size,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolProviderMemInPlaceWithGuards for StaticMemoryPool {
|
||||
impl PoolProviderWithGuards for StaticMemoryPool {
|
||||
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self> {
|
||||
PoolRwGuard::new(self, addr)
|
||||
}
|
||||
@ -556,9 +621,8 @@ mod alloc_mod {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::pool::{
|
||||
PoolGuard, PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, PoolRwGuard,
|
||||
StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError,
|
||||
POOL_MAX_SIZE,
|
||||
PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticMemoryPool,
|
||||
StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, POOL_MAX_SIZE,
|
||||
};
|
||||
use std::vec;
|
||||
|
||||
@ -594,13 +658,14 @@ mod tests {
|
||||
for (i, val) in test_buf.iter_mut().enumerate() {
|
||||
*val = i as u8;
|
||||
}
|
||||
let mut other_buf: [u8; 16] = [0; 16];
|
||||
let addr = local_pool.add(&test_buf).expect("Adding data failed");
|
||||
// Read back data and verify correctness
|
||||
let res = local_pool.read(&addr);
|
||||
let res = local_pool.read(&addr, &mut other_buf);
|
||||
assert!(res.is_ok());
|
||||
let buf_read_back = res.unwrap();
|
||||
assert_eq!(buf_read_back.len(), 16);
|
||||
for (i, &val) in buf_read_back.iter().enumerate() {
|
||||
let read_len = res.unwrap();
|
||||
assert_eq!(read_len, 16);
|
||||
for (i, &val) in other_buf.iter().enumerate() {
|
||||
assert_eq!(val, i as u8);
|
||||
}
|
||||
}
|
||||
@ -610,8 +675,10 @@ mod tests {
|
||||
let mut local_pool = basic_small_pool();
|
||||
let test_buf: [u8; 12] = [0; 12];
|
||||
let addr = local_pool.add(&test_buf).expect("Adding data failed");
|
||||
let res = local_pool.read(&addr).expect("Read back failed");
|
||||
assert_eq!(res.len(), 12);
|
||||
let res = local_pool
|
||||
.read(&addr, &mut [0; 12])
|
||||
.expect("Read back failed");
|
||||
assert_eq!(res, 12);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -622,10 +689,13 @@ mod tests {
|
||||
// Delete the data
|
||||
let res = local_pool.delete(addr);
|
||||
assert!(res.is_ok());
|
||||
let mut writer = |buf: &mut [u8]| {
|
||||
assert_eq!(buf.len(), 12);
|
||||
};
|
||||
// Verify that the slot is free by trying to get a reference to it
|
||||
let res = local_pool.free_element(12);
|
||||
let res = local_pool.free_element(12, &mut writer);
|
||||
assert!(res.is_ok());
|
||||
let (addr, buf_ref) = res.unwrap();
|
||||
let addr = res.unwrap();
|
||||
assert_eq!(
|
||||
addr,
|
||||
u64::from(StaticPoolAddr {
|
||||
@ -633,7 +703,6 @@ mod tests {
|
||||
packet_idx: 0
|
||||
})
|
||||
);
|
||||
assert_eq!(buf_ref.len(), 12);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -647,29 +716,34 @@ mod tests {
|
||||
|
||||
{
|
||||
// Verify that the slot is free by trying to get a reference to it
|
||||
let res = local_pool.modify(&addr).expect("Modifying data failed");
|
||||
res[0] = 0;
|
||||
res[1] = 0x42;
|
||||
local_pool
|
||||
.modify(&addr, &mut |buf: &mut [u8]| {
|
||||
buf[0] = 0;
|
||||
buf[1] = 0x42;
|
||||
})
|
||||
.expect("Modifying data failed");
|
||||
}
|
||||
|
||||
let res = local_pool.read(&addr).expect("Reading back data failed");
|
||||
assert_eq!(res[0], 0);
|
||||
assert_eq!(res[1], 0x42);
|
||||
assert_eq!(res[2], 2);
|
||||
assert_eq!(res[3], 3);
|
||||
local_pool
|
||||
.read(&addr, &mut test_buf)
|
||||
.expect("Reading back data failed");
|
||||
assert_eq!(test_buf[0], 0);
|
||||
assert_eq!(test_buf[1], 0x42);
|
||||
assert_eq!(test_buf[2], 2);
|
||||
assert_eq!(test_buf[3], 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_consecutive_reservation() {
|
||||
let mut local_pool = basic_small_pool();
|
||||
// Reserve two smaller blocks consecutively and verify that the third reservation fails
|
||||
let res = local_pool.free_element(8);
|
||||
let res = local_pool.free_element(8, |_| {});
|
||||
assert!(res.is_ok());
|
||||
let (addr0, _) = res.unwrap();
|
||||
let res = local_pool.free_element(8);
|
||||
let addr0 = res.unwrap();
|
||||
let res = local_pool.free_element(8, |_| {});
|
||||
assert!(res.is_ok());
|
||||
let (addr1, _) = res.unwrap();
|
||||
let res = local_pool.free_element(8);
|
||||
let addr1 = res.unwrap();
|
||||
let res = local_pool.free_element(8, |_| {});
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert_eq!(err, StoreError::StoreFull(1));
|
||||
@ -689,6 +763,7 @@ mod tests {
|
||||
pool_idx: 0,
|
||||
}
|
||||
.into(),
|
||||
&mut [],
|
||||
);
|
||||
assert!(res.is_err());
|
||||
assert!(matches!(
|
||||
@ -720,7 +795,7 @@ mod tests {
|
||||
packet_idx: 0,
|
||||
}
|
||||
.into();
|
||||
let res = local_pool.read(&addr);
|
||||
let res = local_pool.read(&addr, &mut []);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert!(matches!(
|
||||
@ -737,7 +812,7 @@ mod tests {
|
||||
packet_idx: 1,
|
||||
};
|
||||
assert_eq!(addr.raw(), 0x00020001);
|
||||
let res = local_pool.read(&addr.into());
|
||||
let res = local_pool.read(&addr.into(), &mut []);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert!(matches!(
|
||||
@ -759,7 +834,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_data_too_large_1() {
|
||||
let mut local_pool = basic_small_pool();
|
||||
let res = local_pool.free_element(POOL_MAX_SIZE + 1);
|
||||
let res = local_pool.free_element(POOL_MAX_SIZE + 1, |_| {});
|
||||
assert!(res.is_err());
|
||||
assert_eq!(
|
||||
res.unwrap_err(),
|
||||
@ -771,7 +846,7 @@ mod tests {
|
||||
fn test_free_element_too_large() {
|
||||
let mut local_pool = basic_small_pool();
|
||||
// Try to request a slot which is too large
|
||||
let res = local_pool.free_element(20);
|
||||
let res = local_pool.free_element(20, |_| {});
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.unwrap_err(), StoreError::DataTooLarge(20));
|
||||
}
|
||||
@ -813,7 +888,7 @@ mod tests {
|
||||
let test_buf: [u8; 16] = [0; 16];
|
||||
let addr = local_pool.add(&test_buf).expect("Adding data failed");
|
||||
let mut rw_guard = PoolRwGuard::new(&mut local_pool, addr);
|
||||
let _ = rw_guard.modify().expect("modify failed");
|
||||
rw_guard.update(&mut |_| {}).expect("modify failed");
|
||||
drop(rw_guard);
|
||||
assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
|
||||
}
|
||||
@ -824,7 +899,7 @@ mod tests {
|
||||
let test_buf: [u8; 16] = [0; 16];
|
||||
let addr = local_pool.add(&test_buf).expect("Adding data failed");
|
||||
let mut rw_guard = local_pool.modify_with_guard(addr);
|
||||
let _ = rw_guard.modify().expect("modify failed");
|
||||
rw_guard.update(&mut |_| {}).expect("modify failed");
|
||||
drop(rw_guard);
|
||||
assert!(!local_pool.has_element_at(&addr).expect("Invalid address"));
|
||||
}
|
||||
@ -840,13 +915,25 @@ mod tests {
|
||||
let addr1 = local_pool.add(&test_buf_1).expect("Adding data failed");
|
||||
let addr2 = local_pool.add(&test_buf_2).expect("Adding data failed");
|
||||
let addr3 = local_pool.add(&test_buf_3).expect("Adding data failed");
|
||||
let tm0_raw = local_pool.modify(&addr0).expect("Modifying data failed");
|
||||
assert_eq!(tm0_raw, test_buf_0);
|
||||
let tm1_raw = local_pool.modify(&addr1).expect("Modifying data failed");
|
||||
assert_eq!(tm1_raw, test_buf_1);
|
||||
let tm2_raw = local_pool.modify(&addr2).expect("Modifying data failed");
|
||||
assert_eq!(tm2_raw, test_buf_2);
|
||||
let tm3_raw = local_pool.modify(&addr3).expect("Modifying data failed");
|
||||
assert_eq!(tm3_raw, test_buf_3);
|
||||
local_pool
|
||||
.modify(&addr0, |buf| {
|
||||
assert_eq!(buf, test_buf_0);
|
||||
})
|
||||
.expect("Modifying data failed");
|
||||
local_pool
|
||||
.modify(&addr1, |buf| {
|
||||
assert_eq!(buf, test_buf_1);
|
||||
})
|
||||
.expect("Modifying data failed");
|
||||
local_pool
|
||||
.modify(&addr2, |buf| {
|
||||
assert_eq!(buf, test_buf_2);
|
||||
})
|
||||
.expect("Modifying data failed");
|
||||
local_pool
|
||||
.modify(&addr3, |buf| {
|
||||
assert_eq!(buf, test_buf_3);
|
||||
})
|
||||
.expect("Modifying data failed");
|
||||
}
|
||||
}
|
||||
|
@ -390,7 +390,7 @@ mod alloc_mod {
|
||||
#[cfg(feature = "std")]
|
||||
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
|
||||
pub mod std_mod {
|
||||
use crate::pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr};
|
||||
use crate::pool::{PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr};
|
||||
use crate::pus::verification::{
|
||||
StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
|
||||
};
|
||||
@ -789,12 +789,15 @@ pub mod std_mod {
|
||||
.shared_tc_store
|
||||
.write()
|
||||
.map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?;
|
||||
let tc_guard = tc_pool.read_with_guard(addr);
|
||||
let tc_raw = tc_guard.read().unwrap();
|
||||
if tc_raw.len() > self.pus_buf.len() {
|
||||
return Err(PusPacketHandlingError::PusPacketTooLarge(tc_raw.len()));
|
||||
let tc_size = tc_pool
|
||||
.len_of_data(&addr)
|
||||
.map_err(|e| PusPacketHandlingError::EcssTmtc(EcssTmtcError::Store(e)))?;
|
||||
if tc_size > self.pus_buf.len() {
|
||||
return Err(PusPacketHandlingError::PusPacketTooLarge(tc_size));
|
||||
}
|
||||
self.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
|
||||
let tc_guard = tc_pool.read_with_guard(addr);
|
||||
// TODO: Proper error handling.
|
||||
tc_guard.read(&mut self.pus_buf[0..tc_size]).unwrap();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -947,8 +950,7 @@ pub mod tests {
|
||||
use spacepackets::CcsdsPacket;
|
||||
|
||||
use crate::pool::{
|
||||
PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig,
|
||||
StoreAddr,
|
||||
PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig, StoreAddr,
|
||||
};
|
||||
use crate::pus::verification::RequestId;
|
||||
use crate::tmtc::tm_helper::SharedTmPool;
|
||||
@ -1078,8 +1080,8 @@ pub mod tests {
|
||||
assert!(next_msg.is_ok());
|
||||
let tm_addr = next_msg.unwrap();
|
||||
let tm_pool = self.tm_pool.0.read().unwrap();
|
||||
let tm_raw = tm_pool.read(&tm_addr).unwrap();
|
||||
self.tm_buf[0..tm_raw.len()].copy_from_slice(tm_raw);
|
||||
let tm_raw = tm_pool.read_as_vec(&tm_addr).unwrap();
|
||||
self.tm_buf[0..tm_raw.len()].copy_from_slice(&tm_raw);
|
||||
PusTmReader::new(&self.tm_buf, 7).unwrap().0
|
||||
}
|
||||
|
||||
@ -1096,8 +1098,8 @@ pub mod tests {
|
||||
assert!(next_msg.is_ok());
|
||||
let tm_addr = next_msg.unwrap();
|
||||
let tm_pool = self.tm_pool.0.read().unwrap();
|
||||
let tm_raw = tm_pool.read(&tm_addr).unwrap();
|
||||
let tm = PusTmReader::new(tm_raw, 7).unwrap().0;
|
||||
let tm_raw = tm_pool.read_as_vec(&tm_addr).unwrap();
|
||||
let tm = PusTmReader::new(&tm_raw, 7).unwrap().0;
|
||||
assert_eq!(PusPacket::service(&tm), 1);
|
||||
assert_eq!(PusPacket::subservice(&tm), subservice);
|
||||
assert_eq!(tm.apid(), TEST_APID);
|
||||
|
@ -16,7 +16,7 @@ use spacepackets::{ByteConversionError, CcsdsPacket};
|
||||
#[cfg(feature = "std")]
|
||||
use std::error::Error;
|
||||
|
||||
use crate::pool::{PoolProviderMemInPlace, StoreError};
|
||||
use crate::pool::{PoolProvider, StoreError};
|
||||
#[cfg(feature = "alloc")]
|
||||
pub use alloc_mod::*;
|
||||
|
||||
@ -241,10 +241,7 @@ impl Error for ScheduleError {
|
||||
pub trait PusSchedulerInterface {
|
||||
type TimeProvider: CcsdsTimeProvider + TimeReader;
|
||||
|
||||
fn reset(
|
||||
&mut self,
|
||||
store: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
) -> Result<(), StoreError>;
|
||||
fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError>;
|
||||
|
||||
fn is_enabled(&self) -> bool;
|
||||
|
||||
@ -267,7 +264,7 @@ pub trait PusSchedulerInterface {
|
||||
fn insert_wrapped_tc<TimeProvider>(
|
||||
&mut self,
|
||||
pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader),
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<TcInfo, ScheduleError> {
|
||||
if PusPacket::service(pus_tc) != 11 {
|
||||
return Err(ScheduleError::WrongService(PusPacket::service(pus_tc)));
|
||||
@ -293,7 +290,7 @@ pub trait PusSchedulerInterface {
|
||||
&mut self,
|
||||
time_stamp: UnixTimestamp,
|
||||
tc: &[u8],
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<TcInfo, ScheduleError> {
|
||||
let check_tc = PusTcReader::new(tc)?;
|
||||
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
|
||||
@ -343,7 +340,7 @@ pub fn generate_insert_telecommand_app_data(
|
||||
#[cfg(feature = "alloc")]
|
||||
pub mod alloc_mod {
|
||||
use super::*;
|
||||
use crate::pool::{PoolProviderMemInPlace, StoreAddr, StoreError};
|
||||
use crate::pool::{PoolProvider, StoreAddr, StoreError};
|
||||
use alloc::collections::btree_map::{Entry, Range};
|
||||
use alloc::collections::BTreeMap;
|
||||
use alloc::vec;
|
||||
@ -379,7 +376,7 @@ pub mod alloc_mod {
|
||||
/// This is the core data structure for scheduling PUS telecommands with [alloc] support.
|
||||
///
|
||||
/// It is assumed that the actual telecommand data is stored in a separate TC pool offering
|
||||
/// a [crate::pool::PoolProviderMemInPlace] API. This data structure just tracks the store
|
||||
/// a [crate::pool::PoolProvider] API. This data structure just tracks the store
|
||||
/// addresses and their release times and offers a convenient API to insert and release
|
||||
/// telecommands and perform other functionality specified by the ECSS standard in section 6.11.
|
||||
/// The time is tracked as a [spacepackets::time::UnixTimestamp] but the only requirement to
|
||||
@ -413,6 +410,8 @@ pub mod alloc_mod {
|
||||
/// * `time_margin` - This time margin is used when inserting new telecommands into the
|
||||
/// schedule. If the release time of a new telecommand is earlier than the time margin
|
||||
/// added to the current time, it will not be inserted into the schedule.
|
||||
/// * `tc_buf_size` - Buffer for temporary storage of telecommand packets. This buffer
|
||||
/// should be large enough to accomodate the largest expected TC packets.
|
||||
pub fn new(init_current_time: UnixTimestamp, time_margin: Duration) -> Self {
|
||||
PusScheduler {
|
||||
tc_map: Default::default(),
|
||||
@ -476,7 +475,7 @@ pub mod alloc_mod {
|
||||
&mut self,
|
||||
time_stamp: UnixTimestamp,
|
||||
tc: &[u8],
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<TcInfo, ScheduleError> {
|
||||
let check_tc = PusTcReader::new(tc)?;
|
||||
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
|
||||
@ -499,7 +498,7 @@ pub mod alloc_mod {
|
||||
pub fn insert_wrapped_tc_cds_short(
|
||||
&mut self,
|
||||
pus_tc: &PusTc,
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<TcInfo, ScheduleError> {
|
||||
self.insert_wrapped_tc::<cds::TimeProvider>(pus_tc, pool)
|
||||
}
|
||||
@ -509,7 +508,7 @@ pub mod alloc_mod {
|
||||
pub fn insert_wrapped_tc_cds_long(
|
||||
&mut self,
|
||||
pus_tc: &PusTc,
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<TcInfo, ScheduleError> {
|
||||
self.insert_wrapped_tc::<cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
|
||||
}
|
||||
@ -525,7 +524,7 @@ pub mod alloc_mod {
|
||||
pub fn delete_by_time_filter<TimeProvider: CcsdsTimeProvider + Clone>(
|
||||
&mut self,
|
||||
time_window: TimeWindow<TimeProvider>,
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<u64, (u64, StoreError)> {
|
||||
let range = self.retrieve_by_time_filter(time_window);
|
||||
let mut del_packets = 0;
|
||||
@ -555,7 +554,7 @@ pub mod alloc_mod {
|
||||
/// the last deletion will be supplied in addition to the number of deleted commands.
|
||||
pub fn delete_all(
|
||||
&mut self,
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<u64, (u64, StoreError)> {
|
||||
self.delete_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all(), pool)
|
||||
}
|
||||
@ -613,7 +612,7 @@ pub mod alloc_mod {
|
||||
pub fn delete_by_request_id_and_from_pool(
|
||||
&mut self,
|
||||
req_id: &RequestId,
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<bool, StoreError> {
|
||||
if let DeletionResult::WithStoreDeletion(v) =
|
||||
self.delete_by_request_id_internal_with_store_deletion(req_id, pool)
|
||||
@ -645,7 +644,7 @@ pub mod alloc_mod {
|
||||
fn delete_by_request_id_internal_with_store_deletion(
|
||||
&mut self,
|
||||
req_id: &RequestId,
|
||||
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> DeletionResult {
|
||||
let mut idx_found = None;
|
||||
for time_bucket in &mut self.tc_map {
|
||||
@ -675,7 +674,8 @@ pub mod alloc_mod {
|
||||
/// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
|
||||
/// closure for each telecommand which should be released. This function will also delete
|
||||
/// the telecommands from the holding store after calling the release closure if the user
|
||||
/// returns [true] from the release closure.
|
||||
/// returns [true] from the release closure. A buffer must be provided to hold the
|
||||
/// telecommands for the release process.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
@ -685,18 +685,55 @@ pub mod alloc_mod {
|
||||
/// note that returning false might lead to memory leaks if the TC is not cleared from
|
||||
/// the store in some other way.
|
||||
/// * `tc_store` - The holding store of the telecommands.
|
||||
/// * `tc_buf` - Buffer to hold each telecommand being released.
|
||||
pub fn release_telecommands_with_buffer<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
|
||||
&mut self,
|
||||
releaser: R,
|
||||
tc_store: &mut (impl PoolProvider + ?Sized),
|
||||
tc_buf: &mut [u8],
|
||||
) -> Result<u64, (u64, StoreError)> {
|
||||
self.release_telecommands_internal(releaser, tc_store, Some(tc_buf))
|
||||
}
|
||||
|
||||
/// This functions is almost identical to [Self::release_telecommands_with_buffer] but does
|
||||
/// not require a user provided TC buffer because it will always use the
|
||||
/// [PoolProvider::read_as_vec] API to read the TC packets.
|
||||
///
|
||||
/// However, this might also perform frequent allocations for all telecommands being
|
||||
/// released.
|
||||
pub fn release_telecommands<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
|
||||
&mut self,
|
||||
releaser: R,
|
||||
tc_store: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<u64, (u64, StoreError)> {
|
||||
self.release_telecommands_internal(releaser, tc_store, None)
|
||||
}
|
||||
|
||||
fn release_telecommands_internal<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
|
||||
&mut self,
|
||||
mut releaser: R,
|
||||
tc_store: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
tc_store: &mut (impl PoolProvider + ?Sized),
|
||||
mut tc_buf: Option<&mut [u8]>,
|
||||
) -> Result<u64, (u64, StoreError)> {
|
||||
let tcs_to_release = self.telecommands_to_release();
|
||||
let mut released_tcs = 0;
|
||||
let mut store_error = Ok(());
|
||||
for tc in tcs_to_release {
|
||||
for info in tc.1 {
|
||||
let tc = tc_store.read(&info.addr).map_err(|e| (released_tcs, e))?;
|
||||
let should_delete = releaser(self.enabled, info, tc);
|
||||
let should_delete = match tc_buf.as_mut() {
|
||||
Some(buf) => {
|
||||
tc_store
|
||||
.read(&info.addr, buf)
|
||||
.map_err(|e| (released_tcs, e))?;
|
||||
releaser(self.enabled, info, buf)
|
||||
}
|
||||
None => {
|
||||
let tc = tc_store
|
||||
.read_as_vec(&info.addr)
|
||||
.map_err(|e| (released_tcs, e))?;
|
||||
releaser(self.enabled, info, &tc)
|
||||
}
|
||||
};
|
||||
released_tcs += 1;
|
||||
if should_delete {
|
||||
let res = tc_store.delete(info.addr);
|
||||
@ -721,16 +758,17 @@ pub mod alloc_mod {
|
||||
pub fn release_telecommands_no_deletion<R: FnMut(bool, &TcInfo, &[u8])>(
|
||||
&mut self,
|
||||
mut releaser: R,
|
||||
tc_store: &(impl PoolProviderMemInPlace + ?Sized),
|
||||
tc_store: &(impl PoolProvider + ?Sized),
|
||||
tc_buf: &mut [u8],
|
||||
) -> Result<Vec<TcInfo>, (Vec<TcInfo>, StoreError)> {
|
||||
let tcs_to_release = self.telecommands_to_release();
|
||||
let mut released_tcs = Vec::new();
|
||||
for tc in tcs_to_release {
|
||||
for info in tc.1 {
|
||||
let tc = tc_store
|
||||
.read(&info.addr)
|
||||
tc_store
|
||||
.read(&info.addr, tc_buf)
|
||||
.map_err(|e| (released_tcs.clone(), e))?;
|
||||
releaser(self.is_enabled(), info, tc);
|
||||
releaser(self.is_enabled(), info, tc_buf);
|
||||
released_tcs.push(*info);
|
||||
}
|
||||
}
|
||||
@ -753,10 +791,7 @@ pub mod alloc_mod {
|
||||
/// The holding store for the telecommands needs to be passed so all the stored telecommands
|
||||
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
|
||||
/// will be returned but the method will still try to delete all the commands in the schedule.
|
||||
fn reset(
|
||||
&mut self,
|
||||
store: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
) -> Result<(), StoreError> {
|
||||
fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> {
|
||||
self.enabled = false;
|
||||
let mut deletion_ok = Ok(());
|
||||
for tc_lists in &mut self.tc_map {
|
||||
@ -814,8 +849,7 @@ pub mod alloc_mod {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::pool::{
|
||||
PoolProviderMemInPlace, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr,
|
||||
StoreError,
|
||||
PoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, StoreError,
|
||||
};
|
||||
use alloc::collections::btree_map::Range;
|
||||
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
|
||||
@ -1088,8 +1122,9 @@ mod tests {
|
||||
// test 1: too early, no tcs
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
|
||||
|
||||
let mut tc_buf: [u8; 128] = [0; 128];
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure_1, &mut pool)
|
||||
.release_telecommands_with_buffer(&mut test_closure_1, &mut pool, &mut tc_buf)
|
||||
.expect("deletion failed");
|
||||
|
||||
// test 2: exact time stamp of tc, releases 1 tc
|
||||
@ -1111,7 +1146,7 @@ mod tests {
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(206));
|
||||
|
||||
released = scheduler
|
||||
.release_telecommands(&mut test_closure_2, &mut pool)
|
||||
.release_telecommands_with_buffer(&mut test_closure_2, &mut pool, &mut tc_buf)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 1);
|
||||
// TC is deleted.
|
||||
@ -1157,9 +1192,10 @@ mod tests {
|
||||
|
||||
// test 1: too early, no tcs
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
|
||||
let mut tc_buf: [u8; 128] = [0; 128];
|
||||
|
||||
let mut released = scheduler
|
||||
.release_telecommands(&mut test_closure, &mut pool)
|
||||
.release_telecommands_with_buffer(&mut test_closure, &mut pool, &mut tc_buf)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 0);
|
||||
|
||||
@ -1209,11 +1245,13 @@ mod tests {
|
||||
true
|
||||
};
|
||||
|
||||
let mut tc_buf: [u8; 128] = [0; 128];
|
||||
|
||||
// test 1: too early, no tcs
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
|
||||
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure_1, &mut pool)
|
||||
.release_telecommands_with_buffer(&mut test_closure_1, &mut pool, &mut tc_buf)
|
||||
.expect("deletion failed");
|
||||
|
||||
// test 2: exact time stamp of tc, releases 1 tc
|
||||
@ -1267,8 +1305,9 @@ mod tests {
|
||||
|
||||
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
|
||||
|
||||
let data = pool.read(&tc_info_0.addr()).unwrap();
|
||||
let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data");
|
||||
let mut read_buf: [u8; 64] = [0; 64];
|
||||
pool.read(&tc_info_0.addr(), &mut read_buf).unwrap();
|
||||
let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
|
||||
|
||||
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
|
||||
@ -1289,8 +1328,9 @@ mod tests {
|
||||
.release_telecommands(&mut test_closure, &mut pool)
|
||||
.unwrap();
|
||||
|
||||
let data = pool.read(&addr_vec[0]).unwrap();
|
||||
let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data");
|
||||
let read_len = pool.read(&addr_vec[0], &mut read_buf).unwrap();
|
||||
let check_tc = PusTcReader::new(&read_buf).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(read_len, check_tc.1);
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
|
||||
}
|
||||
|
||||
@ -1313,8 +1353,9 @@ mod tests {
|
||||
|
||||
assert!(pool.has_element_at(&info.addr).unwrap());
|
||||
|
||||
let data = pool.read(&info.addr).unwrap();
|
||||
let check_tc = PusTcReader::new(data).expect("incorrect Pus tc raw data");
|
||||
let read_len = pool.read(&info.addr, &mut buf).unwrap();
|
||||
let check_tc = PusTcReader::new(&buf).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(read_len, check_tc.1);
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
|
||||
|
||||
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
|
||||
@ -1331,12 +1372,15 @@ mod tests {
|
||||
false
|
||||
};
|
||||
|
||||
let mut tc_buf: [u8; 64] = [0; 64];
|
||||
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure, &mut pool)
|
||||
.release_telecommands_with_buffer(&mut test_closure, &mut pool, &mut tc_buf)
|
||||
.unwrap();
|
||||
|
||||
let data = pool.read(&addr_vec[0]).unwrap();
|
||||
let check_tc = PusTcReader::new(data).expect("incorrect PUS tc raw data");
|
||||
let read_len = pool.read(&addr_vec[0], &mut buf).unwrap();
|
||||
let check_tc = PusTcReader::new(&buf).expect("incorrect PUS tc raw data");
|
||||
assert_eq!(read_len, check_tc.1);
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
|
||||
}
|
||||
|
||||
@ -1903,8 +1947,9 @@ mod tests {
|
||||
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(205));
|
||||
|
||||
let mut tc_buf: [u8; 64] = [0; 64];
|
||||
let tc_info_vec = scheduler
|
||||
.release_telecommands_no_deletion(&mut test_closure_1, &pool)
|
||||
.release_telecommands_no_deletion(&mut test_closure_1, &pool, &mut tc_buf)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(tc_info_vec[0], tc_info_0);
|
||||
assert_eq!(tc_info_vec[1], tc_info_1);
|
||||
|
@ -1,6 +1,6 @@
|
||||
use super::scheduler::PusSchedulerInterface;
|
||||
use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper};
|
||||
use crate::pool::PoolProviderMemInPlace;
|
||||
use crate::pool::PoolProvider;
|
||||
use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError};
|
||||
use alloc::string::ToString;
|
||||
use spacepackets::ecss::{scheduling, PusPacket};
|
||||
@ -42,7 +42,7 @@ impl<TcInMemConverter: EcssTcInMemConverter, Scheduler: PusSchedulerInterface>
|
||||
|
||||
pub fn handle_one_tc(
|
||||
&mut self,
|
||||
sched_tc_pool: &mut (impl PoolProviderMemInPlace + ?Sized),
|
||||
sched_tc_pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
|
||||
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
|
||||
if possible_packet.is_none() {
|
||||
@ -237,7 +237,7 @@ mod tests {
|
||||
|
||||
fn reset(
|
||||
&mut self,
|
||||
_store: &mut (impl crate::pool::PoolProviderMemInPlace + ?Sized),
|
||||
_store: &mut (impl crate::pool::PoolProvider + ?Sized),
|
||||
) -> Result<(), crate::pool::StoreError> {
|
||||
self.reset_count += 1;
|
||||
Ok(())
|
||||
|
@ -15,7 +15,7 @@
|
||||
//! ```
|
||||
//! use std::sync::{Arc, mpsc, RwLock};
|
||||
//! use std::time::Duration;
|
||||
//! use satrs_core::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig};
|
||||
//! use satrs_core::pool::{PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};
|
||||
//! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
|
||||
//! use satrs_core::seq_count::SeqCountProviderSimple;
|
||||
//! use satrs_core::pus::MpscTmInSharedPoolSender;
|
||||
@ -56,9 +56,7 @@
|
||||
//! {
|
||||
//! let mut rg = tm_store.write().expect("Error locking shared pool");
|
||||
//! let store_guard = rg.read_with_guard(addr);
|
||||
//! let slice = store_guard.read().expect("Error reading TM slice");
|
||||
//! tm_len = slice.len();
|
||||
//! tm_buf[0..tm_len].copy_from_slice(slice);
|
||||
//! tm_len = store_guard.read(&mut tm_buf).expect("Error reading TM slice");
|
||||
//! }
|
||||
//! let (pus_tm, _) = PusTmReader::new(&tm_buf[0..tm_len], 7)
|
||||
//! .expect("Error reading verification TM");
|
||||
@ -1325,7 +1323,7 @@ mod std_mod {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::pool::{PoolProviderMemInPlaceWithGuards, StaticMemoryPool, StaticPoolConfig};
|
||||
use crate::pool::{PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};
|
||||
use crate::pus::tests::CommonTmInfo;
|
||||
use crate::pus::verification::{
|
||||
EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone,
|
||||
@ -1547,7 +1545,7 @@ mod tests {
|
||||
let mut sender = TestSender::default();
|
||||
let fail_code = EcssEnumU16::new(2);
|
||||
let fail_params = FailParams::new(Some(stamp_buf.as_slice()), &fail_code, None);
|
||||
b.vr.acceptance_failure(tok, &mut sender, fail_params)
|
||||
b.vr.acceptance_failure(tok, &sender, fail_params)
|
||||
.expect("Sending acceptance success failed");
|
||||
acceptance_fail_check(&mut sender, tok.req_id, stamp_buf);
|
||||
}
|
||||
@ -1682,12 +1680,10 @@ mod tests {
|
||||
);
|
||||
|
||||
let accepted_token =
|
||||
b.vr.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP))
|
||||
b.vr.acceptance_success(tok, &sender, Some(&EMPTY_STAMP))
|
||||
.expect("Sending acceptance success failed");
|
||||
let empty =
|
||||
b.vr.start_failure(accepted_token, &mut sender, fail_params)
|
||||
.expect("Start failure failure");
|
||||
assert_eq!(empty, ());
|
||||
b.vr.start_failure(accepted_token, &mut sender, fail_params)
|
||||
.expect("Start failure failure");
|
||||
start_fail_check(&mut sender, tok.req_id, fail_data_raw);
|
||||
}
|
||||
|
||||
@ -1779,23 +1775,23 @@ mod tests {
|
||||
let mut sender = TestSender::default();
|
||||
let accepted_token = b
|
||||
.rep()
|
||||
.acceptance_success(tok, &mut sender, Some(&EMPTY_STAMP))
|
||||
.acceptance_success(tok, &sender, Some(&EMPTY_STAMP))
|
||||
.expect("Sending acceptance success failed");
|
||||
let started_token = b
|
||||
.rep()
|
||||
.start_success(accepted_token, &mut sender, Some(&[0, 1, 0, 1, 0, 1, 0]))
|
||||
.start_success(accepted_token, &sender, Some(&[0, 1, 0, 1, 0, 1, 0]))
|
||||
.expect("Sending start success failed");
|
||||
b.rep()
|
||||
.step_success(
|
||||
&started_token,
|
||||
&mut sender,
|
||||
&sender,
|
||||
Some(&EMPTY_STAMP),
|
||||
EcssEnumU8::new(0),
|
||||
)
|
||||
.expect("Sending step 0 success failed");
|
||||
b.vr.step_success(
|
||||
&started_token,
|
||||
&mut sender,
|
||||
&sender,
|
||||
Some(&EMPTY_STAMP),
|
||||
EcssEnumU8::new(1),
|
||||
)
|
||||
@ -2176,9 +2172,9 @@ mod tests {
|
||||
{
|
||||
let mut rg = shared_tm_pool.write().expect("Error locking shared pool");
|
||||
let store_guard = rg.read_with_guard(addr);
|
||||
let slice = store_guard.read().expect("Error reading TM slice");
|
||||
tm_len = slice.len();
|
||||
tm_buf[0..tm_len].copy_from_slice(slice);
|
||||
tm_len = store_guard
|
||||
.read(&mut tm_buf)
|
||||
.expect("Error reading TM slice");
|
||||
}
|
||||
let (pus_tm, _) =
|
||||
PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");
|
||||
|
@ -8,9 +8,7 @@ pub use std_mod::*;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub mod std_mod {
|
||||
use crate::pool::{
|
||||
PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr,
|
||||
};
|
||||
use crate::pool::{PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr};
|
||||
use crate::pus::EcssTmtcError;
|
||||
use spacepackets::ecss::tm::PusTmCreator;
|
||||
use spacepackets::ecss::WritablePusPacket;
|
||||
@ -37,10 +35,11 @@ pub mod std_mod {
|
||||
|
||||
pub fn add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result<StoreAddr, EcssTmtcError> {
|
||||
let mut pg = self.0.write().map_err(|_| EcssTmtcError::StoreLock)?;
|
||||
let (addr, buf) = pg.free_element(pus_tm.len_written())?;
|
||||
pus_tm
|
||||
.write_to_bytes(buf)
|
||||
.expect("writing PUS TM to store failed");
|
||||
let addr = pg.free_element(pus_tm.len_written(), |buf| {
|
||||
pus_tm
|
||||
.write_to_bytes(buf)
|
||||
.expect("writing PUS TM to store failed");
|
||||
})?;
|
||||
Ok(addr)
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,4 @@
|
||||
use satrs_core::pool::{
|
||||
PoolGuard, PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig, StoreAddr,
|
||||
};
|
||||
use satrs_core::pool::{PoolGuard, PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreAddr};
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
@ -27,8 +25,10 @@ fn threaded_usage() {
|
||||
addr = rx.recv().expect("Receiving store address failed");
|
||||
let mut pool_access = shared_clone.write().unwrap();
|
||||
let pg = PoolGuard::new(pool_access.deref_mut(), addr);
|
||||
let read_res = pg.read().expect("Reading failed");
|
||||
assert_eq!(read_res, DUMMY_DATA);
|
||||
let mut read_buf: [u8; 4] = [0; 4];
|
||||
let read_bytes = pg.read(&mut read_buf).expect("Reading failed");
|
||||
assert_eq!(read_buf, DUMMY_DATA);
|
||||
assert_eq!(read_bytes, 4);
|
||||
}
|
||||
let pool_access = shared_clone.read().unwrap();
|
||||
assert!(!pool_access.has_element_at(&addr).expect("Invalid address"));
|
||||
|
@ -2,8 +2,7 @@
|
||||
pub mod crossbeam_test {
|
||||
use hashbrown::HashMap;
|
||||
use satrs_core::pool::{
|
||||
PoolProviderMemInPlace, PoolProviderMemInPlaceWithGuards, StaticMemoryPool,
|
||||
StaticPoolConfig,
|
||||
PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig,
|
||||
};
|
||||
use satrs_core::pus::verification::{
|
||||
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
|
||||
@ -59,15 +58,21 @@ pub mod crossbeam_test {
|
||||
let tc_header = PusTcSecondaryHeader::new_simple(17, 1);
|
||||
let pus_tc_0 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
|
||||
req_id_0 = RequestId::new(&pus_tc_0);
|
||||
let (addr, buf) = tc_guard.free_element(pus_tc_0.len_written()).unwrap();
|
||||
pus_tc_0.write_to_bytes(buf).unwrap();
|
||||
let addr = tc_guard
|
||||
.free_element(pus_tc_0.len_written(), |buf| {
|
||||
pus_tc_0.write_to_bytes(buf).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
tx_tc_0.send(addr).unwrap();
|
||||
let mut sph = SpHeader::tc_unseg(TEST_APID, 1, 0).unwrap();
|
||||
let tc_header = PusTcSecondaryHeader::new_simple(5, 1);
|
||||
let pus_tc_1 = PusTcCreator::new_no_app_data(&mut sph, tc_header, true);
|
||||
req_id_1 = RequestId::new(&pus_tc_1);
|
||||
let (addr, buf) = tc_guard.free_element(pus_tc_0.len_written()).unwrap();
|
||||
pus_tc_1.write_to_bytes(buf).unwrap();
|
||||
let addr = tc_guard
|
||||
.free_element(pus_tc_0.len_written(), |buf| {
|
||||
pus_tc_1.write_to_bytes(buf).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
tx_tc_1.send(addr).unwrap();
|
||||
}
|
||||
let verif_sender_0 = thread::spawn(move || {
|
||||
@ -79,9 +84,7 @@ pub mod crossbeam_test {
|
||||
{
|
||||
let mut tc_guard = shared_tc_pool_0.write().unwrap();
|
||||
let pg = tc_guard.read_with_guard(tc_addr);
|
||||
let buf = pg.read().unwrap();
|
||||
tc_len = buf.len();
|
||||
tc_buf[0..tc_len].copy_from_slice(buf);
|
||||
tc_len = pg.read(&mut tc_buf).unwrap();
|
||||
}
|
||||
let (_tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
|
||||
|
||||
@ -117,9 +120,7 @@ pub mod crossbeam_test {
|
||||
{
|
||||
let mut tc_guard = shared_tc_pool_1.write().unwrap();
|
||||
let pg = tc_guard.read_with_guard(tc_addr);
|
||||
let buf = pg.read().unwrap();
|
||||
tc_len = buf.len();
|
||||
tc_buf[0..tc_len].copy_from_slice(buf);
|
||||
tc_len = pg.read(&mut tc_buf).unwrap();
|
||||
}
|
||||
let (tc, _) = PusTcReader::new(&tc_buf[0..tc_len]).unwrap();
|
||||
let token = reporter_with_sender_1.add_tc(&tc);
|
||||
@ -149,9 +150,9 @@ pub mod crossbeam_test {
|
||||
{
|
||||
let mut rg = shared_tm_store.write().expect("Error locking shared pool");
|
||||
let store_guard = rg.read_with_guard(verif_addr);
|
||||
let slice = store_guard.read().expect("Error reading TM slice");
|
||||
tm_len = slice.len();
|
||||
tm_buf[0..tm_len].copy_from_slice(slice);
|
||||
tm_len = store_guard
|
||||
.read(&mut tm_buf)
|
||||
.expect("Error reading TM slice");
|
||||
}
|
||||
let (pus_tm, _) =
|
||||
PusTmReader::new(&tm_buf[0..tm_len], 7).expect("Error reading verification TM");
|
||||
|
@ -2,7 +2,7 @@ use std::sync::mpsc;
|
||||
use std::time::Duration;
|
||||
|
||||
use log::{error, info, warn};
|
||||
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr};
|
||||
use satrs_core::pool::{PoolProvider, StaticMemoryPool, StoreAddr};
|
||||
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
|
||||
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
|
||||
use satrs_core::pus::verification::VerificationReporterWithSender;
|
||||
@ -54,6 +54,7 @@ impl TcReleaser for mpsc::Sender<Vec<u8>> {
|
||||
pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> {
|
||||
pub pus_11_handler: PusService11SchedHandler<TcInMemConverter, PusScheduler>,
|
||||
pub sched_tc_pool: StaticMemoryPool,
|
||||
pub releaser_buf: [u8; 4096],
|
||||
pub tc_releaser: Box<dyn TcReleaser + Send>,
|
||||
}
|
||||
|
||||
@ -70,7 +71,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
|
||||
let released_tcs = self
|
||||
.pus_11_handler
|
||||
.scheduler_mut()
|
||||
.release_telecommands(releaser, &mut self.sched_tc_pool)
|
||||
.release_telecommands_with_buffer(
|
||||
releaser,
|
||||
&mut self.sched_tc_pool,
|
||||
&mut self.releaser_buf,
|
||||
)
|
||||
.expect("releasing TCs failed");
|
||||
if released_tcs > 0 {
|
||||
info!("{released_tcs} TC(s) released from scheduler");
|
||||
@ -136,6 +141,7 @@ pub fn create_scheduler_service_static(
|
||||
Pus11Wrapper {
|
||||
pus_11_handler,
|
||||
sched_tc_pool,
|
||||
releaser_buf: [0; 4096],
|
||||
tc_releaser: Box::new(tc_releaser),
|
||||
}
|
||||
}
|
||||
@ -172,6 +178,7 @@ pub fn create_scheduler_service_dynamic(
|
||||
Pus11Wrapper {
|
||||
pus_11_handler,
|
||||
sched_tc_pool,
|
||||
releaser_buf: [0; 4096],
|
||||
tc_releaser: Box::new(tc_source_sender),
|
||||
}
|
||||
}
|
||||
|
@ -3,8 +3,9 @@ use std::{
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
};
|
||||
|
||||
use log::info;
|
||||
use satrs_core::{
|
||||
pool::{PoolProviderMemInPlace, StoreAddr},
|
||||
pool::{PoolProvider, StoreAddr},
|
||||
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
|
||||
spacepackets::{
|
||||
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
|
||||
@ -63,9 +64,14 @@ impl TmFunnelCommon {
|
||||
*entry += 1;
|
||||
}
|
||||
|
||||
Self::packet_printout(&zero_copy_writer);
|
||||
// This operation has to come last!
|
||||
zero_copy_writer.finish();
|
||||
}
|
||||
|
||||
fn packet_printout(tm: &PusTmZeroCopyWriter) {
|
||||
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TmFunnelStatic {
|
||||
@ -96,16 +102,21 @@ impl TmFunnelStatic {
|
||||
// the CRC.
|
||||
let shared_pool = self.shared_tm_store.clone_backing_pool();
|
||||
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
|
||||
let tm_raw = pool_guard
|
||||
.modify(&addr)
|
||||
let mut tm_copy = Vec::new();
|
||||
pool_guard
|
||||
.modify(&addr, |buf| {
|
||||
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
self.common.apply_packet_processing(zero_copy_writer);
|
||||
tm_copy = buf.to_vec()
|
||||
})
|
||||
.expect("Reading TM from pool failed");
|
||||
let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
self.common.apply_packet_processing(zero_copy_writer);
|
||||
self.tm_server_tx
|
||||
.send(addr)
|
||||
.expect("Sending TM to server failed");
|
||||
self.common.sync_tm_tcp_source.add_tm(tm_raw);
|
||||
// We could also do this step in the update closure, but I'd rather avoid this, could
|
||||
// lead to nested locking.
|
||||
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::pus::PusReceiver;
|
||||
use satrs_core::pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr, StoreError};
|
||||
use satrs_core::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError};
|
||||
use satrs_core::spacepackets::ecss::tc::PusTcReader;
|
||||
use satrs_core::spacepackets::ecss::PusPacket;
|
||||
use satrs_core::tmtc::ReceivesCcsdsTc;
|
||||
@ -28,8 +28,9 @@ pub struct SharedTcPool {
|
||||
impl SharedTcPool {
|
||||
pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
|
||||
let mut pg = self.pool.write().expect("error locking TC store");
|
||||
let (addr, buf) = pg.free_element(pus_tc.len_packed())?;
|
||||
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
|
||||
let addr = pg.free_element(pus_tc.len_packed(), |buf| {
|
||||
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
|
||||
})?;
|
||||
Ok(addr)
|
||||
}
|
||||
}
|
||||
@ -125,8 +126,8 @@ impl TcSourceTaskStatic {
|
||||
.pool
|
||||
.read()
|
||||
.expect("locking tc pool failed");
|
||||
let data = pool.read(&addr).expect("reading pool failed");
|
||||
self.tc_buf[0..data.len()].copy_from_slice(data);
|
||||
pool.read(&addr, &mut self.tc_buf)
|
||||
.expect("reading pool failed");
|
||||
drop(pool);
|
||||
match PusTcReader::new(&self.tc_buf) {
|
||||
Ok((pus_tc, _)) => {
|
||||
|
@ -6,7 +6,7 @@ use std::{
|
||||
use log::{info, warn};
|
||||
use satrs_core::{
|
||||
hal::std::udp_server::{ReceiveResult, UdpTcServer},
|
||||
pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr},
|
||||
pool::{PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr},
|
||||
tmtc::CcsdsError,
|
||||
};
|
||||
|
||||
@ -29,20 +29,13 @@ impl UdpTmHandler for StaticUdpTmHandler {
|
||||
}
|
||||
let mut store_lock = store_lock.unwrap();
|
||||
let pg = store_lock.read_with_guard(addr);
|
||||
let read_res = pg.read();
|
||||
let read_res = pg.read_as_vec();
|
||||
if read_res.is_err() {
|
||||
warn!("Error reading TM pool data");
|
||||
continue;
|
||||
}
|
||||
let buf = read_res.unwrap();
|
||||
if buf.len() > 9 {
|
||||
let service = buf[7];
|
||||
let subservice = buf[8];
|
||||
info!("Sending PUS TM[{service},{subservice}]")
|
||||
} else {
|
||||
info!("Sending PUS TM");
|
||||
}
|
||||
let result = socket.send_to(buf, recv_addr);
|
||||
let result = socket.send_to(&buf, recv_addr);
|
||||
if let Err(e) = result {
|
||||
warn!("Sending TM with UDP socket failed: {e}")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user