From 3a1c5832a9481def49b1b3afc53005529bdde71e Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 4 Jul 2023 15:17:43 +0200 Subject: [PATCH] continue --- satrs-core/src/pus/mod.rs | 1 - satrs-core/src/tmtc/tm_helper.rs | 35 ++++++++++++ satrs-example/src/main.rs | 27 +++++++--- satrs-example/src/pus/mod.rs | 8 ++- satrs-example/src/pus/test.rs | 93 ++++++++++++++++++++++++++++++-- satrs-example/src/tmtc.rs | 21 ++------ 6 files changed, 149 insertions(+), 36 deletions(-) diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 1fd7a23..c74e587 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -20,7 +20,6 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; -use crate::pus::verification::TcStateToken; use crate::SenderId; #[cfg(feature = "std")] pub use std_mod::*; diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index c1e5720..6bc0edd 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -1,8 +1,43 @@ +use spacepackets::ecss::SerializablePusPacket; use spacepackets::time::cds::TimeProvider; use spacepackets::time::TimeWriter; use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::SpHeader; +#[cfg(feature = "std")] +pub use std_mod::*; + +#[cfg(feature = "std")] +pub mod std_mod { + use crate::pool::{SharedPool, StoreAddr}; + use spacepackets::ecss::SerializablePusPacket; + use spacepackets::tm::PusTm; + + #[derive(Clone)] + pub struct SharedTmStore { + pool: SharedPool, + } + + impl SharedTmStore { + pub fn new(backing_pool: SharedPool) -> Self { + Self { pool: backing_pool } + } + + pub fn backing_pool(&self) -> SharedPool { + self.pool.clone() + } + + pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { + let mut pg = self.pool.write().expect("error locking TM store"); + let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); + pus_tm + .write_to_bytes(buf) + .expect("writing PUS TM to store failed"); + addr + } + } +} + pub struct PusTmWithCdsShortHelper { apid: u16, cds_short_buf: [u8; 7], diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 450c694..916db1b 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -11,7 +11,7 @@ use crate::hk::AcsHkIds; use crate::logging::setup_logger; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{ - core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmStore, PUS_APID, + core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, PUS_APID, }; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, @@ -35,6 +35,7 @@ use satrs_core::spacepackets::{ tm::{PusTm, PusTmSecondaryHeader}, SequenceFlags, SpHeader, }; +use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::AddressableId; use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use std::collections::HashMap; @@ -43,6 +44,7 @@ use std::sync::mpsc::{channel, TryRecvError}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; +use crate::pus::test::PusService17Handler; fn main() { setup_logger().expect("setting up logging with fern failed"); @@ -55,9 +57,7 @@ fn main() { (15, 1024), (15, 2048), ])); - let tm_store = TmStore { - pool: Arc::new(RwLock::new(Box::new(tm_pool))), - }; + let tm_store = SharedTmStore::new(Arc::new(RwLock::new(Box::new(tm_pool)))); let tc_pool = LocalPool::new(PoolCfg::new(vec![ (30, 32), (15, 64), @@ -80,7 +80,7 @@ fn main() { let verif_sender = MpscVerifSender::new( 0, "verif_sender", - tm_store.pool.clone(), + tm_store.backing_pool(), tm_funnel_tx.clone(), ); let verif_cfg = VerificationReporterCfg::new( @@ -152,6 +152,8 @@ fn main() { let aocs_to_funnel = tm_funnel_tx.clone(); let mut aocs_tm_store = tm_store.clone(); + let pus17_handler = PusService17Handler::new() + info!("Starting TMTC task"); let jh0 = thread::Builder::new() .name("TMTC".to_string()) @@ -184,8 +186,12 @@ fn main() { .name("Event".to_string()) .spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = - MpscTmtcInStoreSender::new(1, "event_sender", tm_store.pool, tm_funnel_tx); + let mut sender = MpscTmtcInStoreSender::new( + 1, + "event_sender", + tm_store.backing_pool(), + tm_funnel_tx, + ); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { reporter_event_handler @@ -307,10 +313,17 @@ fn main() { }) .unwrap(); + info!("Starting PUS handler thread"); + let jh4 = thread::Builder::new() + .name("AOCS".to_string()) + .spawn(move || { + + }); jh0.join().expect("Joining UDP TMTC server thread failed"); jh1.join().expect("Joining TM Funnel thread failed"); jh2.join().expect("Joining Event Manager thread failed"); jh3.join().expect("Joining AOCS thread failed"); + jh4.join().expect("Joining PUS handler thread failed"); } pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) { diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 96389e4..8146c1e 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,6 +1,4 @@ -use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::{MpscStoreAndSendError, PusTcSource, TmStore, PUS_APID}; -use log::{info, warn}; +use crate::tmtc::MpscStoreAndSendError; use satrs_core::events::EventU32; use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; use satrs_core::mode::{ModeAndSubmode, ModeRequest}; @@ -21,7 +19,7 @@ use satrs_core::res_code::ResultU16; use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore}; use satrs_core::spacepackets::ecss::{scheduling, PusServiceId}; use satrs_core::spacepackets::time::CcsdsTimeProvider; -use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; +use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; use satrs_core::tmtc::{AddressableId, PusServiceProvider, TargetId}; use satrs_core::{ spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, spacepackets::time::cds::TimeProvider, @@ -98,7 +96,7 @@ pub struct PusTmArgs { /// All telemetry is sent with this sender handle. pub tm_tx: Sender, /// All TM to be sent is stored here - pub tm_store: TmStore, + pub tm_store: SharedTmStore, /// All verification reporting is done with this reporter. pub verif_reporter: StdVerifReporterWithSender, /// Sequence count provider for TMs sent from within pus demultiplexer diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 4fcf8dc..ccc9d2a 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,12 +1,95 @@ use crate::pus::AcceptedTc; -use satrs_core::pus::verification::StdVerifReporterWithSender; -use std::sync::mpsc::Receiver; +use log::info; +use satrs_core::pool::{SharedPool, StoreAddr}; +use satrs_core::pus::verification::{ + StdVerifReporterWithSender, TcStateAccepted, VerificationToken, +}; +use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore}; +use satrs_core::spacepackets::ecss::PusPacket; +use satrs_core::spacepackets::tc::PusTc; +use satrs_core::spacepackets::time::cds::TimeProvider; +use satrs_core::spacepackets::time::TimeWriter; +use satrs_core::spacepackets::tm::PusTm; +use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; -struct PusService17Handler { - receiver: Receiver, +pub struct PusService17Handler { + tc_rx: Receiver, + tc_store: SharedPool, + tm_helper: PusTmWithCdsShortHelper, + tm_tx: Sender, + tm_store: SharedTmStore, verification_handler: StdVerifReporterWithSender, + stamp_buf: [u8; 7], + pus_buf: [u8; 2048], + handled_tcs: u32, } impl PusService17Handler { - pub fn periodic_operation(&mut self) {} + pub fn new(receiver: Receiver, tc_pool: SharedPool, tm_helper: PusTmWithCdsShortHelper, tm_tx: Sender, tm_store: SharedTmStore, verification_handler: StdVerifReporterWithSender) -> Self { + Self { + tc_rx: receiver, + tc_store: tc_pool, + tm_helper, + tm_tx, + tm_store, + verification_handler, + stamp_buf: [0; 7], + pus_buf: [0; 2048], + handled_tcs: 0 + } + } + // TODO: Return errors which occured + pub fn periodic_operation(&mut self) -> Result { + self.handled_tcs = 0; + loop { + match self.tc_rx.try_recv() { + Ok((addr, token)) => { + self.handle_one_tc(addr, token); + } + Err(e) => { + match e { + TryRecvError::Empty => return Ok(self.handled_tcs), + TryRecvError::Disconnected => { + // TODO: Replace panic by something cleaner + panic!("PusService17Handler: Sender disconnected"); + } + } + } + } + } + } + pub fn handle_one_tc(&mut self, addr: StoreAddr, token: VerificationToken) { + let time_provider = TimeProvider::from_now_with_u16_days().unwrap(); + // TODO: Better error handling + let (addr, token) = self.tc_rx.try_recv().unwrap(); + { + // Keep locked section as short as possible. + let mut tc_pool = self.tc_store.write().unwrap(); + let tc_guard = tc_pool.read_with_guard(addr); + let tc_raw = tc_guard.read().unwrap(); + self.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); + } + let tc = PusTc::from_bytes(&self.pus_buf).unwrap(); + // TODO: Robustness: Check that service is 17 + if tc.0.subservice() == 1 { + info!("Received PUS ping command TC[17,1]"); + info!("Sending ping reply PUS TM[17,2]"); + time_provider.write_to_bytes(&mut self.stamp_buf).unwrap(); + let start_token = self + .verification_handler + .start_success(token, Some(&self.stamp_buf)) + .expect("Error sending start success"); + // Sequence count will be handled centrally in TM funnel. + let ping_reply = self.tm_helper.create_pus_tm_with_stamp(17, 2, None, &time_provider, 0); + let addr = self.tm_store.add_pus_tm(&ping_reply); + self.tm_tx + .send(addr) + .expect("Sending TM to TM funnel failed"); + self.verification_handler + .completion_success(start_token, Some(&self.stamp_buf)) + .expect("Error sending completion success"); + self.handled_tcs += 1; + } + } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 0e05799..ded9090 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -23,6 +23,7 @@ use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::seq_count::SeqCountProviderSyncClonable; use satrs_core::spacepackets::ecss::SerializablePusPacket; use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader}; +use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{ CcsdsDistributor, CcsdsError, PusServiceProvider, ReceivesCcsdsTc, ReceivesEcssPusTc, }; @@ -39,7 +40,7 @@ pub struct OtherArgs { } pub struct TmArgs { - pub tm_store: TmStore, + pub tm_store: SharedTmStore, pub tm_sink_sender: Sender, pub tm_server_rx: Receiver, } @@ -96,27 +97,11 @@ impl From> for MpscStoreAndSendError { } } -#[derive(Clone)] -pub struct TmStore { - pub pool: SharedPool, -} - #[derive(Clone)] pub struct TcStore { pub pool: SharedPool, } -impl TmStore { - pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { - let mut pg = self.pool.write().expect("error locking TM store"); - let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); - pus_tm - .write_to_bytes(buf) - .expect("writing PUS TM to store failed"); - addr - } -} - impl TcStore { pub fn add_pus_tc(&mut self, pus_tc: &PusTc) -> Result { let mut pg = self.pool.write().expect("error locking TC store"); @@ -209,7 +194,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_rx: tm_args.tm_server_rx, - tm_store: tm_args.tm_store.pool.clone(), + tm_store: tm_args.tm_store.backing_pool(), }; let mut tc_buf: [u8; 4096] = [0; 4096];