we can unify this trait again
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
Robin Müller 2024-02-02 11:27:39 +01:00
parent 76a84a4393
commit 74644380f9
Signed by: muellerr
GPG Key ID: A649FB78196E3849
10 changed files with 96 additions and 80 deletions

View File

@ -186,7 +186,9 @@ impl Error for StoreError {
}
}
pub trait MemPoolProvider {
/// Generic trait for pool providers where the data can be modified and read in-place. This
/// generally means that a shared pool structure has to be wrapped inside a lock structure.
pub trait PoolProviderMemInPlace {
/// Add new data to the pool. The provider should attempt to reserve a memory block with the
/// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can
/// be used to access the data stored in the pool
@ -215,11 +217,31 @@ pub trait MemPoolProvider {
}
Ok(self.read(addr)?.len())
}
/// This function behaves like [PoolProvider::read], but consumes the provided address
/// and returns a RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self>;
/// This function behaves like [PoolProvider::modify], but consumes the provided address
/// and returns a RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read (and modify) the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self>;
}
#[cfg(feature = "alloc")]
mod alloc_mod {
use super::{MemPoolProvider, StaticPoolAddr};
use super::{PoolProviderMemInPlace, StaticPoolAddr};
use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError};
use alloc::vec;
use alloc::vec::Vec;
@ -263,7 +285,7 @@ mod alloc_mod {
}
}
pub struct PoolGuard<'a, MemProvider: MemPoolProvider> {
pub struct PoolGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> {
pool: &'a mut MemProvider,
pub addr: StoreAddr,
no_deletion: bool,
@ -271,7 +293,7 @@ mod alloc_mod {
}
/// This helper object
impl<'a, MemProvider: MemPoolProvider> PoolGuard<'a, MemProvider> {
impl<'a, MemProvider: PoolProviderMemInPlace> PoolGuard<'a, MemProvider> {
pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self {
Self {
pool,
@ -292,7 +314,7 @@ mod alloc_mod {
}
}
impl<MemProvider: MemPoolProvider> Drop for PoolGuard<'_, MemProvider> {
impl<MemProvider: PoolProviderMemInPlace + ?Sized> Drop for PoolGuard<'_, MemProvider> {
fn drop(&mut self) {
if !self.no_deletion {
if let Err(e) = self.pool.delete(self.addr) {
@ -302,11 +324,11 @@ mod alloc_mod {
}
}
pub struct PoolRwGuard<'a, MemProvider: MemPoolProvider> {
pub struct PoolRwGuard<'a, MemProvider: PoolProviderMemInPlace + ?Sized> {
guard: PoolGuard<'a, MemProvider>,
}
impl<'a, MemProvider: MemPoolProvider> PoolRwGuard<'a, MemProvider> {
impl<'a, MemProvider: PoolProviderMemInPlace> PoolRwGuard<'a, MemProvider> {
pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self {
Self {
guard: PoolGuard::new(pool, addr),
@ -327,29 +349,6 @@ mod alloc_mod {
);
}
pub trait MemPoolProviderWithGuards: MemPoolProvider {
type MemProvider: MemPoolProvider;
/// This function behaves like [PoolProvider::read], but consumes the provided address
/// and returns a RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self::MemProvider>;
/// This function behaves like [PoolProvider::modify], but consumes the provided address
/// and returns a RAII conformant guard object.
///
/// Unless the guard [PoolRwGuard::release] method is called, the data for the
/// given address will be deleted automatically when the guard is dropped.
/// This can prevent memory leaks. Users can read (and modify) the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self::MemProvider>;
}
/// Pool implementation providing sub-pools with fixed size memory blocks.
///
/// This is a simple memory pool implementation which pre-allocates all sub-pools using a given pool
@ -472,7 +471,7 @@ mod alloc_mod {
}
}
impl MemPoolProvider for StaticMemoryPool {
impl PoolProviderMemInPlace for StaticMemoryPool {
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
let data_len = data.len();
if data_len > POOL_MAX_SIZE {
@ -536,14 +535,11 @@ mod alloc_mod {
}
Ok(true)
}
}
impl MemPoolProviderWithGuards for StaticMemoryPool {
type MemProvider = StaticMemoryPool;
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self::MemProvider> {
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self> {
PoolRwGuard::new(self, addr)
}
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self::MemProvider> {
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self> {
PoolGuard::new(self, addr)
}
}
@ -552,8 +548,8 @@ mod alloc_mod {
#[cfg(test)]
mod tests {
use crate::pool::{
MemPoolProvider, MemPoolProviderWithGuards, PoolGuard, PoolRwGuard, StaticMemoryPool,
StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, POOL_MAX_SIZE,
PoolGuard, PoolProviderMemInPlace, PoolRwGuard, StaticMemoryPool, StaticPoolAddr,
StaticPoolConfig, StoreError, StoreIdError, POOL_MAX_SIZE,
};
use std::vec;

View File

@ -369,7 +369,7 @@ mod alloc_mod {
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use crate::pool::{MemPoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr};
use crate::pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr};
use crate::pus::verification::{
StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
};
@ -925,7 +925,8 @@ pub mod tests {
use spacepackets::CcsdsPacket;
use crate::pool::{
MemPoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig, StoreAddr,
PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig,
StoreAddr,
};
use crate::pus::verification::RequestId;
use crate::tmtc::tm_helper::SharedTmStore;

View File

@ -14,7 +14,7 @@ use spacepackets::CcsdsPacket;
#[cfg(feature = "std")]
use std::error::Error;
use crate::pool::{MemPoolProvider, StoreError};
use crate::pool::{PoolProviderMemInPlace, StoreError};
#[cfg(feature = "alloc")]
pub use alloc_mod::*;
@ -220,7 +220,10 @@ impl Error for ScheduleError {}
pub trait PusSchedulerInterface {
type TimeProvider: CcsdsTimeProvider + TimeReader;
fn reset(&mut self, store: &mut (impl MemPoolProvider + ?Sized)) -> Result<(), StoreError>;
fn reset(
&mut self,
store: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<(), StoreError>;
fn is_enabled(&self) -> bool;
@ -243,7 +246,7 @@ pub trait PusSchedulerInterface {
fn insert_wrapped_tc<TimeProvider>(
&mut self,
pus_tc: &(impl IsPusTelecommand + PusPacket + GenericPusTcSecondaryHeader),
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<TcInfo, ScheduleError> {
if PusPacket::service(pus_tc) != 11 {
return Err(ScheduleError::WrongService);
@ -267,7 +270,7 @@ pub trait PusSchedulerInterface {
&mut self,
time_stamp: UnixTimestamp,
tc: &[u8],
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<TcInfo, ScheduleError> {
let check_tc = PusTcReader::new(tc)?;
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
@ -289,7 +292,7 @@ pub trait PusSchedulerInterface {
#[cfg(feature = "alloc")]
pub mod alloc_mod {
use super::*;
use crate::pool::{MemPoolProvider, StoreAddr, StoreError};
use crate::pool::{PoolProviderMemInPlace, StoreAddr, StoreError};
use alloc::collections::btree_map::{Entry, Range};
use alloc::collections::BTreeMap;
use alloc::vec;
@ -409,7 +412,7 @@ pub mod alloc_mod {
&mut self,
time_stamp: UnixTimestamp,
tc: &[u8],
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<TcInfo, ScheduleError> {
let check_tc = PusTcReader::new(tc)?;
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
@ -432,7 +435,7 @@ pub mod alloc_mod {
pub fn insert_wrapped_tc_cds_short(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<TcInfo, ScheduleError> {
self.insert_wrapped_tc::<cds::TimeProvider>(pus_tc, pool)
}
@ -442,7 +445,7 @@ pub mod alloc_mod {
pub fn insert_wrapped_tc_cds_long(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<TcInfo, ScheduleError> {
self.insert_wrapped_tc::<cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
}
@ -458,7 +461,7 @@ pub mod alloc_mod {
pub fn delete_by_time_filter<TimeProvider: CcsdsTimeProvider + Clone>(
&mut self,
time_window: TimeWindow<TimeProvider>,
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let range = self.retrieve_by_time_filter(time_window);
let mut del_packets = 0;
@ -488,7 +491,7 @@ pub mod alloc_mod {
/// the last deletion will be supplied in addition to the number of deleted commands.
pub fn delete_all(
&mut self,
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<u64, (u64, StoreError)> {
self.delete_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all(), pool)
}
@ -535,7 +538,7 @@ pub mod alloc_mod {
/// called repeatedly.
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<StoreAddr> {
if let DeletionResult::WithoutStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, None::<&mut dyn MemPoolProvider>)
self.delete_by_request_id_internal_without_store_deletion(req_id)
{
return v;
}
@ -546,20 +549,19 @@ pub mod alloc_mod {
pub fn delete_by_request_id_and_from_pool(
&mut self,
req_id: &RequestId,
pool: &mut (impl MemPoolProvider + ?Sized),
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<bool, StoreError> {
if let DeletionResult::WithStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, Some(pool))
self.delete_by_request_id_internal_with_store_deletion(req_id, pool)
{
return v;
}
panic!("unexpected deletion result");
}
fn delete_by_request_id_internal(
fn delete_by_request_id_internal_without_store_deletion(
&mut self,
req_id: &RequestId,
pool: Option<&mut (impl MemPoolProvider + ?Sized)>,
) -> DeletionResult {
let mut idx_found = None;
for time_bucket in &mut self.tc_map {
@ -570,20 +572,33 @@ pub mod alloc_mod {
}
if let Some(idx) = idx_found {
let addr = time_bucket.1.remove(idx).addr;
if let Some(pool) = pool {
return match pool.delete(addr) {
Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)),
Err(e) => DeletionResult::WithStoreDeletion(Err(e)),
};
}
return DeletionResult::WithoutStoreDeletion(Some(addr));
}
}
if pool.is_none() {
DeletionResult::WithoutStoreDeletion(None)
} else {
DeletionResult::WithStoreDeletion(Ok(false))
DeletionResult::WithoutStoreDeletion(None)
}
fn delete_by_request_id_internal_with_store_deletion(
&mut self,
req_id: &RequestId,
pool: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> DeletionResult {
let mut idx_found = None;
for time_bucket in &mut self.tc_map {
for (idx, tc_info) in time_bucket.1.iter().enumerate() {
if &tc_info.request_id == req_id {
idx_found = Some(idx);
}
}
if let Some(idx) = idx_found {
let addr = time_bucket.1.remove(idx).addr;
return match pool.delete(addr) {
Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)),
Err(e) => DeletionResult::WithStoreDeletion(Err(e)),
};
}
}
DeletionResult::WithStoreDeletion(Ok(false))
}
#[cfg(feature = "std")]
@ -608,7 +623,7 @@ pub mod alloc_mod {
pub fn release_telecommands<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
&mut self,
mut releaser: R,
tc_store: &mut (impl MemPoolProvider + ?Sized),
tc_store: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = 0;
@ -647,7 +662,10 @@ pub mod alloc_mod {
/// The holding store for the telecommands needs to be passed so all the stored telecommands
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
/// will be returned but the method will still try to delete all the commands in the schedule.
fn reset(&mut self, store: &mut (impl MemPoolProvider + ?Sized)) -> Result<(), StoreError> {
fn reset(
&mut self,
store: &mut (impl PoolProviderMemInPlace + ?Sized),
) -> Result<(), StoreError> {
self.enabled = false;
let mut deletion_ok = Ok(());
for tc_lists in &mut self.tc_map {
@ -705,7 +723,8 @@ pub mod alloc_mod {
mod tests {
use super::*;
use crate::pool::{
MemPoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, StoreError,
PoolProviderMemInPlace, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr,
StoreError,
};
use alloc::collections::btree_map::Range;
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};

View File

@ -1,6 +1,6 @@
use super::scheduler::PusSchedulerInterface;
use super::{EcssTcInMemConverter, PusServiceBase, PusServiceHelper};
use crate::pool::MemPoolProvider;
use crate::pool::PoolProviderMemInPlace;
use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError};
use spacepackets::ecss::{scheduling, PusPacket};
use spacepackets::time::cds::TimeProvider;
@ -15,7 +15,7 @@ use spacepackets::time::cds::TimeProvider;
/// telecommands when applicable.
pub struct PusService11SchedHandler<
TcInMemConverter: EcssTcInMemConverter,
MemPool: MemPoolProvider,
MemPool: PoolProviderMemInPlace,
Scheduler: PusSchedulerInterface,
> {
pub service_helper: PusServiceHelper<TcInMemConverter>,
@ -25,7 +25,7 @@ pub struct PusService11SchedHandler<
impl<
TcInMemConverter: EcssTcInMemConverter,
MemPool: MemPoolProvider,
MemPool: PoolProviderMemInPlace,
Scheduler: PusSchedulerInterface,
> PusService11SchedHandler<TcInMemConverter, MemPool, Scheduler>
{
@ -241,7 +241,7 @@ mod tests {
fn reset(
&mut self,
store: &mut (impl crate::pool::MemPoolProvider + ?Sized),
store: &mut (impl crate::pool::PoolProviderMemInPlace + ?Sized),
) -> Result<(), crate::pool::StoreError> {
self.reset_count += 1;
Ok(())

View File

@ -8,7 +8,9 @@ pub use std_mod::*;
#[cfg(feature = "std")]
pub mod std_mod {
use crate::pool::{MemPoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr};
use crate::pool::{
PoolProviderMemInPlace, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr,
};
use crate::pus::EcssTmtcError;
use spacepackets::ecss::tm::PusTmCreator;
use spacepackets::ecss::WritablePusPacket;

View File

@ -1,9 +1,7 @@
#[cfg(feature = "crossbeam")]
pub mod crossbeam_test {
use hashbrown::HashMap;
use satrs_core::pool::{
MemPoolProvider, MemPoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig,
};
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
use satrs_core::pus::verification::{
FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender,
};

View File

@ -29,7 +29,7 @@ use satrs_core::event_man::{
};
use satrs_core::events::EventU32;
use satrs_core::hk::HkRequest;
use satrs_core::pool::{MemPoolProvider, StaticMemoryPool, StaticPoolConfig};
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher,

View File

@ -1,6 +1,6 @@
use crate::tmtc::PusTcSource;
use log::{error, info, warn};
use satrs_core::pool::{MemPoolProvider, StaticMemoryPool};
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool};
use satrs_core::pus::scheduler::{PusScheduler, PusSchedulerInterface, TcInfo};
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult};

View File

@ -5,7 +5,7 @@ use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
use thiserror::Error;
use crate::pus::PusReceiver;
use satrs_core::pool::{MemPoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError};
use satrs_core::pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr, StoreError};
use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusPacket;
use satrs_core::tmtc::tm_helper::SharedTmStore;

View File

@ -3,7 +3,7 @@ use std::{net::SocketAddr, sync::mpsc::Receiver};
use log::{info, warn};
use satrs_core::{
hal::std::udp_server::{ReceiveResult, UdpTcServer},
pool::{MemPoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr},
pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr},
tmtc::CcsdsError,
};