From d4a69a77e96a0854826929119818199b5399f47c Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 7 Feb 2024 12:09:39 +0100 Subject: [PATCH] continue example update --- satrs-core/Cargo.toml | 5 +- satrs-example/src/acs.rs | 24 ++---- satrs-example/src/lib.rs | 1 + satrs-example/src/main.rs | 83 ++++++++++++++++--- satrs-example/src/pus/scheduler.rs | 80 +++++++++++++++--- satrs-example/src/pus/stack.rs | 6 +- satrs-example/src/tm_funnel.rs | 127 ++++++++++++++++++++++------- satrs-example/src/tmtc.rs | 28 +++---- 8 files changed, 269 insertions(+), 85 deletions(-) diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 488966c..c5f7182 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -73,10 +73,11 @@ features = ["all"] optional = true [dependencies.spacepackets] -version = "0.8.1" +version = "0.9.0" default-features = false -# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" +git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" # rev = "297cfad22637d3b07a1b27abe56d9a607b5b82a7" +branch = "main" [dependencies.cobs] git = "https://github.com/robamu/cobs.rs.git" diff --git a/satrs-example/src/acs.rs b/satrs-example/src/acs.rs index fa3a75b..2974b32 100644 --- a/satrs-example/src/acs.rs +++ b/satrs-example/src/acs.rs @@ -1,10 +1,9 @@ use std::sync::mpsc::{self, TryRecvError}; use log::{info, warn}; -use satrs_core::pool::StoreAddr; use satrs_core::pus::verification::VerificationReporterWithSender; +use satrs_core::pus::{EcssTmSender, PusTmWrapper}; use satrs_core::spacepackets::ecss::hk::Subservice as HkSubservice; -use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::{ hk::HkRequest, spacepackets::{ @@ -24,26 +23,23 @@ use crate::{ pub struct AcsTask { timestamp: [u8; 7], time_provider: TimeProvider, - tm_store: SharedTmStore, - tm_funnel: mpsc::Sender, - request_rx: mpsc::Receiver, verif_reporter: VerificationReporterWithSender, + tm_sender: Box, + request_rx: mpsc::Receiver, } impl AcsTask { pub fn new( - tm_store: SharedTmStore, - tm_funnel: mpsc::Sender, + tm_sender: impl EcssTmSender, request_rx: mpsc::Receiver, verif_reporter: VerificationReporterWithSender, ) -> Self { Self { timestamp: [0; 7], time_provider: TimeProvider::new_with_u16_days(0, 0), - tm_store, - tm_funnel, - request_rx, verif_reporter, + tm_sender: Box::new(tm_sender), + request_rx, } } @@ -60,11 +56,9 @@ impl AcsTask { let hk_id = HkUniqueId::new(target_id, unique_id); hk_id.write_to_be_bytes(&mut buf).unwrap(); let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &buf, true); - let addr = self - .tm_store - .add_pus_tm(&pus_tm) - .expect("Adding PUS TM failed"); - self.tm_funnel.send(addr).expect("Sending HK TM failed"); + self.tm_sender + .send_tm(PusTmWrapper::Direct(pus_tm)) + .expect("Sending HK TM failed"); } // TODO: Verification failure for invalid unique IDs. } diff --git a/satrs-example/src/lib.rs b/satrs-example/src/lib.rs index 75fc598..a552fe0 100644 --- a/satrs-example/src/lib.rs +++ b/satrs-example/src/lib.rs @@ -145,6 +145,7 @@ pub enum TmSenderId { PusAction = 4, PusSched = 5, AllEvents = 6, + AcsSubsystem = 7, } #[derive(Copy, Clone, PartialEq, Eq)] diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 0f1f6ad..b139ee4 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -12,12 +12,12 @@ mod udp; use crate::events::EventHandler; use crate::pus::stack::PusStack; -use crate::tm_funnel::TmFunnel; +use crate::tm_funnel::{TmFunnelDynamic, TmFunnelStatic}; use log::info; use pus::test::create_test_service_dynamic; use satrs_core::hal::std::tcp_server::ServerConfig; use satrs_core::hal::std::udp_server::UdpTcServer; -use tmtc::PusTcSourceDynamic; +use tmtc::PusTcSourceProviderDynamic; use udp::DynamicUdpTmHandler; use crate::acs::AcsTask; @@ -26,14 +26,13 @@ use crate::logger::setup_logger; use crate::pus::action::create_action_service; use crate::pus::event::create_event_service; use crate::pus::hk::create_hk_service; -use crate::pus::scheduler::create_scheduler_service; +use crate::pus::scheduler::create_scheduler_service_static; use crate::pus::test::create_test_service_static; use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::RequestWithToken; use crate::tcp::{SyncTcpTmSource, TcpTask}; use crate::tmtc::{ - MpscStoreAndSendError, PusTcSourceStaticPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic, - TmtcTaskStatic, + PusTcSourceProviderSharedPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic, TmtcTaskStatic, }; use crate::udp::{StaticUdpTmHandler, UdpTmtcServer}; use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig}; @@ -117,8 +116,8 @@ fn static_tmtc_pool_main() { let mut request_map = HashMap::new(); request_map.insert(acs_target_id, acs_thread_tx); - let tc_source_wrapper = PusTcSourceStaticPool { - tc_store: shared_tc_pool.clone(), + let tc_source_wrapper = PusTcSourceProviderSharedPool { + shared_pool: shared_tc_pool.clone(), tc_source: tc_source_tx, }; @@ -171,7 +170,7 @@ fn static_tmtc_pool_main() { event_handler.clone_event_sender(), pus_test_rx, ); - let pus_scheduler_service = create_scheduler_service( + let pus_scheduler_service = create_scheduler_service_static( shared_tm_store.clone(), tm_funnel_tx.clone(), verif_reporter.clone(), @@ -242,13 +241,17 @@ fn static_tmtc_pool_main() { .expect("tcp server creation failed"); let mut acs_task = AcsTask::new( - shared_tm_store.clone(), - tm_funnel_tx.clone(), + MpscTmInStoreSender::new( + TmSenderId::AcsSubsystem as ChannelId, + "ACS_TASK_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ), acs_thread_rx, verif_reporter, ); - let mut tm_funnel = TmFunnel::new( + let mut tm_funnel = TmFunnelStatic::new( shared_tm_store, sync_tm_tcp_source, tm_funnel_rx, @@ -343,7 +346,7 @@ fn dyn_tmtc_pool_main() { let mut request_map = HashMap::new(); request_map.insert(acs_target_id, acs_thread_tx); - let tc_source = PusTcSourceDynamic { + let tc_source = PusTcSourceProviderDynamic { tc_source: tc_source_tx, }; @@ -411,6 +414,17 @@ fn dyn_tmtc_pool_main() { ) .expect("tcp server creation failed"); + let mut acs_task = AcsTask::new( + MpscTmAsVecSender::new( + TmSenderId::AcsSubsystem as ChannelId, + "ACS_TASK_SENDER", + tm_funnel_tx.clone(), + ), + acs_thread_rx, + verif_reporter, + ); + let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + info!("Starting TMTC and UDP task"); let jh_udp_tmtc = thread::Builder::new() .name("TMTC and UDP".to_string()) @@ -434,6 +448,51 @@ fn dyn_tmtc_pool_main() { } }) .unwrap(); + + info!("Starting TM funnel task"); + let jh1 = thread::Builder::new() + .name("TM Funnel".to_string()) + .spawn(move || loop { + tm_funnel.operation(); + }) + .unwrap(); + + info!("Starting event handling task"); + let jh2 = thread::Builder::new() + .name("Event".to_string()) + .spawn(move || loop { + event_handler.periodic_operation(); + thread::sleep(Duration::from_millis(400)); + }) + .unwrap(); + + info!("Starting AOCS thread"); + let jh3 = thread::Builder::new() + .name("AOCS".to_string()) + .spawn(move || loop { + acs_task.periodic_operation(); + thread::sleep(Duration::from_millis(500)); + }) + .unwrap(); + + info!("Starting PUS handler thread"); + let jh4 = thread::Builder::new() + .name("PUS".to_string()) + .spawn(move || loop { + // pus_stack.periodic_operation(); + thread::sleep(Duration::from_millis(200)); + }) + .unwrap(); + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); + jh1.join().expect("Joining TM Funnel thread failed"); + jh2.join().expect("Joining Event Manager thread failed"); + jh3.join().expect("Joining AOCS thread failed"); + jh4.join().expect("Joining PUS handler thread failed"); } fn main() { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index ed89397..06f8491 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -7,29 +7,29 @@ use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::verification::VerificationReporterWithSender; use satrs_core::pus::{ - EcssTcAndToken, EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, - PusPacketHandlerResult, PusServiceHelper, + EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, MpscTcReceiver, + MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper, }; use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::ChannelId; use satrs_example::{TcReceiverId, TmSenderId, PUS_APID}; -use crate::tmtc::PusTcSourceStaticPool; +use crate::tmtc::{PusTcSourceProviderDynamic, PusTcSourceProviderSharedPool}; -pub struct Pus11Wrapper { +pub struct Pus11WrapperStatic { pub pus_11_handler: PusService11SchedHandler, pub sched_tc_pool: StaticMemoryPool, - pub tc_source_wrapper: PusTcSourceStaticPool, + pub tc_source_wrapper: PusTcSourceProviderSharedPool, } -pub fn create_scheduler_service( +pub fn create_scheduler_service_static( shared_tm_store: SharedTmStore, tm_funnel_tx: mpsc::Sender, verif_reporter: VerificationReporterWithSender, - tc_source_wrapper: PusTcSourceStaticPool, + tc_source_wrapper: PusTcSourceProviderSharedPool, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, -) -> Pus11Wrapper { +) -> Pus11WrapperStatic { let sched_srv_tm_sender = MpscTmInStoreSender::new( TmSenderId::PusSched as ChannelId, "PUS_11_TM_SENDER", @@ -53,21 +53,21 @@ pub fn create_scheduler_service( ), scheduler, ); - Pus11Wrapper { + Pus11WrapperStatic { pus_11_handler, sched_tc_pool, tc_source_wrapper, } } -impl Pus11Wrapper { +impl Pus11WrapperStatic { pub fn release_tcs(&mut self) { let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { if enabled { // Transfer TC from scheduler TC pool to shared TC pool. let released_tc_addr = self .tc_source_wrapper - .tc_store + .shared_pool .pool .write() .expect("locking pool failed") @@ -120,3 +120,61 @@ impl Pus11Wrapper { false } } + +pub struct Pus11WrapperDynamic { + pub pus_11_handler: PusService11SchedHandler, + pub sched_tc_pool: StaticMemoryPool, + pub tc_source_wrapper: PusTcSourceProviderDynamic, +} + +impl Pus11WrapperDynamic { + pub fn release_tcs(&mut self) { + let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { + if enabled { + // Send released TC to centralized TC source. + self.tc_source_wrapper + .tc_source + .send(tc.to_vec()) + .expect("sending TC to TC source failed"); + } + true + }; + + self.pus_11_handler + .scheduler_mut() + .update_time_from_now() + .unwrap(); + let released_tcs = self + .pus_11_handler + .scheduler_mut() + .release_telecommands(releaser, &mut self.sched_tc_pool) + .expect("releasing TCs failed"); + if released_tcs > 0 { + info!("{released_tcs} TC(s) released from scheduler"); + } + } + + pub fn handle_next_packet(&mut self) -> bool { + match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS11 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS11 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS11: Subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => { + return true; + } + }, + Err(error) => { + error!("PUS packet handling error: {error:?}") + } + } + false + } +} diff --git a/satrs-example/src/pus/stack.rs b/satrs-example/src/pus/stack.rs index cd27856..3e53bef 100644 --- a/satrs-example/src/pus/stack.rs +++ b/satrs-example/src/pus/stack.rs @@ -1,7 +1,7 @@ use satrs_core::pus::EcssTcInSharedStoreConverter; use super::{ - action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, + action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11WrapperStatic, test::Service17CustomWrapper, }; @@ -9,7 +9,7 @@ pub struct PusStack { event_srv: Pus5Wrapper, hk_srv: Pus3Wrapper, action_srv: Pus8Wrapper, - schedule_srv: Pus11Wrapper, + schedule_srv: Pus11WrapperStatic, test_srv: Service17CustomWrapper, } @@ -18,7 +18,7 @@ impl PusStack { hk_srv: Pus3Wrapper, event_srv: Pus5Wrapper, action_srv: Pus8Wrapper, - schedule_srv: Pus11Wrapper, + schedule_srv: Pus11WrapperStatic, test_srv: Service17CustomWrapper, ) -> Self { Self { diff --git a/satrs-example/src/tm_funnel.rs b/satrs-example/src/tm_funnel.rs index cca3bb1..ba1ff32 100644 --- a/satrs-example/src/tm_funnel.rs +++ b/satrs-example/src/tm_funnel.rs @@ -6,23 +6,76 @@ use std::{ use satrs_core::{ pool::{PoolProviderMemInPlace, StoreAddr}, seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, - spacepackets::ecss::tm::PusTmZeroCopyWriter, + spacepackets::{ + ecss::{tm::PusTmZeroCopyWriter, PusPacket}, + time::cds::MIN_CDS_FIELD_LEN, + CcsdsPacket, + }, tmtc::tm_helper::SharedTmStore, }; -use satrs_example::PUS_APID; use crate::tcp::SyncTcpTmSource; -pub struct TmFunnel { - shared_tm_store: SharedTmStore, - seq_count_provider: CcsdsSimpleSeqCountProvider, - sync_tm_tcp_source: SyncTcpTmSource, +#[derive(Default)] +pub struct CcsdsSeqCounterMap { + apid_seq_counter_map: HashMap, +} +impl CcsdsSeqCounterMap { + pub fn get_and_increment(&mut self, apid: u16) -> u16 { + self.apid_seq_counter_map + .entry(apid) + .or_default() + .get_and_increment() + } +} + +pub struct TmFunnelCommon { + seq_counter_map: CcsdsSeqCounterMap, msg_counter_map: HashMap, + sync_tm_tcp_source: SyncTcpTmSource, +} + +impl TmFunnelCommon { + pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self { + Self { + seq_counter_map: Default::default(), + msg_counter_map: Default::default(), + sync_tm_tcp_source, + } + } + + // Applies common packet processing operations for PUS TM packets. This includes setting + // a sequence counter + fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) { + // zero_copy_writer.set_apid(PUS_APID); + zero_copy_writer.set_seq_count( + self.seq_counter_map + .get_and_increment(zero_copy_writer.apid()), + ); + let entry = self + .msg_counter_map + .entry(zero_copy_writer.service()) + .or_insert(0); + zero_copy_writer.set_msg_count(*entry); + if *entry == u16::MAX { + *entry = 0; + } else { + *entry += 1; + } + + // This operation has to come last! + zero_copy_writer.finish(); + } +} + +pub struct TmFunnelStatic { + common: TmFunnelCommon, + shared_tm_store: SharedTmStore, tm_funnel_rx: Receiver, tm_server_tx: Sender, } -impl TmFunnel { +impl TmFunnelStatic { pub fn new( shared_tm_store: SharedTmStore, sync_tm_tcp_source: SyncTcpTmSource, @@ -30,10 +83,8 @@ impl TmFunnel { tm_server_tx: Sender, ) -> Self { Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), shared_tm_store, - seq_count_provider: CcsdsSimpleSeqCountProvider::new(), - msg_counter_map: HashMap::new(), - sync_tm_tcp_source, tm_funnel_rx, tm_server_tx, } @@ -48,27 +99,47 @@ impl TmFunnel { let tm_raw = pool_guard .modify(&addr) .expect("Reading TM from pool failed"); - let mut zero_copy_writer = - PusTmZeroCopyWriter::new(tm_raw).expect("Creating TM zero copy writer failed"); - zero_copy_writer.set_apid(PUS_APID); - zero_copy_writer.set_seq_count(self.seq_count_provider.get_and_increment()); - let entry = self - .msg_counter_map - .entry(zero_copy_writer.service()) - .or_insert(0); - zero_copy_writer.set_msg_count(*entry); - if *entry == u16::MAX { - *entry = 0; - } else { - *entry += 1; - } - - // This operation has to come last! - zero_copy_writer.finish(); + let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); self.tm_server_tx .send(addr) .expect("Sending TM to server failed"); - self.sync_tm_tcp_source.add_tm(tm_raw); + self.common.sync_tm_tcp_source.add_tm(tm_raw); + } + } +} + +pub struct TmFunnelDynamic { + common: TmFunnelCommon, + tm_funnel_rx: Receiver>, + tm_server_tx: Sender>, +} + +impl TmFunnelDynamic { + pub fn new( + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: Receiver>, + tm_server_tx: Sender>, + ) -> Self { + Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), + tm_funnel_rx, + tm_server_tx, + } + } + + pub fn operation(&mut self) { + if let Ok(mut tm) = self.tm_funnel_rx.recv() { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.tm_server_tx + .send(tm.clone()) + .expect("Sending TM to server failed"); + self.common.sync_tm_tcp_source.add_tm(&tm); } } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 853d538..2690827 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -18,13 +18,13 @@ pub struct TmArgs { } pub struct TcArgs { - pub tc_source: PusTcSourceStaticPool, + pub tc_source: PusTcSourceProviderSharedPool, pub tc_receiver: Receiver, } impl TcArgs { #[allow(dead_code)] - fn split(self) -> (PusTcSourceStaticPool, Receiver) { + fn split(self) -> (PusTcSourceProviderSharedPool, Receiver) { (self.tc_source, self.tc_receiver) } } @@ -54,33 +54,33 @@ impl SharedTcPool { } #[derive(Clone)] -pub struct PusTcSourceStaticPool { +pub struct PusTcSourceProviderSharedPool { pub tc_source: Sender, - pub tc_store: SharedTcPool, + pub shared_pool: SharedTcPool, } -impl PusTcSourceStaticPool { +impl PusTcSourceProviderSharedPool { #[allow(dead_code)] pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { - self.tc_store.pool.clone() + self.shared_pool.pool.clone() } } -impl ReceivesEcssPusTc for PusTcSourceStaticPool { +impl ReceivesEcssPusTc for PusTcSourceProviderSharedPool { type Error = MpscStoreAndSendError; fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - let addr = self.tc_store.add_pus_tc(pus_tc)?; + let addr = self.shared_pool.add_pus_tc(pus_tc)?; self.tc_source.send(addr)?; Ok(()) } } -impl ReceivesCcsdsTc for PusTcSourceStaticPool { +impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool { type Error = MpscStoreAndSendError; fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut pool = self.tc_store.pool.write().expect("locking pool failed"); + let mut pool = self.shared_pool.pool.write().expect("locking pool failed"); let addr = pool.add(tc_raw)?; drop(pool); self.tc_source.send(addr)?; @@ -89,11 +89,11 @@ impl ReceivesCcsdsTc for PusTcSourceStaticPool { } #[derive(Clone)] -pub struct PusTcSourceDynamic { +pub struct PusTcSourceProviderDynamic { pub tc_source: Sender>, } -impl ReceivesEcssPusTc for PusTcSourceDynamic { +impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { type Error = SendError>; fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { @@ -102,7 +102,7 @@ impl ReceivesEcssPusTc for PusTcSourceDynamic { } } -impl ReceivesCcsdsTc for PusTcSourceDynamic { +impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { type Error = mpsc::SendError>; fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -136,7 +136,7 @@ impl TmtcTaskStatic { let pool = self .tc_args .tc_source - .tc_store + .shared_pool .pool .read() .expect("locking tc pool failed");