Refactored pool abstraction

- Redesigned PoolProvider and PoolProviderWithGuards to allow
  easier optimizations and increase flexibility (a sketch of the
  new trait shape follows the change summary below)
2024-02-10 11:59:26 +01:00
parent 18a5095d0f
commit d017b9c179
12 changed files with 370 additions and 228 deletions
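
The diffs below only show call sites, but together they imply the rough shape of the redesigned abstraction: the pool no longer hands out raw slices into its backing memory; writes and in-place updates go through closures, reads copy into caller-provided buffers, and the guard-based variant gains an allocating read. The following sketch is inferred from those call sites and is not the verbatim satrs-core definition; exact generic bounds, return types, and the remaining trait methods may differ.

use satrs_core::pool::{StoreAddr, StoreError};

// Sketch of the redesigned pool abstraction, inferred from the call sites
// in this commit. Not the verbatim satrs-core trait definition.
pub trait PoolProvider {
    /// Reserve an element of `len` bytes and populate it inside the closure,
    /// instead of returning a raw mutable slice to the caller.
    fn free_element<W: FnMut(&mut [u8])>(
        &mut self,
        len: usize,
        writer: W,
    ) -> Result<StoreAddr, StoreError>;

    /// Update a stored element in place through a closure.
    fn modify<U: FnMut(&mut [u8])>(
        &mut self,
        addr: &StoreAddr,
        updater: U,
    ) -> Result<(), StoreError>;

    /// Copy a stored element into a caller-provided buffer.
    fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result<usize, StoreError>;
}

// PoolProviderWithGuards additionally offers read_with_guard(addr); the
// returned guard exposes read_as_vec() for reads that allocate an owned
// Vec<u8>, as used by the UDP TM handler below.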


@@ -2,7 +2,7 @@ use std::sync::mpsc;
 use std::time::Duration;
 use log::{error, info, warn};
-use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr};
+use satrs_core::pool::{PoolProvider, StaticMemoryPool, StoreAddr};
 use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
 use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
 use satrs_core::pus::verification::VerificationReporterWithSender;
@@ -54,6 +54,7 @@ impl TcReleaser for mpsc::Sender<Vec<u8>> {
 pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> {
     pub pus_11_handler: PusService11SchedHandler<TcInMemConverter, PusScheduler>,
     pub sched_tc_pool: StaticMemoryPool,
+    pub releaser_buf: [u8; 4096],
     pub tc_releaser: Box<dyn TcReleaser + Send>,
 }
@@ -70,7 +71,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
         let released_tcs = self
             .pus_11_handler
             .scheduler_mut()
-            .release_telecommands(releaser, &mut self.sched_tc_pool)
+            .release_telecommands_with_buffer(
+                releaser,
+                &mut self.sched_tc_pool,
+                &mut self.releaser_buf,
+            )
             .expect("releasing TCs failed");
         if released_tcs > 0 {
             info!("{released_tcs} TC(s) released from scheduler");
@@ -136,6 +141,7 @@ pub fn create_scheduler_service_static(
     Pus11Wrapper {
         pus_11_handler,
         sched_tc_pool,
+        releaser_buf: [0; 4096],
         tc_releaser: Box::new(tc_releaser),
     }
 }
@@ -172,6 +178,7 @@ pub fn create_scheduler_service_dynamic(
     Pus11Wrapper {
         pus_11_handler,
         sched_tc_pool,
+        releaser_buf: [0; 4096],
         tc_releaser: Box::new(tc_source_sender),
     }
 }
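
Both constructors pre-allocate the 4096-byte releaser_buf owned by the wrapper, giving the scheduler somewhere to read each due TC without a fresh heap allocation per release. A minimal sketch of the calling pattern (scheduler, releaser, and tc_pool are assumed to be set up elsewhere; only the method name is taken from this diff):

// Sketch: a caller-owned scratch buffer, sized for the largest expected TC,
// is reused across all released telecommands instead of allocating per TC.
let mut scratch = [0u8; 4096];
let released = scheduler
    .release_telecommands_with_buffer(releaser, &mut tc_pool, &mut scratch)
    .expect("releasing TCs failed");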


@@ -3,8 +3,9 @@ use std::{
     sync::mpsc::{Receiver, Sender},
 };
 use log::info;
 use satrs_core::{
-    pool::{PoolProviderMemInPlace, StoreAddr},
+    pool::{PoolProvider, StoreAddr},
     seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
     spacepackets::{
         ecss::{tm::PusTmZeroCopyWriter, PusPacket},
@@ -63,9 +64,14 @@ impl TmFunnelCommon {
             *entry += 1;
         }
+        Self::packet_printout(&zero_copy_writer);
         // This operation has to come last!
         zero_copy_writer.finish();
     }
+
+    fn packet_printout(tm: &PusTmZeroCopyWriter) {
+        info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
+    }
 }

 pub struct TmFunnelStatic {
@@ -96,16 +102,21 @@ impl TmFunnelStatic {
             // the CRC.
             let shared_pool = self.shared_tm_store.clone_backing_pool();
             let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
-            let tm_raw = pool_guard
-                .modify(&addr)
+            let mut tm_copy = Vec::new();
+            pool_guard
+                .modify(&addr, |buf| {
+                    let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
+                        .expect("Creating TM zero copy writer failed");
+                    self.common.apply_packet_processing(zero_copy_writer);
+                    tm_copy = buf.to_vec()
+                })
                 .expect("Reading TM from pool failed");
-            let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN)
-                .expect("Creating TM zero copy writer failed");
-            self.common.apply_packet_processing(zero_copy_writer);
             self.tm_server_tx
                 .send(addr)
                 .expect("Sending TM to server failed");
-            self.common.sync_tm_tcp_source.add_tm(tm_raw);
+            // We could also do this step in the update closure, but I'd rather avoid this, could
+            // lead to nested locking.
+            self.common.sync_tm_tcp_source.add_tm(&tm_copy);
         }
     }
 }
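
The temporary tm_copy exists because of the locking concern spelled out in the comment above: apply_packet_processing runs inside the modify closure while the TM pool is locked, so feeding the TCP source (which synchronizes internally) is kept out of the closure. As a general pattern (names other than modify are illustrative, not satrs-core APIs):

// Illustrative pattern: copy the data out inside the update closure and
// hand it to the second synchronized resource only after the closure has
// returned, instead of locking that resource from within the closure.
let mut copy = Vec::new();
pool_guard
    .modify(&addr, |buf| {
        copy = buf.to_vec(); // only the pool buffer is touched in here
    })
    .expect("reading TM from pool failed");
tcp_source.add_tm(&copy); // hypothetical second resource, locked afterwards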


@@ -5,7 +5,7 @@ use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError};
 use thiserror::Error;
 use crate::pus::PusReceiver;
-use satrs_core::pool::{PoolProviderMemInPlace, SharedStaticMemoryPool, StoreAddr, StoreError};
+use satrs_core::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError};
 use satrs_core::spacepackets::ecss::tc::PusTcReader;
 use satrs_core::spacepackets::ecss::PusPacket;
 use satrs_core::tmtc::ReceivesCcsdsTc;
@@ -28,8 +28,9 @@ pub struct SharedTcPool {
 impl SharedTcPool {
     pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
         let mut pg = self.pool.write().expect("error locking TC store");
-        let (addr, buf) = pg.free_element(pus_tc.len_packed())?;
-        buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
+        let addr = pg.free_element(pus_tc.len_packed(), |buf| {
+            buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
+        })?;
         Ok(addr)
     }
 }
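
add_pus_tc shows the new write path: instead of receiving an (addr, buf) pair and filling the slot afterwards, the caller passes a closure that populates the reserved element. Written against the trait, the shape generalizes to any packet source (store_raw is a hypothetical helper; only free_element comes from this diff):

use satrs_core::pool::{PoolProvider, StoreAddr, StoreError};

// Hypothetical helper: store any raw packet in any PoolProvider.
fn store_raw(pool: &mut impl PoolProvider, raw: &[u8]) -> Result<StoreAddr, StoreError> {
    pool.free_element(raw.len(), |buf| {
        // The closure receives the freshly reserved slot of raw.len() bytes.
        buf[..raw.len()].copy_from_slice(raw);
    })
}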
@@ -125,8 +126,8 @@ impl TcSourceTaskStatic {
                     .pool
                     .read()
                     .expect("locking tc pool failed");
-                let data = pool.read(&addr).expect("reading pool failed");
-                self.tc_buf[0..data.len()].copy_from_slice(data);
+                pool.read(&addr, &mut self.tc_buf)
+                    .expect("reading pool failed");
                 drop(pool);
                 match PusTcReader::new(&self.tc_buf) {
                     Ok((pus_tc, _)) => {
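
The read path mirrors the write path: the packet is copied into the task-owned tc_buf under the pool lock, and drop(pool) releases the lock before parsing starts, so the shared pool is never held across the PusTcReader work. Condensed (buffer size and error handling are illustrative):

// Illustrative read path: copy out under the lock, parse after releasing it.
let pool = shared_pool.read().expect("locking tc pool failed");
let mut tc_buf = [0u8; 4096];
pool.read(&addr, &mut tc_buf).expect("reading pool failed");
drop(pool); // pool lock released before the parsing step
let (pus_tc, _size) = PusTcReader::new(&tc_buf).expect("parsing PUS TC failed");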


@@ -6,7 +6,7 @@ use std::{
 use log::{info, warn};
 use satrs_core::{
     hal::std::udp_server::{ReceiveResult, UdpTcServer},
-    pool::{PoolProviderMemInPlaceWithGuards, SharedStaticMemoryPool, StoreAddr},
+    pool::{PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr},
     tmtc::CcsdsError,
 };
@@ -29,20 +29,13 @@ impl UdpTmHandler for StaticUdpTmHandler {
             }
             let mut store_lock = store_lock.unwrap();
             let pg = store_lock.read_with_guard(addr);
-            let read_res = pg.read();
+            let read_res = pg.read_as_vec();
             if read_res.is_err() {
                 warn!("Error reading TM pool data");
                 continue;
             }
             let buf = read_res.unwrap();
-            if buf.len() > 9 {
-                let service = buf[7];
-                let subservice = buf[8];
-                info!("Sending PUS TM[{service},{subservice}]")
-            } else {
-                info!("Sending PUS TM");
-            }
-            let result = socket.send_to(buf, recv_addr);
+            let result = socket.send_to(&buf, recv_addr);
             if let Err(e) = result {
                 warn!("Sending TM with UDP socket failed: {e}")
             }
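
read_as_vec replaces the old borrowing read for guard-based access: it returns an owned Vec<u8>, so the data stays usable without keeping a borrow into the pool alive, and the manual service/subservice peeking at raw bytes could be dropped here in favor of the packet_printout helper added to the TM funnel above. A minimal sketch of the guard-based read (pool setup elided; method names as used in this diff):

// Sketch: guard-based read that yields an owned copy of the element.
let mut store = shared_pool.write().expect("locking TM pool failed");
let guard = store.read_with_guard(addr);
let data: Vec<u8> = guard.read_as_vec().expect("reading TM pool data failed");
// `data` is owned, so it remains usable independently of the guard.
socket.send_to(&data, recv_addr).expect("sending TM over UDP failed");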