this works!
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
This commit is contained in:
parent 0a21fcf23a
commit c45eb1495c
@@ -610,16 +610,17 @@ pub mod alloc_mod {
         /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
         /// closure for each telecommand which should be released. This function will also delete
-        /// the telecommands from the holding store after calling the release closure, if the scheduler
-        /// is disabled.
+        /// the telecommands from the holding store after calling the release closure if the user
+        /// returns [true] from the release closure.
         ///
         /// # Arguments
         ///
         /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
-        ///   the second argument is the telecommand information also containing the store address.
-        ///   This closure should return whether the command should be deleted if the scheduler is
-        ///   disabled to prevent memory leaks.
-        /// * `store` - The holding store of the telecommands.
+        ///   the second argument is the telecommand information also containing the store
+        ///   address. This closure should return whether the command should be deleted. Please
+        ///   note that returning false might lead to memory leaks if the TC is not cleared from
+        ///   the store in some other way.
+        /// * `tc_store` - The holding store of the telecommands.
         pub fn release_telecommands<R: FnMut(bool, &TcInfo, &[u8]) -> bool>(
             &mut self,
             mut releaser: R,
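A minimal call-site sketch of the changed contract. The hunk above is truncated before the store parameter, so its exact signature is an assumption here (a mutable `PoolProviderMemInPlace` reference, matching the `tc_store.read`/`tc_store.delete` calls in the next hunk); `release_all` is an illustrative helper, not part of the commit:

use satrs_core::pool::PoolProviderMemInPlace;
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};

fn release_all(
    scheduler: &mut PusScheduler,
    tc_store: &mut (impl PoolProviderMemInPlace + ?Sized),
) {
    scheduler
        .release_telecommands(
            |_enabled: bool, _info: &TcInfo, _tc: &[u8]| -> bool {
                // Under the new contract this return value alone decides
                // whether the scheduler deletes the TC from the store; the
                // enabled flag no longer gates the deletion.
                true
            },
            tc_store,
        )
        .expect("releasing TCs failed");
}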
@@ -633,7 +634,7 @@ pub mod alloc_mod {
                     let tc = tc_store.read(&info.addr).map_err(|e| (released_tcs, e))?;
                     let should_delete = releaser(self.enabled, info, tc);
                     released_tcs += 1;
-                    if should_delete && !self.is_enabled() {
+                    if should_delete {
                         let res = tc_store.delete(info.addr);
                         if res.is_err() {
                             store_error = res;
@@ -647,6 +648,32 @@ pub mod alloc_mod {
                 .map_err(|e| (released_tcs, e))
         }

+        /// This utility method is similar to [Self::release_telecommands] but will not perform
+        /// store deletions and thus does not require a mutable reference of the TC store.
+        ///
+        /// It returns a [Vec] of [TcInfo]s to transfer the list of released
+        /// telecommands to the user. The user should take care of deleting those telecommands
+        /// from the holding store to prevent memory leaks.
+        pub fn release_telecommands_no_deletion<R: FnMut(bool, &TcInfo, &[u8])>(
+            &mut self,
+            mut releaser: R,
+            tc_store: &(impl PoolProviderMemInPlace + ?Sized),
+        ) -> Result<Vec<TcInfo>, (Vec<TcInfo>, StoreError)> {
+            let tcs_to_release = self.telecommands_to_release();
+            let mut released_tcs = Vec::new();
+            for tc in tcs_to_release {
+                for info in tc.1 {
+                    let tc = tc_store
+                        .read(&info.addr)
+                        .map_err(|e| (released_tcs.clone(), e))?;
+                    releaser(self.is_enabled(), info, tc);
+                    released_tcs.push(*info);
+                }
+            }
+            self.tc_map.retain(|k, _| k > &self.current_time);
+            Ok(released_tcs)
+        }
+
         /// Retrieve all telecommands which should be released based on the current time.
         pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
             self.tc_map.range(..=self.current_time)
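A sketch of the deferred-deletion flow the new method enables. `release_then_clean_up` is an illustrative helper, and the `info.addr()` accessor is assumed from the removed example code further down this diff:

use satrs_core::pool::PoolProviderMemInPlace;
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};

fn release_then_clean_up(
    scheduler: &mut PusScheduler,
    tc_store: &mut impl PoolProviderMemInPlace,
) {
    // Read-only release pass: the scheduler itself performs no deletions.
    let released: Vec<TcInfo> = scheduler
        .release_telecommands_no_deletion(|_enabled, _info, _tc: &[u8]| {}, tc_store)
        .expect("releasing TCs failed");
    // The caller now owns cleanup; skipping this loop leaks pool slots.
    for info in released {
        tc_store
            .delete(info.addr())
            .expect("deleting TC from pool failed");
    }
}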
@@ -15,28 +15,18 @@ use spacepackets::time::cds::TimeProvider;
 /// telecommands when applicable.
 pub struct PusService11SchedHandler<
     TcInMemConverter: EcssTcInMemConverter,
-    MemPool: PoolProviderMemInPlace,
     Scheduler: PusSchedulerInterface,
 > {
     pub service_helper: PusServiceHelper<TcInMemConverter>,
-    pub sched_tc_pool: MemPool,
     scheduler: Scheduler,
 }

-impl<
-    TcInMemConverter: EcssTcInMemConverter,
-    MemPool: PoolProviderMemInPlace,
-    Scheduler: PusSchedulerInterface,
-> PusService11SchedHandler<TcInMemConverter, MemPool, Scheduler>
+impl<TcInMemConverter: EcssTcInMemConverter, Scheduler: PusSchedulerInterface>
+    PusService11SchedHandler<TcInMemConverter, Scheduler>
 {
-    pub fn new(
-        service_helper: PusServiceHelper<TcInMemConverter>,
-        sched_tc_pool: MemPool,
-        scheduler: Scheduler,
-    ) -> Self {
+    pub fn new(service_helper: PusServiceHelper<TcInMemConverter>, scheduler: Scheduler) -> Self {
         Self {
             service_helper,
-            sched_tc_pool,
             scheduler,
         }
     }
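With the `MemPool` parameter gone, constructing the handler no longer mentions a pool at all. A sketch with the import paths assumed from the `use` statements shown elsewhere in this diff:

use satrs_core::pus::scheduler::PusSchedulerInterface;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::{EcssTcInMemConverter, PusServiceHelper};

fn make_handler<Conv: EcssTcInMemConverter, Sched: PusSchedulerInterface>(
    helper: PusServiceHelper<Conv>,
    scheduler: Sched,
) -> PusService11SchedHandler<Conv, Sched> {
    // Two type parameters instead of three: the pool is borrowed per call now.
    PusService11SchedHandler::new(helper, scheduler)
}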
@@ -49,7 +39,10 @@ impl<
         &self.scheduler
     }

-    pub fn handle_one_tc(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
+    pub fn handle_one_tc(
+        &mut self,
+        sched_tc_pool: &mut (impl PoolProviderMemInPlace + ?Sized),
+    ) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
         let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
         if possible_packet.is_none() {
             return Ok(PusPacketHandlerResult::Empty);
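A sketch of the resulting call pattern: the caller owns the pool and lends it for each `handle_one_tc` call. `poll_sched_service` and its error handling are illustrative, not part of the commit:

use satrs_core::pool::StaticMemoryPool;
use satrs_core::pus::scheduler::PusScheduler;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult};

fn poll_sched_service(
    handler: &mut PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>,
    sched_tc_pool: &mut StaticMemoryPool,
) {
    // Drain all pending packets; the pool is passed in instead of being a field.
    loop {
        match handler.handle_one_tc(sched_tc_pool) {
            Ok(PusPacketHandlerResult::Empty) => break,
            Ok(_) => {}
            Err(e) => {
                log::warn!("PUS 11 handler error: {e:?}");
                break;
            }
        }
    }
}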
@@ -124,7 +117,7 @@ impl<
                 // let mut pool = self.shared_tc_store.write().expect("Locking pool failed");

                 self.scheduler
-                    .reset(&mut self.sched_tc_pool)
+                    .reset(sched_tc_pool)
                     .expect("Error resetting TC Pool");

                 self.service_helper
@@ -145,7 +138,7 @@ impl<

                 // let mut pool = self.sched_tc_pool.write().expect("locking pool failed");
                 self.scheduler
-                    .insert_wrapped_tc::<TimeProvider>(&tc, &mut self.sched_tc_pool)
+                    .insert_wrapped_tc::<TimeProvider>(&tc, sched_tc_pool)
                     .expect("insertion of activity into pool failed");

                 self.service_helper
@@ -175,7 +168,6 @@ impl<
 #[cfg(test)]
 mod tests {
     use crate::pool::StaticMemoryPool;
-    use crate::pus::scheduler::RequestId as RequestIdSched;
     use crate::{
         events::EventU32,
         pus::{
@@ -89,7 +89,7 @@ fn main() {
     let tc_store = TcStore {
         pool: Arc::new(RwLock::new(tc_pool)),
     };
-    let tc_sched_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
+    let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
         (30, 32),
         (15, 64),
         (15, 128),
@@ -220,11 +220,11 @@ fn main() {
             verif_reporter.clone(),
             EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048),
         ),
-        tc_sched_pool,
         scheduler,
     );
     let mut pus_11_wrapper = Pus11Wrapper {
         pus_11_handler,
+        sched_tc_pool,
         tc_source_wrapper,
     };

@@ -1,30 +1,30 @@
 use crate::tmtc::PusTcSource;
 use log::{error, info, warn};
 use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool};
-use satrs_core::pus::scheduler::{PusScheduler, PusSchedulerInterface, TcInfo};
+use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
 use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
 use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult};

 pub struct Pus11Wrapper {
-    pub pus_11_handler:
-        PusService11SchedHandler<EcssTcInSharedStoreConverter, StaticMemoryPool, PusScheduler>,
+    pub pus_11_handler: PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>,
+    pub sched_tc_pool: StaticMemoryPool,
     pub tc_source_wrapper: PusTcSource,
 }

 impl Pus11Wrapper {
     pub fn release_tcs(&mut self) {
         let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
-            // Transfer TC from scheduler TC pool to shared TC pool.
-            let released_tc_addr = self
-                .tc_source_wrapper
-                .tc_store
-                .pool
-                .write()
-                .expect("locking pool failed")
-                .add(tc)
-                .expect("adding TC to shared pool failed");
-
             if enabled {
+                // Transfer TC from scheduler TC pool to shared TC pool.
+                let released_tc_addr = self
+                    .tc_source_wrapper
+                    .tc_store
+                    .pool
+                    .write()
+                    .expect("locking pool failed")
+                    .add(tc)
+                    .expect("adding TC to shared pool failed");
+
                 self.tc_source_wrapper
                     .tc_source
                     .send(released_tc_addr)
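Design note: the copy into the shared pool now happens only when the scheduler is enabled, so TCs released while disabled no longer allocate a shared-pool slot that nothing ever consumes. A self-contained sketch of a releaser following the same pattern; `make_releaser` and the `forward` callback are assumptions, not part of the commit:

use satrs_core::pus::scheduler::TcInfo;

// `forward` stands in for the shared-pool transfer shown in the hunk above.
fn make_releaser(
    mut forward: impl FnMut(&[u8]),
) -> impl FnMut(bool, &TcInfo, &[u8]) -> bool {
    move |enabled, _info, tc| {
        if enabled {
            forward(tc);
        }
        // Always request deletion from the scheduler pool: with the new
        // release_telecommands semantics this avoids leaks in both branches.
        true
    }
}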
@@ -37,40 +37,18 @@ impl Pus11Wrapper {
             .scheduler_mut()
             .update_time_from_now()
             .unwrap();
-        let sched_is_enabled = self.pus_11_handler.scheduler().is_enabled();
-
-        // We have to implement some boilerplate ourself, because the borrow checker falls over
-        // multiple borrows of the same object.
-        let tcs_to_release = self.pus_11_handler.scheduler().telecommands_to_release();
-        let mut released_tcs = 0;
-        let mut tcs_to_delete = Vec::new();
-        for tc in tcs_to_release {
-            for info in tc.1 {
-                let tc = self
-                    .pus_11_handler
-                    .sched_tc_pool
-                    .read(&info.addr())
-                    .expect("reading pool failed");
-                let should_delete = releaser(sched_is_enabled, info, tc);
-                released_tcs += 1;
-                if should_delete && !sched_is_enabled {
-                    tcs_to_delete.push(info.addr());
-                }
-            }
-        }
-        for addr in tcs_to_delete {
-            self.pus_11_handler
-                .sched_tc_pool
-                .delete(addr)
-                .expect("deleting TC from pool failed");
-        }
+        let released_tcs = self
+            .pus_11_handler
+            .scheduler_mut()
+            .release_telecommands(releaser, &mut self.sched_tc_pool)
+            .expect("releasing TCs failed");
         if released_tcs > 0 {
             info!("{released_tcs} TC(s) released from scheduler");
         }
     }

     pub fn handle_next_packet(&mut self) -> bool {
-        match self.pus_11_handler.handle_one_tc() {
+        match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) {
             Ok(result) => match result {
                 PusPacketHandlerResult::RequestHandled => {}
                 PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
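The deleted boilerplate existed because a closure that borrows `self` cannot coexist with a `&mut self` method call on the same object. Moving the pool out of the handler and into `Pus11Wrapper` turns the conflict into disjoint field borrows, which the borrow checker accepts. A minimal, self-contained illustration with stand-in types (not the real sat-rs API):

struct Handler;

impl Handler {
    // Takes the pool as a parameter instead of owning it.
    fn release(&mut self, pool: &mut Vec<u8>) {
        pool.clear();
    }
}

struct Wrapper {
    handler: Handler,
    pool: Vec<u8>,
}

impl Wrapper {
    fn release_tcs(&mut self) {
        // `self.handler` and `self.pool` are disjoint fields, so borrowing
        // both mutably at once compiles; a pool stored inside `handler`
        // combined with a `&mut self` method call would not.
        self.handler.release(&mut self.pool);
    }
}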