From c97c22b7acffb90bd079f40fec97cf3ef7b36c15 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 24 Apr 2024 14:09:02 +0200 Subject: [PATCH] fixes for example event management --- satrs-example/pytmtc/common.py | 1 + satrs-example/pytmtc/main.py | 4 +- satrs-example/src/events.rs | 108 +++++++++++++++++++----------- satrs-example/src/main.rs | 36 +++++----- satrs-example/src/tmtc/tm_sink.rs | 15 +++-- satrs/src/pus/event_man.rs | 96 ++++++++++++++------------ 6 files changed, 153 insertions(+), 107 deletions(-) diff --git a/satrs-example/pytmtc/common.py b/satrs-example/pytmtc/common.py index 6f56604..a37967e 100644 --- a/satrs-example/pytmtc/common.py +++ b/satrs-example/pytmtc/common.py @@ -10,6 +10,7 @@ class Apid(enum.IntEnum): GENERIC_PUS = 2 ACS = 3 CFDP = 4 + TMTC = 5 class EventSeverity(enum.IntEnum): diff --git a/satrs-example/pytmtc/main.py b/satrs-example/pytmtc/main.py index 23f10b0..a90a011 100755 --- a/satrs-example/pytmtc/main.py +++ b/satrs-example/pytmtc/main.py @@ -144,7 +144,9 @@ class PusHandler(GenericApidHandlerBase): ) src_data = tm_packet.source_data event_u32 = EventU32.unpack(src_data) - _LOGGER.info(f"Received event packet. Event: {event_u32}") + _LOGGER.info( + f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}" + ) if event_u32.group_id == 0 and event_u32.unique_id == 0: _LOGGER.info("Received test event") elif service == 17: diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index bfef018..bed3f6f 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -11,7 +11,7 @@ use satrs::{ event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded}, pus::{ event_man::{ - DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, + DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken, }, verification::{TcStateStarted, VerificationReportingProvider, VerificationToken}, }, @@ -37,13 +37,12 @@ impl EventTmHookProvider for EventApidSetter { /// packets. It also handles the verification completion of PUS event service requests. pub struct PusEventHandler { event_request_rx: mpsc::Receiver, - pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, + pus_event_tm_creator: DefaultPusEventU32TmCreator, pus_event_man_rx: mpsc::Receiver, tm_sender: TmSender, time_provider: CdsTime, timestamp: [u8; 7], verif_handler: VerificationReporter, - event_apid_setter: EventApidSetter, } impl PusEventHandler { @@ -58,9 +57,16 @@ impl PusEventHandler { // All events sent to the manager are routed to the PUS event manager, which generates PUS event // telemetry for each event. - let event_reporter = EventReporter::new(PUS_EVENT_MANAGEMENT.raw(), 0, 0, 128).unwrap(); + let event_reporter = EventReporter::new_with_hook( + PUS_EVENT_MANAGEMENT.raw(), + 0, + 0, + 128, + EventApidSetter::default(), + ) + .unwrap(); let pus_event_dispatcher = - DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter); + DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter); let pus_event_man_send_provider = EventU32SenderMpscBounded::new( PUS_EVENT_MANAGEMENT.raw(), pus_event_man_tx, @@ -72,13 +78,12 @@ impl PusEventHandler { Self { event_request_rx, - pus_event_dispatcher, + pus_event_tm_creator: pus_event_dispatcher, pus_event_man_rx, time_provider: CdsTime::new_with_u16_days(0, 0), timestamp: [0; 7], verif_handler, tm_sender, - event_apid_setter: EventApidSetter::default(), } } @@ -92,43 +97,65 @@ impl PusEventHandler { .completion_success(&self.tm_sender, started_token, timestamp) .expect("Sending completion success failed"); }; - // handle event requests - if let Ok(event_req) = self.event_request_rx.try_recv() { - match event_req.request { - EventRequest::Enable(event) => { - self.pus_event_dispatcher - .enable_tm_for_event(&event) - .expect("Enabling TM failed"); - update_time(&mut self.time_provider, &mut self.timestamp); - report_completion(event_req, &self.timestamp); - } - EventRequest::Disable(event) => { - self.pus_event_dispatcher - .disable_tm_for_event(&event) - .expect("Disabling TM failed"); - update_time(&mut self.time_provider, &mut self.timestamp); - report_completion(event_req, &self.timestamp); - } + loop { + // handle event requests + match self.event_request_rx.try_recv() { + Ok(event_req) => match event_req.request { + EventRequest::Enable(event) => { + self.pus_event_tm_creator + .enable_tm_for_event(&event) + .expect("Enabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + EventRequest::Disable(event) => { + self.pus_event_tm_creator + .disable_tm_for_event(&event) + .expect("Disabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + }, + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("all event request senders have disconnected"); + break; + } + }, } } } pub fn generate_pus_event_tm(&mut self) { - // Perform the generation of PUS event packets - if let Ok(event_msg) = self.pus_event_man_rx.try_recv() { - update_time(&mut self.time_provider, &mut self.timestamp); - let param_vec = event_msg.params().map_or(Vec::new(), |param| { - param.to_vec().expect("failed to convert params to vec") - }); - self.event_apid_setter.next_apid = UniqueApidTargetId::from(event_msg.sender_id()).apid; - self.pus_event_dispatcher - .generate_pus_event_tm_generic( - &self.tm_sender, - &self.timestamp, - event_msg.event(), - Some(¶m_vec), - ) - .expect("Sending TM as event failed"); + loop { + // Perform the generation of PUS event packets + match self.pus_event_man_rx.try_recv() { + Ok(event_msg) => { + update_time(&mut self.time_provider, &mut self.timestamp); + let param_vec = event_msg.params().map_or(Vec::new(), |param| { + param.to_vec().expect("failed to convert params to vec") + }); + // We use the TM modification hook to set the sender APID for each event. + self.pus_event_tm_creator.reporter.tm_hook.next_apid = + UniqueApidTargetId::from(event_msg.sender_id()).apid; + self.pus_event_tm_creator + .generate_pus_event_tm_generic( + &self.tm_sender, + &self.timestamp, + event_msg.event(), + Some(¶m_vec), + ) + .expect("Sending TM as event failed"); + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("All event senders have disconnected"); + break; + } + }, + } } } } @@ -181,3 +208,6 @@ impl EventHandler { log::warn!("event routing error for event {event_msg:?}: {error:?}"); } } + +#[cfg(test)] +mod tests {} diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 6c7c5f3..d6cf6ff 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -11,7 +11,7 @@ use crate::events::EventHandler; use crate::interface::udp::DynamicUdpTmHandler; use crate::pus::stack::PusStack; use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic}; -use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic}; +use crate::tmtc::tm_sink::{TmSinkDynamic, TmSinkStatic}; use log::info; use pus::test::create_test_service_dynamic; use satrs::hal::std::tcp_server::ServerConfig; @@ -54,11 +54,11 @@ fn static_tmtc_pool_main() { let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); - let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50); + let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); - let tm_funnel_tx_sender = - PacketSenderWithSharedPool::new(tm_funnel_tx.clone(), shared_tm_pool_wrapper.clone()); + let tm_sink_tx_sender = + PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()); let (mgm_handler_composite_tx, mgm_handler_composite_rx) = mpsc::channel::>(); @@ -85,7 +85,7 @@ fn static_tmtc_pool_main() { // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); + let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -107,39 +107,39 @@ fn static_tmtc_pool_main() { mode_tc_sender: pus_mode_tx, }; let pus_test_service = create_test_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), event_tx.clone(), pus_test_rx, ); let pus_scheduler_service = create_scheduler_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), tc_source.clone(), pus_sched_rx, create_sched_tc_pool(), ); let pus_event_service = create_event_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_event_rx, event_request_tx, ); let pus_action_service = create_action_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); let pus_hk_service = create_hk_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); let pus_mode_service = create_mode_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_mode_rx, request_map, @@ -157,7 +157,7 @@ fn static_tmtc_pool_main() { let mut tmtc_task = TcSourceTaskStatic::new( shared_tc_pool_wrapper.clone(), tc_source_rx, - PusTcDistributor::new(tm_funnel_tx_sender, pus_router), + PusTcDistributor::new(tm_sink_tx_sender, pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); @@ -187,10 +187,10 @@ fn static_tmtc_pool_main() { ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelStatic::new( + let mut tm_sink = TmSinkStatic::new( shared_tm_pool_wrapper, sync_tm_tcp_source, - tm_funnel_rx, + tm_sink_rx, tm_server_tx, ); @@ -210,7 +210,7 @@ fn static_tmtc_pool_main() { mode_leaf_interface, mgm_handler_composite_rx, pus_hk_reply_tx, - tm_funnel_tx, + tm_sink_tx, dummy_spi_interface, shared_mgm_set, ); @@ -241,9 +241,9 @@ fn static_tmtc_pool_main() { info!("Starting TM funnel task"); let jh_tm_funnel = thread::Builder::new() - .name("TM Funnel".to_string()) + .name("tm sink".to_string()) .spawn(move || loop { - tm_funnel.operation(); + tm_sink.operation(); }) .unwrap(); @@ -410,7 +410,7 @@ fn dyn_tmtc_pool_main() { ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = mpsc::channel(); diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index 955a997..0771a79 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -70,18 +70,23 @@ impl TmFunnelCommon { } fn packet_printout(tm: &PusTmZeroCopyWriter) { - info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); + info!( + "Sending PUS TM[{},{}] with APID {}", + tm.service(), + tm.subservice(), + tm.apid() + ); } } -pub struct TmFunnelStatic { +pub struct TmSinkStatic { common: TmFunnelCommon, shared_tm_store: SharedPacketPool, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::SyncSender, } -impl TmFunnelStatic { +impl TmSinkStatic { pub fn new( shared_tm_store: SharedPacketPool, sync_tm_tcp_source: SyncTcpTmSource, @@ -121,13 +126,13 @@ impl TmFunnelStatic { } } -pub struct TmFunnelDynamic { +pub struct TmSinkDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, } -impl TmFunnelDynamic { +impl TmSinkDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, diff --git a/satrs/src/pus/event_man.rs b/satrs/src/pus/event_man.rs index b8ddb6b..d5b7a8e 100644 --- a/satrs/src/pus/event_man.rs +++ b/satrs/src/pus/event_man.rs @@ -28,7 +28,7 @@ pub use heapless_mod::*; /// structure to track disabled events. A more primitive and embedded friendly /// solution could track this information in a static or pre-allocated list which contains /// the disabled events. -pub trait PusEventMgmtBackendProvider { +pub trait PusEventReportingMap { type Error; fn event_enabled(&self, event: &Event) -> bool; @@ -56,7 +56,7 @@ pub mod heapless_mod { { } - impl PusEventMgmtBackendProvider + impl PusEventReportingMap for HeaplessPusMgmtBackendProvider { type Error = (); @@ -105,7 +105,10 @@ impl From for EventManError { pub mod alloc_mod { use core::marker::PhantomData; - use crate::events::EventU16; + use crate::{ + events::EventU16, + pus::event::{DummyEventHook, EventTmHookProvider}, + }; use super::*; @@ -114,11 +117,11 @@ pub mod alloc_mod { /// /// This provider is a good option for host systems or larger embedded systems where /// the expected occasional memory allocation performed by the [HashSet] is not an issue. - pub struct DefaultPusEventMgmtBackend { + pub struct DefaultPusEventReportingMap { disabled: HashSet, } - impl Default for DefaultPusEventMgmtBackend { + impl Default for DefaultPusEventReportingMap { fn default() -> Self { Self { disabled: HashSet::default(), @@ -126,51 +129,54 @@ pub mod alloc_mod { } } - impl PusEventMgmtBackendProvider - for DefaultPusEventMgmtBackend + impl PusEventReportingMap + for DefaultPusEventReportingMap { type Error = (); - fn event_enabled(&self, event: &EV) -> bool { + fn event_enabled(&self, event: &Event) -> bool { !self.disabled.contains(event) } - fn enable_event_reporting(&mut self, event: &EV) -> Result { + fn enable_event_reporting(&mut self, event: &Event) -> Result { Ok(self.disabled.remove(event)) } - fn disable_event_reporting(&mut self, event: &EV) -> Result { + fn disable_event_reporting(&mut self, event: &Event) -> Result { Ok(self.disabled.insert(*event)) } } - pub struct PusEventDispatcher< - B: PusEventMgmtBackendProvider, - EV: GenericEvent, - E, + pub struct PusEventTmCreatorWithMap< + ReportingMap: PusEventReportingMap, + Event: GenericEvent, + EventTmHook: EventTmHookProvider = DummyEventHook, > { - reporter: EventReporter, - backend: B, - phantom: PhantomData<(E, EV)>, + pub reporter: EventReporter, + reporting_map: ReportingMap, + phantom: PhantomData, } - impl, Event: GenericEvent, E> - PusEventDispatcher + impl< + ReportingMap: PusEventReportingMap, + Event: GenericEvent, + EventTmHook: EventTmHookProvider, + > PusEventTmCreatorWithMap { - pub fn new(reporter: EventReporter, backend: B) -> Self { + pub fn new(reporter: EventReporter, backend: ReportingMap) -> Self { Self { reporter, - backend, + reporting_map: backend, phantom: PhantomData, } } - pub fn enable_tm_for_event(&mut self, event: &Event) -> Result { - self.backend.enable_event_reporting(event) + pub fn enable_tm_for_event(&mut self, event: &Event) -> Result { + self.reporting_map.enable_event_reporting(event) } - pub fn disable_tm_for_event(&mut self, event: &Event) -> Result { - self.backend.disable_event_reporting(event) + pub fn disable_tm_for_event(&mut self, event: &Event) -> Result { + self.reporting_map.disable_event_reporting(event) } pub fn generate_pus_event_tm_generic( @@ -180,7 +186,7 @@ pub mod alloc_mod { event: Event, params: Option<&[u8]>, ) -> Result { - if !self.backend.event_enabled(&event) { + if !self.reporting_map.event_enabled(&event) { return Ok(false); } match event.severity() { @@ -208,31 +214,33 @@ pub mod alloc_mod { } } - impl - PusEventDispatcher, EV, ()> + impl + PusEventTmCreatorWithMap, Event, EventTmHook> { - pub fn new_with_default_backend(reporter: EventReporter) -> Self { + pub fn new_with_default_backend(reporter: EventReporter) -> Self { Self { reporter, - backend: DefaultPusEventMgmtBackend::default(), + reporting_map: DefaultPusEventReportingMap::default(), phantom: PhantomData, } } } - impl, E> PusEventDispatcher { + impl> + PusEventTmCreatorWithMap + { pub fn enable_tm_for_event_with_sev( &mut self, event: &EventU32TypedSev, - ) -> Result { - self.backend.enable_event_reporting(event.as_ref()) + ) -> Result { + self.reporting_map.enable_event_reporting(event.as_ref()) } pub fn disable_tm_for_event_with_sev( &mut self, event: &EventU32TypedSev, - ) -> Result { - self.backend.disable_event_reporting(event.as_ref()) + ) -> Result { + self.reporting_map.disable_event_reporting(event.as_ref()) } pub fn generate_pus_event_tm( @@ -246,10 +254,10 @@ pub mod alloc_mod { } } - pub type DefaultPusEventU16Dispatcher = - PusEventDispatcher, EventU16, E>; - pub type DefaultPusEventU32Dispatcher = - PusEventDispatcher, EventU32, E>; + pub type DefaultPusEventU16TmCreator = + PusEventTmCreatorWithMap, EventU16, EventTmHook>; + pub type DefaultPusEventU32TmCreator = + PusEventTmCreatorWithMap, EventU32, EventTmHook>; } #[cfg(test)] mod tests { @@ -265,16 +273,16 @@ mod tests { const TEST_APID: u16 = 0x02; const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05); - fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> { + fn create_basic_man_1() -> DefaultPusEventU32TmCreator { let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128) .expect("Creating event repoter failed"); - PusEventDispatcher::new_with_default_backend(reporter) + PusEventTmCreatorWithMap::new_with_default_backend(reporter) } - fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> { + fn create_basic_man_2() -> DefaultPusEventU32TmCreator { let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128) .expect("Creating event repoter failed"); - let backend = DefaultPusEventMgmtBackend::default(); - PusEventDispatcher::new(reporter, backend) + let backend = DefaultPusEventReportingMap::default(); + PusEventTmCreatorWithMap::new(reporter, backend) } #[test]