continue example update
All checks were successful: Rust/sat-rs/pipeline/pr-main (This commit looks good)

Author: Robin Müller, 2024-02-07 12:09:39 +01:00
Parent: ccb8bdbb95
Commit: d4a69a77e9
Signed by: muellerr (GPG Key ID: A649FB78196E3849)
8 changed files with 269 additions and 85 deletions


@@ -73,10 +73,11 @@ features = ["all"]
optional = true
[dependencies.spacepackets]
version = "0.8.1"
version = "0.9.0"
default-features = false
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
# rev = "297cfad22637d3b07a1b27abe56d9a607b5b82a7"
branch = "main"
[dependencies.cobs]
git = "https://github.com/robamu/cobs.rs.git"


@@ -1,10 +1,9 @@
use std::sync::mpsc::{self, TryRecvError};
use log::{info, warn};
use satrs_core::pool::StoreAddr;
use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{EcssTmSender, PusTmWrapper};
use satrs_core::spacepackets::ecss::hk::Subservice as HkSubservice;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::{
hk::HkRequest,
spacepackets::{
@@ -24,26 +23,23 @@ use crate::{
pub struct AcsTask {
timestamp: [u8; 7],
time_provider: TimeProvider<DaysLen16Bits>,
tm_store: SharedTmStore,
tm_funnel: mpsc::Sender<StoreAddr>,
request_rx: mpsc::Receiver<RequestWithToken>,
verif_reporter: VerificationReporterWithSender,
tm_sender: Box<dyn EcssTmSender>,
request_rx: mpsc::Receiver<RequestWithToken>,
}
impl AcsTask {
pub fn new(
tm_store: SharedTmStore,
tm_funnel: mpsc::Sender<StoreAddr>,
tm_sender: impl EcssTmSender,
request_rx: mpsc::Receiver<RequestWithToken>,
verif_reporter: VerificationReporterWithSender,
) -> Self {
Self {
timestamp: [0; 7],
time_provider: TimeProvider::new_with_u16_days(0, 0),
tm_store,
tm_funnel,
request_rx,
verif_reporter,
tm_sender: Box::new(tm_sender),
request_rx,
}
}
@@ -60,11 +56,9 @@ impl AcsTask {
let hk_id = HkUniqueId::new(target_id, unique_id);
hk_id.write_to_be_bytes(&mut buf).unwrap();
let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &buf, true);
let addr = self
.tm_store
.add_pus_tm(&pus_tm)
.expect("Adding PUS TM failed");
self.tm_funnel.send(addr).expect("Sending HK TM failed");
self.tm_sender
.send_tm(PusTmWrapper::Direct(pus_tm))
.expect("Sending HK TM failed");
}
// TODO: Verification failure for invalid unique IDs.
}
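The AcsTask rework above drops the shared TM store and funnel sender in favour of a single boxed EcssTmSender, so the same task code can serve both the static-pool and the heap-allocated example. A minimal, self-contained sketch of that pattern, with hypothetical names (TmSink and friends are stand-ins, not the satrs-core API):

use std::sync::mpsc;

// Hypothetical trait standing in for EcssTmSender: the task only knows how to
// hand a finished TM packet to "somewhere".
trait TmSink {
    fn send_tm(&self, tm: Vec<u8>);
}

// One possible backend: forward the raw TM over a channel (the "dynamic" case).
struct VecSink(mpsc::Sender<Vec<u8>>);

impl TmSink for VecSink {
    fn send_tm(&self, tm: Vec<u8>) {
        self.0.send(tm).expect("sending TM failed");
    }
}

// The task stores the sink as a trait object and never sees the concrete type.
struct Task {
    tm_sender: Box<dyn TmSink>,
}

impl Task {
    fn new(tm_sender: impl TmSink + 'static) -> Self {
        Self {
            tm_sender: Box::new(tm_sender),
        }
    }

    fn send_housekeeping(&self, hk_packet: Vec<u8>) {
        self.tm_sender.send_tm(hk_packet);
    }
}

fn main() {
    let (tm_tx, tm_rx) = mpsc::channel();
    let task = Task::new(VecSink(tm_tx));
    task.send_housekeeping(vec![0x08, 0x19]);
    println!("funnel received {:?}", tm_rx.recv().unwrap());
}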


@@ -145,6 +145,7 @@ pub enum TmSenderId {
PusAction = 4,
PusSched = 5,
AllEvents = 6,
AcsSubsystem = 7,
}
#[derive(Copy, Clone, PartialEq, Eq)]


@@ -12,12 +12,12 @@ mod udp;
use crate::events::EventHandler;
use crate::pus::stack::PusStack;
use crate::tm_funnel::TmFunnel;
use crate::tm_funnel::{TmFunnelDynamic, TmFunnelStatic};
use log::info;
use pus::test::create_test_service_dynamic;
use satrs_core::hal::std::tcp_server::ServerConfig;
use satrs_core::hal::std::udp_server::UdpTcServer;
use tmtc::PusTcSourceDynamic;
use tmtc::PusTcSourceProviderDynamic;
use udp::DynamicUdpTmHandler;
use crate::acs::AcsTask;
@@ -26,14 +26,13 @@ use crate::logger::setup_logger;
use crate::pus::action::create_action_service;
use crate::pus::event::create_event_service;
use crate::pus::hk::create_hk_service;
use crate::pus::scheduler::create_scheduler_service;
use crate::pus::scheduler::create_scheduler_service_static;
use crate::pus::test::create_test_service_static;
use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::requests::RequestWithToken;
use crate::tcp::{SyncTcpTmSource, TcpTask};
use crate::tmtc::{
MpscStoreAndSendError, PusTcSourceStaticPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic,
TmtcTaskStatic,
PusTcSourceProviderSharedPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic, TmtcTaskStatic,
};
use crate::udp::{StaticUdpTmHandler, UdpTmtcServer};
use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig};
@@ -117,8 +116,8 @@ fn static_tmtc_pool_main() {
let mut request_map = HashMap::new();
request_map.insert(acs_target_id, acs_thread_tx);
let tc_source_wrapper = PusTcSourceStaticPool {
tc_store: shared_tc_pool.clone(),
let tc_source_wrapper = PusTcSourceProviderSharedPool {
shared_pool: shared_tc_pool.clone(),
tc_source: tc_source_tx,
};
@@ -171,7 +170,7 @@ fn static_tmtc_pool_main() {
event_handler.clone_event_sender(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service(
let pus_scheduler_service = create_scheduler_service_static(
shared_tm_store.clone(),
tm_funnel_tx.clone(),
verif_reporter.clone(),
@@ -242,13 +241,17 @@ fn static_tmtc_pool_main() {
.expect("tcp server creation failed");
let mut acs_task = AcsTask::new(
MpscTmInStoreSender::new(
TmSenderId::AcsSubsystem as ChannelId,
"ACS_TASK_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
),
acs_thread_rx,
verif_reporter,
);
let mut tm_funnel = TmFunnel::new(
let mut tm_funnel = TmFunnelStatic::new(
shared_tm_store,
sync_tm_tcp_source,
tm_funnel_rx,
@@ -343,7 +346,7 @@ fn dyn_tmtc_pool_main() {
let mut request_map = HashMap::new();
request_map.insert(acs_target_id, acs_thread_tx);
let tc_source = PusTcSourceDynamic {
let tc_source = PusTcSourceProviderDynamic {
tc_source: tc_source_tx,
};
@@ -411,6 +414,17 @@ fn dyn_tmtc_pool_main() {
)
.expect("tcp server creation failed");
let mut acs_task = AcsTask::new(
MpscTmAsVecSender::new(
TmSenderId::AcsSubsystem as ChannelId,
"ACS_TASK_SENDER",
tm_funnel_tx.clone(),
),
acs_thread_rx,
verif_reporter,
);
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("TMTC and UDP".to_string())
@@ -434,6 +448,51 @@ fn dyn_tmtc_pool_main() {
}
})
.unwrap();
info!("Starting TM funnel task");
let jh1 = thread::Builder::new()
.name("TM Funnel".to_string())
.spawn(move || loop {
tm_funnel.operation();
})
.unwrap();
info!("Starting event handling task");
let jh2 = thread::Builder::new()
.name("Event".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(400));
})
.unwrap();
info!("Starting AOCS thread");
let jh3 = thread::Builder::new()
.name("AOCS".to_string())
.spawn(move || loop {
acs_task.periodic_operation();
thread::sleep(Duration::from_millis(500));
})
.unwrap();
info!("Starting PUS handler thread");
let jh4 = thread::Builder::new()
.name("PUS".to_string())
.spawn(move || loop {
// pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(200));
})
.unwrap();
jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");
jh_tcp
.join()
.expect("Joining TCP TMTC server thread failed");
jh1.join().expect("Joining TM Funnel thread failed");
jh2.join().expect("Joining Event Manager thread failed");
jh3.join().expect("Joining AOCS thread failed");
jh4.join().expect("Joining PUS handler thread failed");
}
fn main() {
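The dynamic entry point finishes the same way the static one does: every component runs its periodic operation in a named worker thread, and the main thread joins all of them. A condensed, generic sketch of that spawn-and-join pattern (component names are illustrative only):

use std::{thread, time::Duration};

fn main() {
    // Each worker gets a descriptive thread name and loops over its periodic step.
    let tm_funnel = thread::Builder::new()
        .name("TM Funnel".to_string())
        .spawn(|| loop {
            // tm_funnel.operation() would run here.
            thread::sleep(Duration::from_millis(40));
        })
        .unwrap();
    let acs = thread::Builder::new()
        .name("AOCS".to_string())
        .spawn(|| loop {
            // acs_task.periodic_operation() would run here.
            thread::sleep(Duration::from_millis(500));
        })
        .unwrap();
    // Joining keeps the main thread alive for as long as the workers run.
    tm_funnel.join().expect("joining TM funnel thread failed");
    acs.join().expect("joining AOCS thread failed");
}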


@@ -7,29 +7,29 @@ use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{
EcssTcAndToken, EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender,
PusPacketHandlerResult, PusServiceHelper,
EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, MpscTcReceiver,
MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper,
};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId;
use satrs_example::{TcReceiverId, TmSenderId, PUS_APID};
use crate::tmtc::PusTcSourceStaticPool;
use crate::tmtc::{PusTcSourceProviderDynamic, PusTcSourceProviderSharedPool};
pub struct Pus11Wrapper {
pub struct Pus11WrapperStatic {
pub pus_11_handler: PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool,
pub tc_source_wrapper: PusTcSourceStaticPool,
pub tc_source_wrapper: PusTcSourceProviderSharedPool,
}
pub fn create_scheduler_service(
pub fn create_scheduler_service_static(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender,
tc_source_wrapper: PusTcSourceStaticPool,
tc_source_wrapper: PusTcSourceProviderSharedPool,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool,
) -> Pus11Wrapper {
) -> Pus11WrapperStatic {
let sched_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER",
@@ -53,21 +53,21 @@ pub fn create_scheduler_service(
),
scheduler,
);
Pus11Wrapper {
Pus11WrapperStatic {
pus_11_handler,
sched_tc_pool,
tc_source_wrapper,
}
}
impl Pus11Wrapper {
impl Pus11WrapperStatic {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
if enabled {
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = self
.tc_source_wrapper
.tc_store
.shared_pool
.pool
.write()
.expect("locking pool failed")
@@ -120,3 +120,61 @@ impl Pus11Wrapper {
false
}
}
pub struct Pus11WrapperDynamic {
pub pus_11_handler: PusService11SchedHandler<EcssTcInVecConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool,
pub tc_source_wrapper: PusTcSourceProviderDynamic,
}
impl Pus11WrapperDynamic {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
if enabled {
// Send released TC to centralized TC source.
self.tc_source_wrapper
.tc_source
.send(tc.to_vec())
.expect("sending TC to TC source failed");
}
true
};
self.pus_11_handler
.scheduler_mut()
.update_time_from_now()
.unwrap();
let released_tcs = self
.pus_11_handler
.scheduler_mut()
.release_telecommands(releaser, &mut self.sched_tc_pool)
.expect("releasing TCs failed");
if released_tcs > 0 {
info!("{released_tcs} TC(s) released from scheduler");
}
}
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => {
return true;
}
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
false
}
}
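Pus11WrapperDynamic releases due telecommands through a caller-supplied closure that forwards each enabled TC to the central TC source. A rough, self-contained sketch of that closure-based release pattern (the scheduler stand-in and its drop semantics below are hypothetical, not PusScheduler):

use std::sync::mpsc;

// Hypothetical stand-in for a scheduled TC entry.
struct DueTc {
    enabled: bool,
    raw: Vec<u8>,
}

// Calls the releaser for every due TC; entries for which the releaser returns true
// are considered handled and removed. Returns how many entries were released.
fn release_due_tcs<F>(due: &mut Vec<DueTc>, mut releaser: F) -> usize
where
    F: FnMut(bool, &[u8]) -> bool,
{
    let before = due.len();
    due.retain(|tc| !releaser(tc.enabled, &tc.raw));
    before - due.len()
}

fn main() {
    let (tc_tx, tc_rx) = mpsc::channel::<Vec<u8>>();
    let mut due = vec![DueTc {
        enabled: true,
        raw: vec![0x17, 0x2A],
    }];
    // The releaser forwards enabled TCs to the TC source and always drops the entry.
    let released = release_due_tcs(&mut due, |enabled, tc| {
        if enabled {
            tc_tx.send(tc.to_vec()).expect("sending TC to TC source failed");
        }
        true
    });
    println!("released {released} TC(s), first: {:?}", tc_rx.try_recv());
}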


@@ -1,7 +1,7 @@
use satrs_core::pus::EcssTcInSharedStoreConverter;
use super::{
action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper,
action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11WrapperStatic,
test::Service17CustomWrapper,
};
@@ -9,7 +9,7 @@ pub struct PusStack {
event_srv: Pus5Wrapper,
hk_srv: Pus3Wrapper,
action_srv: Pus8Wrapper,
schedule_srv: Pus11Wrapper,
schedule_srv: Pus11WrapperStatic,
test_srv: Service17CustomWrapper<EcssTcInSharedStoreConverter>,
}
@@ -18,7 +18,7 @@ impl PusStack {
hk_srv: Pus3Wrapper,
event_srv: Pus5Wrapper,
action_srv: Pus8Wrapper,
schedule_srv: Pus11Wrapper,
schedule_srv: Pus11WrapperStatic,
test_srv: Service17CustomWrapper<EcssTcInSharedStoreConverter>,
) -> Self {
Self {


@@ -6,23 +6,76 @@ use std::{
use satrs_core::{
pool::{PoolProviderMemInPlace, StoreAddr},
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
spacepackets::ecss::tm::PusTmZeroCopyWriter,
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
tmtc::tm_helper::SharedTmStore,
};
use satrs_example::PUS_APID;
use crate::tcp::SyncTcpTmSource;
pub struct TmFunnel {
shared_tm_store: SharedTmStore,
seq_count_provider: CcsdsSimpleSeqCountProvider,
sync_tm_tcp_source: SyncTcpTmSource,
#[derive(Default)]
pub struct CcsdsSeqCounterMap {
apid_seq_counter_map: HashMap<u16, CcsdsSimpleSeqCountProvider>,
}
impl CcsdsSeqCounterMap {
pub fn get_and_increment(&mut self, apid: u16) -> u16 {
self.apid_seq_counter_map
.entry(apid)
.or_default()
.get_and_increment()
}
}
pub struct TmFunnelCommon {
seq_counter_map: CcsdsSeqCounterMap,
msg_counter_map: HashMap<u8, u16>,
sync_tm_tcp_source: SyncTcpTmSource,
}
impl TmFunnelCommon {
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
Self {
seq_counter_map: Default::default(),
msg_counter_map: Default::default(),
sync_tm_tcp_source,
}
}
// Applies common packet processing operations for PUS TM packets. This includes setting
// the sequence counter and the message counter before the packet is finalized.
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
// zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(
self.seq_counter_map
.get_and_increment(zero_copy_writer.apid()),
);
let entry = self
.msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
// This operation has to come last!
zero_copy_writer.finish();
}
}
pub struct TmFunnelStatic {
common: TmFunnelCommon,
shared_tm_store: SharedTmStore,
tm_funnel_rx: Receiver<StoreAddr>,
tm_server_tx: Sender<StoreAddr>,
}
impl TmFunnel {
impl TmFunnelStatic {
pub fn new(
shared_tm_store: SharedTmStore,
sync_tm_tcp_source: SyncTcpTmSource,
@@ -30,10 +83,8 @@ impl TmFunnel {
tm_server_tx: Sender<StoreAddr>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
shared_tm_store,
seq_count_provider: CcsdsSimpleSeqCountProvider::new(),
msg_counter_map: HashMap::new(),
sync_tm_tcp_source,
tm_funnel_rx,
tm_server_tx,
}
@@ -48,27 +99,47 @@ impl TmFunnel {
let tm_raw = pool_guard
.modify(&addr)
.expect("Reading TM from pool failed");
let mut zero_copy_writer =
PusTmZeroCopyWriter::new(tm_raw).expect("Creating TM zero copy writer failed");
zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(self.seq_count_provider.get_and_increment());
let entry = self
.msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
// This operation has to come last!
zero_copy_writer.finish();
let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.tm_server_tx
.send(addr)
.expect("Sending TM to server failed");
self.sync_tm_tcp_source.add_tm(tm_raw);
self.common.sync_tm_tcp_source.add_tm(tm_raw);
}
}
}
pub struct TmFunnelDynamic {
common: TmFunnelCommon,
tm_funnel_rx: Receiver<Vec<u8>>,
tm_server_tx: Sender<Vec<u8>>,
}
impl TmFunnelDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: Receiver<Vec<u8>>,
tm_server_tx: Sender<Vec<u8>>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx,
tm_server_tx,
}
}
pub fn operation(&mut self) {
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.tm_server_tx
.send(tm.clone())
.expect("Sending TM to server failed");
self.common.sync_tm_tcp_source.add_tm(&tm);
}
}
}
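The funnel refactor moves the shared post-processing into TmFunnelCommon: every outgoing TM gets a per-APID CCSDS sequence count and a per-service message count, both wrapping, before the writer finalizes the packet. A small, self-contained sketch of such wrapping counter maps (simplified types, not the satrs-core providers):

use std::collections::HashMap;

// Illustrative sketch: per-APID sequence counters and per-service message
// counters, both wrapping, as the funnel applies them to outgoing TM.
#[derive(Default)]
struct Counters {
    seq_by_apid: HashMap<u16, u16>,
    msg_by_service: HashMap<u8, u16>,
}

impl Counters {
    // CCSDS sequence counts are 14 bits wide, so wrap at 2^14.
    fn next_seq(&mut self, apid: u16) -> u16 {
        let entry = self.seq_by_apid.entry(apid).or_insert(0);
        let current = *entry;
        *entry = (*entry + 1) % (1 << 14);
        current
    }

    // Message counts are plain u16 counters that wrap at u16::MAX.
    fn next_msg(&mut self, service: u8) -> u16 {
        let entry = self.msg_by_service.entry(service).or_insert(0);
        let current = *entry;
        *entry = entry.wrapping_add(1);
        current
    }
}

fn main() {
    let mut counters = Counters::default();
    assert_eq!(counters.next_seq(0x02), 0);
    assert_eq!(counters.next_seq(0x02), 1);
    assert_eq!(counters.next_msg(3), 0);
    println!("counters behave as expected");
}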


@@ -18,13 +18,13 @@ pub struct TmArgs {
}
pub struct TcArgs {
pub tc_source: PusTcSourceStaticPool,
pub tc_source: PusTcSourceProviderSharedPool,
pub tc_receiver: Receiver<StoreAddr>,
}
impl TcArgs {
#[allow(dead_code)]
fn split(self) -> (PusTcSourceStaticPool, Receiver<StoreAddr>) {
fn split(self) -> (PusTcSourceProviderSharedPool, Receiver<StoreAddr>) {
(self.tc_source, self.tc_receiver)
}
}
@@ -54,33 +54,33 @@ impl SharedTcPool {
}
#[derive(Clone)]
pub struct PusTcSourceStaticPool {
pub struct PusTcSourceProviderSharedPool {
pub tc_source: Sender<StoreAddr>,
pub tc_store: SharedTcPool,
pub shared_pool: SharedTcPool,
}
impl PusTcSourceStaticPool {
impl PusTcSourceProviderSharedPool {
#[allow(dead_code)]
pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool {
self.tc_store.pool.clone()
self.shared_pool.pool.clone()
}
}
impl ReceivesEcssPusTc for PusTcSourceStaticPool {
impl ReceivesEcssPusTc for PusTcSourceProviderSharedPool {
type Error = MpscStoreAndSendError;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
let addr = self.tc_store.add_pus_tc(pus_tc)?;
let addr = self.shared_pool.add_pus_tc(pus_tc)?;
self.tc_source.send(addr)?;
Ok(())
}
}
impl ReceivesCcsdsTc for PusTcSourceStaticPool {
impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool {
type Error = MpscStoreAndSendError;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut pool = self.tc_store.pool.write().expect("locking pool failed");
let mut pool = self.shared_pool.pool.write().expect("locking pool failed");
let addr = pool.add(tc_raw)?;
drop(pool);
self.tc_source.send(addr)?;
@@ -89,11 +89,11 @@ impl ReceivesCcsdsTc for PusTcSourceStaticPool {
}
#[derive(Clone)]
pub struct PusTcSourceDynamic {
pub struct PusTcSourceProviderDynamic {
pub tc_source: Sender<Vec<u8>>,
}
impl ReceivesEcssPusTc for PusTcSourceDynamic {
impl ReceivesEcssPusTc for PusTcSourceProviderDynamic {
type Error = SendError<Vec<u8>>;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
@@ -102,7 +102,7 @@ impl ReceivesEcssPusTc for PusTcSourceDynamic {
}
}
impl ReceivesCcsdsTc for PusTcSourceDynamic {
impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
type Error = mpsc::SendError<Vec<u8>>;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
@@ -136,7 +136,7 @@ impl TmtcTaskStatic {
let pool = self
.tc_args
.tc_source
.tc_store
.shared_pool
.pool
.read()
.expect("locking tc pool failed");