From 4fe95edecd21376abf0792f0381ffd8023c8aee1 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 24 Apr 2024 13:06:12 +0200 Subject: [PATCH] simplified event management --- satrs-example/src/events.rs | 86 +++++++++++------------------------ satrs-example/src/main.rs | 15 +++--- satrs-example/src/pus/test.rs | 6 +-- satrs/src/event_man.rs | 55 +++++++++++----------- satrs/tests/pus_events.rs | 19 ++++---- 5 files changed, 71 insertions(+), 110 deletions(-) diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index 5d1bdaf..bfef018 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -8,10 +8,7 @@ use satrs::pus::verification::VerificationReporter; use satrs::pus::EcssTmSender; use satrs::request::UniqueApidTargetId; use satrs::{ - event_man::{ - EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded, - MpscEventReceiver, - }, + event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded}, pus::{ event_man::{ DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, @@ -136,34 +133,42 @@ impl PusEventHandler { } } -/// This is a thin wrapper around the event manager which also caches the sender component -/// used to send events to the event manager. -pub struct EventManagerWrapper { +pub struct EventHandler { + pub pus_event_handler: PusEventHandler, event_manager: EventManagerWithBoundedMpsc, - event_sender: mpsc::Sender, } -impl EventManagerWrapper { - pub fn new() -> Self { - // The sender handle is the primary sender handle for all components which want to create events. - // The event manager will receive the RX handle to receive all the events. - let (event_sender, event_man_rx) = mpsc::channel(); - let event_recv = MpscEventReceiver::new(event_man_rx); +impl EventHandler { + pub fn new( + tm_sender: TmSender, + event_rx: mpsc::Receiver, + event_request_rx: mpsc::Receiver, + ) -> Self { + let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx); + let pus_event_handler = PusEventHandler::new( + tm_sender, + create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), + &mut event_manager, + event_request_rx, + ); + Self { - event_manager: EventManagerWithBoundedMpsc::new(event_recv), - event_sender, + pus_event_handler, + event_manager, } } - // Returns a cached event sender to send events to the event manager for routing. - pub fn clone_event_sender(&self) -> mpsc::Sender { - self.event_sender.clone() - } - + #[allow(dead_code)] pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { &mut self.event_manager } + pub fn periodic_operation(&mut self) { + self.pus_event_handler.handle_event_requests(); + self.try_event_routing(); + self.pus_event_handler.generate_pus_event_tm(); + } + pub fn try_event_routing(&mut self) { let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| { self.routing_error_handler(event_msg, error) @@ -176,42 +181,3 @@ impl EventManagerWrapper { log::warn!("event routing error for event {event_msg:?}: {error:?}"); } } - -pub struct EventHandler { - pub event_man_wrapper: EventManagerWrapper, - pub pus_event_handler: PusEventHandler, -} - -impl EventHandler { - pub fn new( - tm_sender: TmSender, - event_request_rx: mpsc::Receiver, - ) -> Self { - let mut event_man_wrapper = EventManagerWrapper::new(); - let pus_event_handler = PusEventHandler::new( - tm_sender, - create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), - event_man_wrapper.event_manager(), - event_request_rx, - ); - Self { - event_man_wrapper, - pus_event_handler, - } - } - - pub fn clone_event_sender(&self) -> mpsc::Sender { - self.event_man_wrapper.clone_event_sender() - } - - #[allow(dead_code)] - pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { - self.event_man_wrapper.event_manager() - } - - pub fn periodic_operation(&mut self) { - self.pus_event_handler.handle_event_requests(); - self.event_man_wrapper.try_event_routing(); - self.pus_event_handler.generate_pus_event_tm(); - } -} diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index cf8e050..6c7c5f3 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -80,11 +80,12 @@ fn static_tmtc_pool_main() { // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx); + let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -108,7 +109,7 @@ fn static_tmtc_pool_main() { let pus_test_service = create_test_service_static( tm_funnel_tx_sender.clone(), shared_tc_pool.clone(), - event_handler.clone_event_sender(), + event_tx.clone(), pus_test_rx, ); let pus_scheduler_service = create_scheduler_service_static( @@ -314,10 +315,11 @@ fn dyn_tmtc_pool_main() { // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx); + let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -339,11 +341,8 @@ fn dyn_tmtc_pool_main() { mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service_dynamic( - tm_funnel_tx.clone(), - event_handler.clone_event_sender(), - pus_test_rx, - ); + let pus_test_service = + create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx); let pus_scheduler_service = create_scheduler_service_dynamic( tm_funnel_tx.clone(), tc_source_tx.clone(), diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 583b72c..585e93b 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -23,7 +23,7 @@ use super::HandlingStatus; pub fn create_test_service_static( tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, - event_sender: mpsc::Sender, + event_sender: mpsc::SyncSender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -41,7 +41,7 @@ pub fn create_test_service_static( pub fn create_test_service_dynamic( tm_funnel_tx: mpsc::Sender, - event_sender: mpsc::Sender, + event_sender: mpsc::SyncSender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -61,7 +61,7 @@ pub struct TestCustomServiceWrapper, - pub test_srv_event_sender: mpsc::Sender, + pub test_srv_event_sender: mpsc::SyncSender, } impl diff --git a/satrs/src/event_man.rs b/satrs/src/event_man.rs index 38752eb..4ad18ea 100644 --- a/satrs/src/event_man.rs +++ b/satrs/src/event_man.rs @@ -21,8 +21,8 @@ //! //! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different //! message queue backends. A straightforward implementation where dynamic memory allocation is -//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in -//! form of the [MpscEventReceiver]. +//! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is +//! already implemented for this type. //! 2. To set up event creators, create channel pairs using some message queue implementation. //! Each event creator gets a (cloned) sender component which allows it to send events to the //! manager. @@ -157,9 +157,10 @@ pub trait SenderMapProvider< /// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers. /// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs. /// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events. -/// * `Ev`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] +/// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] /// and [EventU16] are supported. -/// * `Data`: Auxiliary data which is sent with the event to provide optional context information +/// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context +/// information pub struct EventManager< EventReceiver: EventReceiveProvider, SenderMap: SenderMapProvider, @@ -331,11 +332,11 @@ pub mod alloc_mod { /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] /// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend. - pub type EventManagerWithMpsc = EventManager< - MpscEventReceiver, - DefaultSenderMap, EV, AUX>, + pub type EventManagerWithMpsc = EventManager< + EventU32ReceiverMpsc, + DefaultSenderMap, Event, ParamProvider>, DefaultListenerMap, - EventSenderMpsc, + EventSenderMpsc, >; /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] @@ -343,7 +344,7 @@ pub mod alloc_mod { /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the /// message queue backend. pub type EventManagerWithBoundedMpsc = EventManager< - MpscEventReceiver, + EventU32ReceiverMpsc, DefaultSenderMap, Event, ParamProvider>, DefaultListenerMap, EventSenderMpscBounded, @@ -479,20 +480,16 @@ pub mod std_mod { use super::*; use std::sync::mpsc; - pub struct MpscEventReceiver { - receiver: mpsc::Receiver>, - } - - impl MpscEventReceiver { - pub fn new(receiver: mpsc::Receiver>) -> Self { - Self { receiver } - } - } - impl EventReceiveProvider for MpscEventReceiver { + impl + EventReceiveProvider + for mpsc::Receiver> + { type Error = GenericReceiveError; - fn try_recv_event(&self) -> Result>, Self::Error> { - match self.receiver.try_recv() { + fn try_recv_event( + &self, + ) -> Result>, Self::Error> { + match self.try_recv() { Ok(msg) => Ok(Some(msg)), Err(e) => match e { mpsc::TryRecvError::Empty => Ok(None), @@ -504,8 +501,10 @@ pub mod std_mod { } } - pub type MpscEventU32Receiver = MpscEventReceiver; - pub type MpscEventU16Receiver = MpscEventReceiver; + pub type EventU32ReceiverMpsc = + mpsc::Receiver>; + pub type EventU16ReceiverMpsc = + mpsc::Receiver>; /// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to /// send events. @@ -624,9 +623,8 @@ mod tests { } fn generic_event_man() -> (mpsc::Sender, EventManagerWithMpsc) { - let (event_sender, manager_queue) = mpsc::channel(); - let event_man_receiver = MpscEventReceiver::new(manager_queue); - (event_sender, EventManager::new(event_man_receiver)) + let (event_sender, event_receiver) = mpsc::channel(); + (event_sender, EventManager::new(event_receiver)) } #[test] @@ -793,9 +791,8 @@ mod tests { let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { panic!("routing error occurred for event {:?}: {:?}", event_msg, e); }; - let (event_sender, manager_queue) = mpsc::channel(); - let event_man_receiver = MpscEventReceiver::new(manager_queue); - let mut event_man = EventManagerWithMpsc::new(event_man_receiver); + let (event_sender, event_receiver) = mpsc::channel(); + let mut event_man = EventManagerWithMpsc::new(event_receiver); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (event_0_tx_0, all_events_rx) = mpsc::channel(); diff --git a/satrs/tests/pus_events.rs b/satrs/tests/pus_events.rs index a5c3061..ec9b8d1 100644 --- a/satrs/tests/pus_events.rs +++ b/satrs/tests/pus_events.rs @@ -1,6 +1,6 @@ use satrs::event_man::{ EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider, - EventU32SenderMpsc, MpscEventU32Receiver, + EventU32SenderMpsc, }; use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs::params::U32Pair; @@ -29,15 +29,14 @@ pub enum CustomTmSenderError { #[test] fn test_threaded_usage() { - let (event_sender, event_man_receiver) = mpsc::channel(); - let event_receiver = MpscEventU32Receiver::new(event_man_receiver); - let mut event_man = EventManagerWithMpsc::new(event_receiver); + let (event_tx, event_rx) = mpsc::sync_channel(100); + let mut event_man = EventManagerWithMpsc::new(event_rx); let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); event_man.subscribe_all(pus_event_man_send_provider.target_id()); event_man.add_sender(pus_event_man_send_provider); - let (event_tx, event_rx) = mpsc::channel::(); + let (event_packet_tx, event_packet_rx) = mpsc::channel::(); let reporter = EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed"); let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); @@ -54,7 +53,7 @@ fn test_threaded_usage() { Ok(event_msg) => { let gen_event = |aux_data| { pus_event_man.generate_pus_event_tm_generic( - &event_tx, + &event_packet_tx, &EMPTY_STAMP, event_msg.event(), aux_data, @@ -100,14 +99,14 @@ fn test_threaded_usage() { // Event sender and TM checker thread let jh1 = thread::spawn(move || { - event_sender + event_tx .send(EventMessage::new( TEST_COMPONENT_ID_0.id(), INFO_EVENT.into(), )) .expect("Sending info event failed"); loop { - match event_rx.try_recv() { + match event_packet_rx.try_recv() { // Event TM received successfully Ok(event_tm) => { let tm = PusTmReader::new(event_tm.packet.as_slice(), 7) @@ -129,7 +128,7 @@ fn test_threaded_usage() { } } } - event_sender + event_tx .send(EventMessage::new_with_params( TEST_COMPONENT_ID_0.id(), LOW_SEV_EVENT, @@ -137,7 +136,7 @@ fn test_threaded_usage() { )) .expect("Sending low severity event failed"); loop { - match event_rx.try_recv() { + match event_packet_rx.try_recv() { // Event TM received successfully Ok(event_tm) => { let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)