zero copy TM funnel
This commit is contained in:
parent
f34ef841bf
commit
8210e01615
@ -64,7 +64,7 @@ optional = true
|
||||
# version = "0.5.4"
|
||||
# path = "../spacepackets"
|
||||
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
|
||||
rev = "28cd8c02ac0"
|
||||
rev = "4485ed26699d32"
|
||||
default-features = false
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -233,7 +233,7 @@ impl TryFrom<TcStateToken> for VerificationToken<TcStateAccepted> {
|
||||
if let TcStateToken::Accepted(token) = value {
|
||||
Ok(token)
|
||||
} else {
|
||||
return Err(());
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ mod requests;
|
||||
mod tmtc;
|
||||
|
||||
use log::{info, warn};
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
use crate::hk::AcsHkIds;
|
||||
use crate::logging::setup_logger;
|
||||
@ -38,7 +37,7 @@ use satrs_core::pus::verification::{
|
||||
};
|
||||
use satrs_core::pus::MpscTmtcInStoreSender;
|
||||
use satrs_core::seq_count::{SeqCountProviderSimple, SequenceCountProviderCore};
|
||||
use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket};
|
||||
use satrs_core::spacepackets::tm::PusTmZeroCopyWriter;
|
||||
use satrs_core::spacepackets::{
|
||||
time::cds::TimeProvider,
|
||||
time::TimeWriter,
|
||||
@ -213,38 +212,35 @@ fn main() {
|
||||
let jh1 = thread::Builder::new()
|
||||
.name("TM Funnel".to_string())
|
||||
.spawn(move || {
|
||||
let mut tm_buf: [u8; 2048] = [0; 2048];
|
||||
let tm_funnel = TmFunnel {
|
||||
tm_server_tx,
|
||||
tm_funnel_rx,
|
||||
};
|
||||
loop {
|
||||
if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
|
||||
// Read the TM, set sequence counter and message counter, and finally write
|
||||
// it back with the updated CRC.
|
||||
// We could theoretically manipulate the counters and the CRC directly
|
||||
// in place as an optimization, but I don't think this is necessary..
|
||||
// Read the TM, set sequence counter and message counter, and finally update
|
||||
// the CRC.
|
||||
let shared_pool = tm_store.backing_pool();
|
||||
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
|
||||
let tm_raw = pool_guard
|
||||
.modify(&addr)
|
||||
.expect("Reading TM from pool failed");
|
||||
tm_buf[0..tm_raw.len()].copy_from_slice(&tm_raw);
|
||||
let (mut tm, size) =
|
||||
PusTm::from_bytes(&tm_buf, 7).expect("Creating TM from raw slice failed");
|
||||
tm.sp_header.set_apid(PUS_APID);
|
||||
tm.sp_header
|
||||
.set_seq_count(seq_count_provider.get_and_increment());
|
||||
let entry = msg_counter_map.entry(tm.service()).or_insert(0);
|
||||
tm.sec_header.msg_counter = *entry;
|
||||
let mut zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
zero_copy_writer.set_apid(PUS_APID);
|
||||
zero_copy_writer.set_seq_count(seq_count_provider.get_and_increment());
|
||||
let entry = msg_counter_map
|
||||
.entry(zero_copy_writer.service())
|
||||
.or_insert(0);
|
||||
zero_copy_writer.set_msg_count(*entry);
|
||||
if *entry == u16::MAX {
|
||||
*entry = 0;
|
||||
} else {
|
||||
*entry += 1;
|
||||
}
|
||||
tm.calc_crc_on_serialization = true;
|
||||
tm.write_to_bytes(tm_raw)
|
||||
.expect("Writing PUS TM back failed");
|
||||
|
||||
// This operation has to come last!
|
||||
zero_copy_writer.finish();
|
||||
tm_funnel
|
||||
.tm_server_tx
|
||||
.send(addr)
|
||||
|
@ -1,6 +1,5 @@
|
||||
use crate::tmtc::PusTcSource;
|
||||
use log::{error, info, warn};
|
||||
use satrs_core::pool::{SharedPool, StoreAddr};
|
||||
use satrs_core::pus::scheduler::TcInfo;
|
||||
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
|
||||
use satrs_core::pus::{PusPacketHandlerResult, PusServiceHandler};
|
||||
|
Loading…
Reference in New Issue
Block a user