this is complicated

This commit is contained in:
2022-09-10 13:34:04 +02:00
parent a2c1a38b48
commit f5a45c008e
11 changed files with 227 additions and 151 deletions

View File

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
thiserror = "1.0"
delegate = "0.7.0"
delegate = "0.8.0"
hashbrown = "0.12.3"
[dependencies.num-traits]

View File

@ -13,7 +13,7 @@
//! # Example
//!
//! ```
//! use fsrc_core::pool::{LocalPool, PoolCfg};
//! use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider};
//!
//! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
//! let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]);
@ -78,9 +78,18 @@ use alloc::string::String;
use alloc::vec;
use alloc::vec::Vec;
use delegate::delegate;
#[cfg(feature = "std")]
use std::boxed::Box;
#[cfg(feature = "std")]
use std::sync::{Arc, RwLock};
type NumBlocks = u16;
#[cfg(feature = "std")]
pub type ShareablePoolProvider = Box<dyn PoolProvider + Send + Sync>;
#[cfg(feature = "std")]
pub type SharedPool = Arc<RwLock<ShareablePoolProvider>>;
/// Configuration structure of the [local pool][LocalPool]
///
/// # Parameters
@ -151,10 +160,52 @@ pub enum StoreError {
InternalError(String),
}
pub trait PoolProvider {
/// Add new data to the pool. The provider should attempt to reserve a memory block with the
/// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can
/// be used to access the data stored in the pool
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError>;
/// The provider should attempt to reserve a free memory block with the appropriate size and
/// then return a mutable reference to it. Yields a [StoreAddr] which can be used to access
/// the data stored in the pool
fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>;
/// Modify data added previously using a given [StoreAddr] by yielding a mutable reference
/// to it
fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>;
/// This function behaves like [Self::modify], but consumes the provided address and returns a
/// RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read (and modify) the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard;
/// Read data by yielding a read-only reference given a [StoreAddr]
fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>;
/// This function behaves like [Self::read], but consumes the provided address and returns a
/// RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard;
/// Delete data inside the pool given a [StoreAddr]
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>;
fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError>;
}
impl LocalPool {
const STORE_FREE: PoolSize = PoolSize::MAX;
const MAX_SIZE: PoolSize = Self::STORE_FREE - 1;
/// Create a new local pool from the [given configuration][PoolCfg]. This function will sanitize
/// the given configuration as well.
pub fn new(mut cfg: PoolCfg) -> LocalPool {
@ -175,96 +226,6 @@ impl LocalPool {
local_pool
}
/// Add new data to the pool. It will attempt to reserve a memory block with the appropriate
/// size and then copy the given data to the block. Yields a [StoreAddr] which can be used
/// to access the data stored in the pool
pub fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
let data_len = data.len();
if data_len > Self::MAX_SIZE {
return Err(StoreError::DataTooLarge(data_len));
}
let addr = self.reserve(data_len)?;
self.write(&addr, data)?;
Ok(addr)
}
/// Reserves a free memory block with the appropriate size and returns a mutable reference
/// to it. Yields a [StoreAddr] which can be used to access the data stored in the pool
pub fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> {
if len > Self::MAX_SIZE {
return Err(StoreError::DataTooLarge(len));
}
let addr = self.reserve(len)?;
let raw_pos = self.raw_pos(&addr).unwrap();
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len];
Ok((addr, block))
}
/// Modify data added previously using a given [StoreAddr] by yielding a mutable reference
/// to it
pub fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> {
let curr_size = self.addr_check(addr)?;
let raw_pos = self.raw_pos(addr).unwrap();
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size];
Ok(block)
}
/// This function behaves like [Self::modify], but consumes the provided address and returns a
/// RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read (and modify) the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
pub fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard {
PoolRwGuard::new(self, addr)
}
/// Read data by yielding a read-only reference given a [StoreAddr]
pub fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> {
let curr_size = self.addr_check(addr)?;
let raw_pos = self.raw_pos(addr).unwrap();
let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size];
Ok(block)
}
/// This function behaves like [Self::read], but consumes the provided address and returns a
/// RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
pub fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard {
PoolGuard::new(self, addr)
}
/// Delete data inside the pool given a [StoreAddr]
pub fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> {
self.addr_check(&addr)?;
let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1;
let raw_pos = self.raw_pos(&addr).unwrap();
let block =
&mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size];
let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap();
size_list[addr.packet_idx as usize] = Self::STORE_FREE;
block.fill(0);
Ok(())
}
pub fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError> {
self.validate_addr(addr)?;
let pool_idx = addr.pool_idx as usize;
let size_list = self.sizes_lists.get(pool_idx).unwrap();
let curr_size = size_list[addr.packet_idx as usize];
if curr_size == Self::STORE_FREE {
return Ok(false);
}
Ok(true)
}
fn addr_check(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
self.validate_addr(addr)?;
let pool_idx = addr.pool_idx as usize;
@ -355,6 +316,73 @@ impl LocalPool {
}
}
impl PoolProvider for LocalPool {
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
let data_len = data.len();
if data_len > Self::MAX_SIZE {
return Err(StoreError::DataTooLarge(data_len));
}
let addr = self.reserve(data_len)?;
self.write(&addr, data)?;
Ok(addr)
}
fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> {
if len > Self::MAX_SIZE {
return Err(StoreError::DataTooLarge(len));
}
let addr = self.reserve(len)?;
let raw_pos = self.raw_pos(&addr).unwrap();
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len];
Ok((addr, block))
}
fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> {
let curr_size = self.addr_check(addr)?;
let raw_pos = self.raw_pos(addr).unwrap();
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size];
Ok(block)
}
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard {
PoolRwGuard::new(self, addr)
}
fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> {
let curr_size = self.addr_check(addr)?;
let raw_pos = self.raw_pos(addr).unwrap();
let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size];
Ok(block)
}
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard {
PoolGuard::new(self, addr)
}
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> {
self.addr_check(&addr)?;
let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1;
let raw_pos = self.raw_pos(&addr).unwrap();
let block =
&mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size];
let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap();
size_list[addr.packet_idx as usize] = Self::STORE_FREE;
block.fill(0);
Ok(())
}
fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError> {
self.validate_addr(addr)?;
let pool_idx = addr.pool_idx as usize;
let size_list = self.sizes_lists.get(pool_idx).unwrap();
let curr_size = size_list[addr.packet_idx as usize];
if curr_size == Self::STORE_FREE {
return Ok(false);
}
Ok(true)
}
}
pub struct PoolGuard<'a> {
pool: &'a mut LocalPool,
pub addr: StoreAddr,
@ -418,7 +446,8 @@ impl<'a> PoolRwGuard<'a> {
#[cfg(test)]
mod tests {
use crate::pool::{
LocalPool, PoolCfg, PoolGuard, PoolRwGuard, StoreAddr, StoreError, StoreIdError,
LocalPool, PoolCfg, PoolGuard, PoolProvider, PoolRwGuard, StoreAddr, StoreError,
StoreIdError,
};
use std::vec;

View File

@ -15,7 +15,7 @@
//! ```
//! use std::sync::{Arc, RwLock};
//! use std::time::Duration;
//! use fsrc_core::pool::{LocalPool, PoolCfg};
//! use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
//! use fsrc_core::pus::verification::{CrossbeamVerifSender, VerificationReporterCfg, VerificationReporterWithSender};
//! use spacepackets::ecss::PusPacket;
//! use spacepackets::SpHeader;
@ -26,7 +26,7 @@
//! const TEST_APID: u16 = 0x02;
//!
//! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
//! let shared_tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg.clone())));
//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
//! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10);
//! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx);
//! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8);
@ -87,7 +87,10 @@ use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader};
use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl};
#[cfg(feature = "std")]
pub use stdmod::{CrossbeamVerifSender, StdVerifSender, StdVerifSenderError};
pub use stdmod::{
CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender,
StdVerifReporterWithSender, StdVerifSenderError,
};
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard
/// This field equivalent to the first two bytes of the CCSDS space packet header.
@ -711,11 +714,16 @@ impl<E: 'static> VerificationReporterWithSender<E> {
#[cfg(feature = "std")]
mod stdmod {
use crate::pool::{LocalPool, StoreAddr, StoreError};
use crate::pus::verification::{VerificationError, VerificationSender};
use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError};
use crate::pus::verification::{
VerificationError, VerificationReporterWithSender, VerificationSender,
};
use delegate::delegate;
use spacepackets::tm::PusTm;
use std::sync::{mpsc, Arc, RwLock, RwLockWriteGuard};
use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard};
pub type StdVerifReporterWithSender = VerificationReporterWithSender<StdVerifSenderError>;
pub type SharedStdVerifReporterWithSender = Arc<Mutex<StdVerifReporterWithSender>>;
#[derive(Debug, Eq, PartialEq)]
pub enum StdVerifSenderError {
@ -730,12 +738,12 @@ mod stdmod {
struct StdSenderBase<S> {
pub ignore_poison_error: bool,
tm_store: Arc<RwLock<LocalPool>>,
tm_store: SharedPool,
tx: S,
}
impl<S: SendBackend> StdSenderBase<S> {
pub fn new(tm_store: Arc<RwLock<LocalPool>>, tx: S) -> Self {
pub fn new(tm_store: SharedPool, tx: S) -> Self {
Self {
ignore_poison_error: false,
tm_store,
@ -744,20 +752,23 @@ mod stdmod {
}
}
unsafe impl<S: Sync> Sync for StdSenderBase<S> {}
unsafe impl<S: Send> Send for StdSenderBase<S> {}
impl SendBackend for mpsc::Sender<StoreAddr> {
fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> {
self.send(addr).map_err(|_| addr)
}
}
pub struct StdVerifSender {
pub struct MpscVerifSender {
base: StdSenderBase<mpsc::Sender<StoreAddr>>,
}
/// Verification sender with a [mpsc::Sender] backend.
/// It implements the [VerificationSender] trait to be used as PUS Verification TM sender.
impl StdVerifSender {
pub fn new(tm_store: Arc<RwLock<LocalPool>>, tx: mpsc::Sender<StoreAddr>) -> Self {
impl MpscVerifSender {
pub fn new(tm_store: SharedPool, tx: mpsc::Sender<StoreAddr>) -> Self {
Self {
base: StdSenderBase::new(tm_store, tx),
}
@ -765,15 +776,15 @@ mod stdmod {
}
//noinspection RsTraitImplementation
impl VerificationSender<StdVerifSenderError> for StdVerifSender {
impl VerificationSender<StdVerifSenderError> for MpscVerifSender {
delegate!(
to self.base {
fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError<StdVerifSenderError>>;
}
);
}
unsafe impl Sync for StdVerifSender {}
unsafe impl Send for StdVerifSender {}
unsafe impl Sync for MpscVerifSender {}
unsafe impl Send for MpscVerifSender {}
impl SendBackend for crossbeam_channel::Sender<StoreAddr> {
fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> {
@ -788,10 +799,7 @@ mod stdmod {
}
impl CrossbeamVerifSender {
pub fn new(
tm_store: Arc<RwLock<LocalPool>>,
tx: crossbeam_channel::Sender<StoreAddr>,
) -> Self {
pub fn new(tm_store: SharedPool, tx: crossbeam_channel::Sender<StoreAddr>) -> Self {
Self {
base: StdSenderBase::new(tm_store, tx),
}
@ -815,7 +823,7 @@ mod stdmod {
&mut self,
tm: PusTm,
) -> Result<(), VerificationError<StdVerifSenderError>> {
let operation = |mut mg: RwLockWriteGuard<LocalPool>| {
let operation = |mut mg: RwLockWriteGuard<ShareablePoolProvider>| {
let (addr, buf) = mg.free_element(tm.len_packed()).map_err(|e| {
VerificationError::SendError(StdVerifSenderError::StoreError(e))
})?;

View File

@ -1,4 +1,4 @@
use fsrc_core::pool::{LocalPool, PoolCfg, PoolGuard, StoreAddr};
use fsrc_core::pool::{LocalPool, PoolCfg, PoolGuard, PoolProvider, StoreAddr};
use std::ops::DerefMut;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};

View File

@ -1,4 +1,4 @@
use fsrc_core::pool::{LocalPool, PoolCfg};
use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool};
use fsrc_core::pus::verification::{
CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg,
VerificationReporterWithSender,
@ -28,7 +28,8 @@ fn test_shared_reporter() {
let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8);
// Shared pool object to store the verification PUS telemetry
let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]);
let shared_tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg.clone())));
let shared_tm_pool: SharedPool =
Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone()))));
let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
let shared_tc_pool_1 = shared_tc_pool_0.clone();
let (tx, rx) = crossbeam_channel::bounded(5);