diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index 366117a..d9d7a86 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -1,6 +1,8 @@ use std::sync::mpsc::{self}; use crate::pus::create_verification_reporter; +use satrs::event_man::{EventMessageU32, EventRoutingError}; +use satrs::params::WritableToBeBytes; use satrs::pus::verification::VerificationReporter; use satrs::pus::EcssTmSenderCore; use satrs::{ @@ -8,8 +10,6 @@ use satrs::{ EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded, MpscEventReceiver, }, - events::EventU32, - params::Params, pus::{ event_man::{ DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, @@ -28,7 +28,7 @@ use crate::update_time; pub struct PusEventHandler { event_request_rx: mpsc::Receiver, pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, - pus_event_man_rx: mpsc::Receiver<(EventU32, Option)>, + pus_event_man_rx: mpsc::Receiver, tm_sender: TmSender, time_provider: CdsTime, timestamp: [u8; 7], @@ -108,10 +108,18 @@ impl PusEventHandler { pub fn generate_pus_event_tm(&mut self) { // Perform the generation of PUS event packets - if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() { + if let Ok(event_msg) = self.pus_event_man_rx.try_recv() { update_time(&mut self.time_provider, &mut self.timestamp); + let param_vec = event_msg.params().map_or(Vec::new(), |param| { + param.to_vec().expect("failed to convert params to vec") + }); self.pus_event_dispatcher - .generate_pus_event_tm_generic(&self.tm_sender, &self.timestamp, event, None) + .generate_pus_event_tm_generic( + &self.tm_sender, + &self.timestamp, + event_msg.event(), + Some(¶m_vec), + ) .expect("Sending TM as event failed"); } } @@ -121,7 +129,7 @@ impl PusEventHandler { /// used to send events to the event manager. pub struct EventManagerWrapper { event_manager: EventManagerWithBoundedMpsc, - event_sender: mpsc::Sender<(EventU32, Option)>, + event_sender: mpsc::Sender, } impl EventManagerWrapper { @@ -129,7 +137,7 @@ impl EventManagerWrapper { // The sender handle is the primary sender handle for all components which want to create events. // The event manager will receive the RX handle to receive all the events. let (event_sender, event_man_rx) = mpsc::channel(); - let event_recv = MpscEventReceiver::::new(event_man_rx); + let event_recv = MpscEventReceiver::new(event_man_rx); Self { event_manager: EventManagerWithBoundedMpsc::new(event_recv), event_sender, @@ -137,7 +145,7 @@ impl EventManagerWrapper { } // Returns a cached event sender to send events to the event manager for routing. - pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option)> { + pub fn clone_event_sender(&self) -> mpsc::Sender { self.event_sender.clone() } @@ -146,10 +154,13 @@ impl EventManagerWrapper { } pub fn try_event_routing(&mut self) { + let error_handler = |error| self.routing_error_handler(error); // Perform the event routing. - self.event_manager - .try_event_handling() - .expect("event handling failed"); + self.event_manager.try_event_handling(error_handler); + } + + pub fn routing_error_handler(&self, error: EventRoutingError) { + log::warn!("event routing error: {error:?}"); } } @@ -176,7 +187,7 @@ impl EventHandler { } } - pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option)> { + pub fn clone_event_sender(&self) -> mpsc::Sender { self.event_man_wrapper.clone_event_sender() } diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 12bdcba..0111026 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,9 +1,10 @@ use crate::pus::create_verification_reporter; use log::{info, warn}; -use satrs::params::Params; +use satrs::event_man::{EventMessage, EventMessageU32}; use satrs::pool::SharedStaticMemoryPool; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; +use satrs::pus::EcssTcInSharedStoreConverter; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusServiceHelper, @@ -13,7 +14,6 @@ use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; -use satrs::{events::EventU32, pus::EcssTcInSharedStoreConverter}; use satrs_example::config::components::PUS_TEST_SERVICE; use satrs_example::config::{tmtc_err, TEST_EVENT}; use std::sync::mpsc; @@ -21,7 +21,7 @@ use std::sync::mpsc; pub fn create_test_service_static( tm_sender: TmInSharedPoolSender>, tc_pool: SharedStaticMemoryPool, - event_sender: mpsc::Sender<(EventU32, Option)>, + event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -39,7 +39,7 @@ pub fn create_test_service_static( pub fn create_test_service_dynamic( tm_funnel_tx: mpsc::Sender, - event_sender: mpsc::Sender<(EventU32, Option)>, + event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -61,7 +61,7 @@ pub struct TestCustomServiceWrapper< > { pub handler: PusService17TestHandler, - pub test_srv_event_sender: mpsc::Sender<(EventU32, Option)>, + pub test_srv_event_sender: mpsc::Sender, } impl @@ -101,7 +101,7 @@ impl if subservice == 128 { info!("Generating test event"); self.test_srv_event_sender - .send((TEST_EVENT.into(), None)) + .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) .expect("Sending test event failed"); let start_token = self .handler diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index 81b2cfc..bf133bc 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -16,9 +16,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Various abstraction and objects for targeted requests. This includes mode request/reply types for actions, HK and modes. - `VerificationReportingProvider::owner_id` method. +- Introduced generic `EventMessage` which is generic over the event type and the additional + parameter type. This message also contains the sender ID which can be useful for debugging + or application layer / FDIR logic. ## Changed +- `EventManager::try_event_handling` not expects a mutable error handling closure instead of + returning the occured errors. - Renamed `EventManagerBase` to `EventReportCreator` - Renamed `VerificationReporterCore` to `VerificationReportCreator`. - Removed `VerificationReporterCore`. The high-level API exposed by `VerificationReporter` and diff --git a/satrs/src/event_man.rs b/satrs/src/event_man.rs index 2b5a37b..63c7819 100644 --- a/satrs/src/event_man.rs +++ b/satrs/src/event_man.rs @@ -11,7 +11,7 @@ //! about events first: //! //! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps -//! listener groups identified by [ListenerKey]s to a [sender ID][ComponentId]. +//! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId]. //! It also contains a sender table abstracted by the [SenderMapProvider] which maps these sender //! IDs to concrete [EventSendProvider]s. A simple approach would be to use one send event provider //! for each OBSW thread and then subscribe for all interesting events for a particular thread @@ -28,8 +28,8 @@ //! manager. //! 3. The event manager receives the receiver component as part of a [EventReceiveProvider] //! implementation so all events are routed to the manager. -//! 4. Create the [send event providers][EventSendProvider]s which allow routing events to -//! subscribers. You can now use their [sender IDs][EventSendProvider::channel_id] to subscribe +//! 4. Create the [event sender map][SenderMapProvider]s which allow routing events to +//! subscribers. You can now use the subscriber component IDs to subscribe //! for event groups, for example by using the [EventManager::subscribe_single] method. //! 5. Add the send provider as well using the [EventManager::add_sender] call so the event //! manager can route listener groups to a the send provider. @@ -45,8 +45,9 @@ //! for a concrete example using multi-threading where events are routed to //! different threads. use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw}; -use crate::params::{Params, ParamsHeapless}; +use crate::params::Params; use crate::queue::GenericSendError; +use core::fmt::Debug; use core::marker::PhantomData; use core::slice::Iter; @@ -65,29 +66,70 @@ pub enum ListenerKey { All, } -pub type EventWithHeaplessAuxData = (Event, Option); -pub type EventU32WithHeaplessAuxData = EventWithHeaplessAuxData; -pub type EventU16WithHeaplessAuxData = EventWithHeaplessAuxData; +// pub type EventWithAuxData = (Event, Option); +// pub type EventU32WithAuxData = EventWithAuxData; +// pub type EventU16WithAuxData = EventWithAuxData; -pub type EventWithAuxData = (Event, Option); -pub type EventU32WithAuxData = EventWithAuxData; -pub type EventU16WithAuxData = EventWithAuxData; +#[derive(Debug)] +pub struct EventMessage { + sender_id: ComponentId, + event: Event, + params: Option, +} -pub trait EventSendProvider { - fn target_id(&self) -> ComponentId; - - fn send_no_data(&self, event: EV) -> Result<(), GenericSendError> { - self.send(event, None) +impl EventMessage { + pub fn new_generic( + sender_id: ComponentId, + event: Event, + params: Option, + ) -> Self { + Self { + sender_id, + event, + params, + } } - fn send(&self, event: EV, aux_data: Option) -> Result<(), GenericSendError>; + pub fn sender_id(&self) -> ComponentId { + self.sender_id + } + + pub fn event(&self) -> Event { + self.event + } + + pub fn params(&self) -> Option<&ParamProvider> { + self.params.as_ref() + } + + pub fn new(sender_id: ComponentId, event: Event) -> Self { + Self::new_generic(sender_id, event, None) + } + + pub fn new_with_params(sender_id: ComponentId, event: Event, params: ParamProvider) -> Self { + Self::new_generic(sender_id, event, Some(params)) + } +} + +pub type EventMessageU32 = EventMessage; +pub type EventMessageU16 = EventMessage; + +/// Generic abstraction +pub trait EventSendProvider { + type Error; + + fn target_id(&self) -> ComponentId; + + fn send(&self, message: EventMessage) -> Result<(), Self::Error>; } /// Generic abstraction for an event receiver. -pub trait EventReceiveProvider { +pub trait EventReceiveProvider { + type Error; + /// This function has to be provided by any event receiver. A call may or may not return /// an event and optional auxiliary data. - fn try_recv_event(&self) -> Option<(Event, Option)>; + fn try_recv_event(&self) -> Result>, Self::Error>; } pub trait ListenerMapProvider { @@ -96,14 +138,14 @@ pub trait ListenerMapProvider { fn get_listeners(&self) -> alloc::vec::Vec; fn contains_listener(&self, key: &ListenerKey) -> bool; fn get_listener_ids(&self, key: &ListenerKey) -> Option>; - fn add_listener(&mut self, key: ListenerKey, sender_id: ComponentId) -> bool; + fn add_listener(&mut self, key: ListenerKey, listener_id: ComponentId) -> bool; fn remove_duplicates(&mut self, key: &ListenerKey); } pub trait SenderMapProvider< - EventSender: EventSendProvider, - Ev: GenericEvent = EventU32, - Data = Params, + EventSender: EventSendProvider, + Event: GenericEvent = EventU32, + ParamProvider: Debug = Params, > { fn contains_send_event_provider(&self, target_id: &ComponentId) -> bool; @@ -124,28 +166,27 @@ pub trait SenderMapProvider< /// and [EventU16] are supported. /// * `Data`: Auxiliary data which is sent with the event to provide optional context information pub struct EventManager< - EventReceiver: EventReceiveProvider, - SenderMap: SenderMapProvider, + EventReceiver: EventReceiveProvider, + SenderMap: SenderMapProvider, ListenerMap: ListenerMapProvider, - EventSender: EventSendProvider, - Ev: GenericEvent = EventU32, - Data = Params, + EventSender: EventSendProvider, + Event: GenericEvent = EventU32, + ParamProvider: Debug = Params, > { event_receiver: EventReceiver, sender_map: SenderMap, listener_map: ListenerMap, - phantom: core::marker::PhantomData<(EventSender, Ev, Data)>, + phantom: core::marker::PhantomData<(EventSender, Event, ParamProvider)>, } #[derive(Debug)] -pub enum EventRoutingResult { +pub enum EventRoutingResult { /// No event was received Empty, /// An event was received and routed to listeners. Handled { num_recipients: u32, - event: EV, - aux_data: Option, + event_msg: EventMessage, }, } @@ -156,27 +197,21 @@ pub enum EventRoutingError { NoSenderForId(ComponentId), } -#[derive(Debug)] -pub struct EventRoutingErrorsWithResult { - pub result: EventRoutingResult, - pub errors: [Option; 3], -} - impl< - EventReceiver: EventReceiveProvider, - SenderMap: SenderMapProvider, + EventReceiver: EventReceiveProvider, + SenderMap: SenderMapProvider, ListenerMap: ListenerMapProvider, - EventSender: EventSendProvider, - Ev: GenericEvent + Copy, - Data: Clone, - > EventManager + EventSender: EventSendProvider, + Event: GenericEvent + Copy, + ParamProvider: Debug, + > EventManager { pub fn remove_duplicates(&mut self, key: &ListenerKey) { self.listener_map.remove_duplicates(key) } /// Subscribe for a unique event. - pub fn subscribe_single(&mut self, event: &Ev, sender_id: ComponentId) { + pub fn subscribe_single(&mut self, event: &Event, sender_id: ComponentId) { self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id); } @@ -193,17 +228,20 @@ impl< self.update_listeners(ListenerKey::All, sender_id); } } - impl< - ERP: EventReceiveProvider, - SMP: SenderMapProvider, - LTR: ListenerMapProvider, - SP: EventSendProvider, - EV: GenericEvent + Copy, - AUX: Clone, - > EventManager + EventReceiver: EventReceiveProvider, + SenderMap: SenderMapProvider, + ListenerMap: ListenerMapProvider, + EventSenderMap: EventSendProvider, + Event: GenericEvent + Copy, + ParamProvider: Debug, + > EventManager { - pub fn new_with_custom_maps(event_receiver: ERP, sender_map: SMP, listener_map: LTR) -> Self { + pub fn new_with_custom_maps( + event_receiver: EventReceiver, + sender_map: SenderMap, + listener_map: ListenerMap, + ) -> Self { EventManager { listener_map, sender_map, @@ -213,7 +251,7 @@ impl< } /// Add a new sender component which can be used to send events to subscribers. - pub fn add_sender(&mut self, send_provider: SP) { + pub fn add_sender(&mut self, send_provider: EventSenderMap) { if !self .sender_map .contains_send_event_provider(&send_provider.target_id()) @@ -226,7 +264,17 @@ impl< fn update_listeners(&mut self, key: ListenerKey, sender_id: ComponentId) { self.listener_map.add_listener(key, sender_id); } +} +impl< + EventReceiver: EventReceiveProvider, + SenderMap: SenderMapProvider, + ListenerMap: ListenerMapProvider, + EventSenderMap: EventSendProvider, + Event: GenericEvent + Copy, + ParamProvider: Clone + Debug, + > EventManager +{ /// This function will use the cached event receiver and try to receive one event. /// If an event was received, it will try to route that event to all subscribed event listeners. /// If this works without any issues, the [EventRoutingResult] will contain context information @@ -234,60 +282,46 @@ impl< /// /// This function will track up to 3 errors returned as part of the /// [EventRoutingErrorsWithResult] error struct. - pub fn try_event_handling( + pub fn try_event_handling( &self, - ) -> Result, EventRoutingErrorsWithResult> { - let mut err_idx = 0; - let mut err_slice = [None, None, None]; + mut error_handler: E, + ) -> EventRoutingResult { + // let mut err_idx = 0; + // let mut err_slice = [None, None, None]; let mut num_recipients = 0; - let mut add_error = |error: EventRoutingError| { - if err_idx < 3 { - err_slice[err_idx] = Some(error); - err_idx += 1; - } - }; - let mut send_handler = |key: &ListenerKey, event: EV, aux_data: &Option| { + let mut send_handler = |key: &ListenerKey, event: Event, params: &Option| { if self.listener_map.contains_listener(key) { if let Some(ids) = self.listener_map.get_listener_ids(key) { for id in ids { if let Some(sender) = self.sender_map.get_send_event_provider(id) { - if let Err(e) = sender.send(event, aux_data.clone()) { - add_error(EventRoutingError::Send(e)); + if let Err(e) = + sender.send(EventMessage::new_generic(*id, event, params.clone())) + { + error_handler(EventRoutingError::Send(e)); } else { num_recipients += 1; } } else { - add_error(EventRoutingError::NoSenderForId(*id)); + error_handler(EventRoutingError::NoSenderForId(*id)); } } } else { - add_error(EventRoutingError::NoSendersForKey(*key)); + error_handler(EventRoutingError::NoSendersForKey(*key)); } } }; - if let Some((event, aux_data)) = self.event_receiver.try_recv_event() { - let single_key = ListenerKey::Single(event.raw_as_largest_type()); - send_handler(&single_key, event, &aux_data); - let group_key = ListenerKey::Group(event.group_id_as_largest_type()); - send_handler(&group_key, event, &aux_data); - send_handler(&ListenerKey::All, event, &aux_data); - if err_idx > 0 { - return Err(EventRoutingErrorsWithResult { - result: EventRoutingResult::Handled { - num_recipients, - event, - aux_data, - }, - errors: err_slice, - }); - } - return Ok(EventRoutingResult::Handled { + if let Ok(Some(event_msg)) = self.event_receiver.try_recv_event() { + let single_key = ListenerKey::Single(event_msg.event.raw_as_largest_type()); + send_handler(&single_key, event_msg.event, &event_msg.params); + let group_key = ListenerKey::Group(event_msg.event.group_id_as_largest_type()); + send_handler(&group_key, event_msg.event, &event_msg.params); + send_handler(&ListenerKey::All, event_msg.event, &event_msg.params); + return EventRoutingResult::Handled { num_recipients, - event, - aux_data, - }); + event_msg, + }; } - Ok(EventRoutingResult::Empty) + EventRoutingResult::Empty } } @@ -311,23 +345,31 @@ pub mod alloc_mod { /// and the [DefaultListenerMap]. It uses /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the /// message queue backend. - pub type EventManagerWithBoundedMpsc = EventManager< + pub type EventManagerWithBoundedMpsc = EventManager< MpscEventReceiver, - DefaultSenderMap, EV, AUX>, + DefaultSenderMap, Event, ParamProvider>, DefaultListenerMap, - EventSenderMpscBounded, + EventSenderMpscBounded, >; impl< - ER: EventReceiveProvider, - SP: EventSendProvider, - EV: GenericEvent + Copy, - AUX: 'static, - > EventManager, DefaultListenerMap, SP, EV, AUX> + EventReceiver: EventReceiveProvider, + EventSender: EventSendProvider, + Event: GenericEvent + Copy, + ParamProvider: 'static + Debug, + > + EventManager< + EventReceiver, + DefaultSenderMap, + DefaultListenerMap, + EventSender, + Event, + ParamProvider, + > { /// Create an event manager where the sender table will be the [DefaultSenderMap] /// and the listener table will be the [DefaultListenerMap]. - pub fn new(event_receiver: ER) -> Self { + pub fn new(event_receiver: EventReceiver) -> Self { Self { listener_map: DefaultListenerMap::default(), sender_map: DefaultSenderMap::default(), @@ -384,16 +426,19 @@ pub mod alloc_mod { /// /// Simple implementation which uses a [HashMap] internally. pub struct DefaultSenderMap< - SP: EventSendProvider, - EV: GenericEvent = EventU32, - AUX = Params, + EventSender: EventSendProvider, + Event: GenericEvent = EventU32, + ParamProvider: Debug = Params, > { - senders: HashMap, - phantom: PhantomData<(EV, AUX)>, + senders: HashMap, + phantom: PhantomData<(Event, ParamProvider)>, } - impl, EV: GenericEvent, AUX> Default - for DefaultSenderMap + impl< + EventSender: EventSendProvider, + Event: GenericEvent, + ParamProvider: Debug, + > Default for DefaultSenderMap { fn default() -> Self { Self { @@ -403,20 +448,24 @@ pub mod alloc_mod { } } - impl, EV: GenericEvent, AUX> SenderMapProvider - for DefaultSenderMap + impl< + EventSender: EventSendProvider, + Event: GenericEvent, + ParamProvider: Debug, + > SenderMapProvider + for DefaultSenderMap { fn contains_send_event_provider(&self, id: &ComponentId) -> bool { self.senders.contains_key(id) } - fn get_send_event_provider(&self, id: &ComponentId) -> Option<&SP> { + fn get_send_event_provider(&self, id: &ComponentId) -> Option<&EventSender> { self.senders .get(id) .filter(|sender| sender.target_id() == *id) } - fn add_send_event_provider(&mut self, send_provider: SP) -> bool { + fn add_send_event_provider(&mut self, send_provider: EventSender) -> bool { let id = send_provider.target_id(); if self.senders.contains_key(&id) { return false; @@ -428,26 +477,33 @@ pub mod alloc_mod { #[cfg(feature = "std")] pub mod std_mod { + use crate::queue::GenericReceiveError; + use super::*; use std::sync::mpsc; pub struct MpscEventReceiver { - mpsc_receiver: mpsc::Receiver<(Event, Option)>, + receiver: mpsc::Receiver>, } impl MpscEventReceiver { - pub fn new(receiver: mpsc::Receiver<(Event, Option)>) -> Self { - Self { - mpsc_receiver: receiver, - } + pub fn new(receiver: mpsc::Receiver>) -> Self { + Self { receiver } } } impl EventReceiveProvider for MpscEventReceiver { - fn try_recv_event(&self) -> Option> { - if let Ok(event_and_data) = self.mpsc_receiver.try_recv() { - return Some(event_and_data); + type Error = GenericReceiveError; + + fn try_recv_event(&self) -> Result>, Self::Error> { + match self.receiver.try_recv() { + Ok(msg) => Ok(Some(msg)), + Err(e) => match e { + mpsc::TryRecvError::Empty => Ok(None), + mpsc::TryRecvError::Disconnected => { + Err(GenericReceiveError::TxDisconnected(None)) + } + }, } - None } } @@ -459,22 +515,25 @@ pub mod std_mod { #[derive(Clone)] pub struct EventSenderMpsc { target_id: ComponentId, - sender: mpsc::Sender<(Event, Option)>, + sender: mpsc::Sender>, } impl EventSenderMpsc { - pub fn new(target_id: ComponentId, sender: mpsc::Sender<(Event, Option)>) -> Self { + pub fn new(target_id: ComponentId, sender: mpsc::Sender>) -> Self { Self { target_id, sender } } } impl EventSendProvider for EventSenderMpsc { + type Error = GenericSendError; + fn target_id(&self) -> ComponentId { self.target_id } - fn send(&self, event: Event, aux_data: Option) -> Result<(), GenericSendError> { + + fn send(&self, event_msg: EventMessage) -> Result<(), GenericSendError> { self.sender - .send((event, aux_data)) + .send(event_msg) .map_err(|_| GenericSendError::RxDisconnected) } } @@ -484,14 +543,14 @@ pub mod std_mod { #[derive(Clone)] pub struct EventSenderMpscBounded { target_id: ComponentId, - sender: mpsc::SyncSender<(Event, Option)>, + sender: mpsc::SyncSender>, capacity: usize, } impl EventSenderMpscBounded { pub fn new( target_id: ComponentId, - sender: mpsc::SyncSender<(Event, Option)>, + sender: mpsc::SyncSender>, capacity: usize, ) -> Self { Self { @@ -503,11 +562,14 @@ pub mod std_mod { } impl EventSendProvider for EventSenderMpscBounded { + type Error = GenericSendError; + fn target_id(&self) -> ComponentId { self.target_id } - fn send(&self, event: Event, aux_data: Option) -> Result<(), GenericSendError> { - if let Err(e) = self.sender.try_send((event, aux_data)) { + + fn send(&self, event_msg: EventMessage) -> Result<(), Self::Error> { + if let Err(e) = self.sender.try_send(event_msg) { return match e { mpsc::TrySendError::Full(_) => { Err(GenericSendError::QueueFull(Some(self.capacity as u32))) @@ -530,19 +592,20 @@ mod tests { use super::*; use crate::event_man::EventManager; use crate::events::{EventU32, GenericEvent, Severity}; - use crate::params::ParamsRaw; + use crate::params::{ParamsHeapless, ParamsRaw}; + use crate::pus::test_util::{TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1}; use std::format; - use std::sync::mpsc::{self, channel, Receiver, Sender}; + use std::sync::mpsc::{self}; const TEST_EVENT: EventU32 = EventU32::const_new(Severity::INFO, 0, 5); fn check_next_event( expected: EventU32, - receiver: &Receiver, + receiver: &mpsc::Receiver, ) -> Option { - if let Ok(event) = receiver.try_recv() { - assert_eq!(event.0, expected); - return event.1; + if let Ok(event_msg) = receiver.try_recv() { + assert_eq!(event_msg.event, expected); + return event_msg.params; } None } @@ -555,17 +618,16 @@ mod tests { assert!(matches!(res, EventRoutingResult::Handled { .. })); if let EventRoutingResult::Handled { num_recipients, - event, - .. + event_msg, } = res { - assert_eq!(event, expected); + assert_eq!(event_msg.event, expected); assert_eq!(num_recipients, expected_num_sent); } } - fn generic_event_man() -> (Sender, EventManagerWithMpsc) { - let (event_sender, manager_queue) = channel(); + fn generic_event_man() -> (mpsc::Sender, EventManagerWithMpsc) { + let (event_sender, manager_queue) = mpsc::channel(); let event_man_receiver = MpscEventReceiver::new(manager_queue); (event_sender, EventManager::new(event_man_receiver)) } @@ -575,48 +637,56 @@ mod tests { let (event_sender, mut event_man) = generic_event_man(); let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); - let (single_event_sender, single_event_receiver) = channel(); + let (single_event_sender, single_event_receiver) = mpsc::channel(); let single_event_listener = EventSenderMpsc::new(0, single_event_sender); event_man.subscribe_single(&event_grp_0, single_event_listener.target_id()); event_man.add_sender(single_event_listener); - let (group_event_sender_0, group_event_receiver_0) = channel(); + let (group_event_sender_0, group_event_receiver_0) = mpsc::channel(); let group_event_listener = EventU32SenderMpsc::new(1, group_event_sender_0); event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.target_id()); event_man.add_sender(group_event_listener); + let error_handler = |e: EventRoutingError| { + panic!("routing error occurred: {:?}", e); + }; // Test event with one listener event_sender - .send((event_grp_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_grp_0)) .expect("Sending single error failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_grp_0, 1); + let res = event_man.try_event_handling(&error_handler); + // assert!(res.is_ok()); + check_handled_event(res, event_grp_0, 1); check_next_event(event_grp_0, &single_event_receiver); // Test event which is sent to all group listeners event_sender - .send((event_grp_1_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_grp_1_0)) .expect("Sending group error failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_grp_1_0, 1); + let res = event_man.try_event_handling(&error_handler); + check_handled_event(res, event_grp_1_0, 1); check_next_event(event_grp_1_0, &group_event_receiver_0); } #[test] - fn test_with_basic_aux_data() { + fn test_with_basic_params() { + let error_handler = |e: EventRoutingError| { + panic!("routing error occurred: {:?}", e); + }; let (event_sender, mut event_man) = generic_event_man(); let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); - let (single_event_sender, single_event_receiver) = channel(); + let (single_event_sender, single_event_receiver) = mpsc::channel(); let single_event_listener = EventSenderMpsc::new(0, single_event_sender); event_man.subscribe_single(&event_grp_0, single_event_listener.target_id()); event_man.add_sender(single_event_listener); event_sender - .send((event_grp_0, Some(Params::Heapless((2_u32, 3_u32).into())))) + .send(EventMessage::new_with_params( + TEST_COMPONENT_ID_0.id(), + event_grp_0, + Params::Heapless((2_u32, 3_u32).into()), + )) .expect("Sending group error failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_grp_0, 1); + let res = event_man.try_event_handling(&error_handler); + check_handled_event(res, event_grp_0, 1); let aux = check_next_event(event_grp_0, &single_event_receiver); assert!(aux.is_some()); let aux = aux.unwrap(); @@ -631,15 +701,16 @@ mod tests { /// Test listening for multiple groups #[test] fn test_multi_group() { + let error_handler = |e: EventRoutingError| { + panic!("routing error occurred: {:?}", e); + }; let (event_sender, mut event_man) = generic_event_man(); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - let hres = res.unwrap(); - assert!(matches!(hres, EventRoutingResult::Empty)); + let res = event_man.try_event_handling(&error_handler); + assert!(matches!(res, EventRoutingResult::Empty)); let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); - let (event_grp_0_sender, event_grp_0_receiver) = channel(); + let (event_grp_0_sender, event_grp_0_receiver) = mpsc::channel(); let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender); event_man.subscribe_group( event_grp_0.group_id(), @@ -652,17 +723,15 @@ mod tests { event_man.add_sender(event_grp_0_and_1_listener); event_sender - .send((event_grp_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_grp_0)) .expect("Sending Event Group 0 failed"); event_sender - .send((event_grp_1_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_grp_1_0)) .expect("Sendign Event Group 1 failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_grp_0, 1); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_grp_1_0, 1); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_grp_0, 1); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_grp_1_0, 1); check_next_event(event_grp_0, &event_grp_0_receiver); check_next_event(event_grp_1_0, &event_grp_0_receiver); @@ -672,11 +741,14 @@ mod tests { /// to both group and single events from one listener #[test] fn test_listening_to_same_event_and_multi_type() { + let error_handler = |e: EventRoutingError| { + panic!("routing error occurred: {:?}", e); + }; let (event_sender, mut event_man) = generic_event_man(); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); - let (event_0_tx_0, event_0_rx_0) = channel(); - let (event_0_tx_1, event_0_rx_1) = channel(); + let (event_0_tx_0, event_0_rx_0) = mpsc::channel(); + let (event_0_tx_1, event_0_rx_1) = mpsc::channel(); let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0); let event_listener_1 = EventU32SenderMpsc::new(1, event_0_tx_1); let event_listener_0_sender_id = event_listener_0.target_id(); @@ -686,28 +758,25 @@ mod tests { event_man.subscribe_single(&event_0, event_listener_1_sender_id); event_man.add_sender(event_listener_1); event_sender - .send((event_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .expect("Triggering Event 0 failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_0, 2); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_0, 2); check_next_event(event_0, &event_0_rx_0); check_next_event(event_0, &event_0_rx_1); event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); event_sender - .send((event_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .expect("Triggering Event 0 failed"); event_sender - .send((event_1, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) .expect("Triggering Event 1 failed"); // 3 Events messages will be sent now - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_0, 2); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_1, 1); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_0, 2); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_1, 1); // Both the single event and the group event should arrive now check_next_event(event_0, &event_0_rx_0); check_next_event(event_1, &event_0_rx_0); @@ -716,36 +785,36 @@ mod tests { event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); event_man.remove_duplicates(&ListenerKey::Group(event_1.group_id())); event_sender - .send((event_1, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_1)) .expect("Triggering Event 1 failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_1, 1); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_1, 1); } #[test] fn test_all_events_listener() { - let (event_sender, manager_queue) = channel(); + let error_handler = |e: EventRoutingError| { + panic!("routing error occurred: {:?}", e); + }; + let (event_sender, manager_queue) = mpsc::channel(); let event_man_receiver = MpscEventReceiver::new(manager_queue); let mut event_man = EventManagerWithMpsc::new(event_man_receiver); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); - let (event_0_tx_0, all_events_rx) = channel(); + let (event_0_tx_0, all_events_rx) = mpsc::channel(); let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0); event_man.subscribe_all(all_events_listener.target_id()); event_man.add_sender(all_events_listener); event_sender - .send((event_0, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .expect("Triggering event 0 failed"); event_sender - .send((event_1, None)) + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) .expect("Triggering event 1 failed"); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_0, 1); - let res = event_man.try_event_handling(); - assert!(res.is_ok()); - check_handled_event(res.unwrap(), event_1, 1); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_0, 1); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_1, 1); check_next_event(event_0, &all_events_rx); check_next_event(event_1, &all_events_rx); } @@ -755,15 +824,15 @@ mod tests { let (event_sender, _event_receiver) = mpsc::sync_channel(3); let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); event_sender - .send_no_data(TEST_EVENT) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) .expect("sending test event failed"); event_sender - .send_no_data(TEST_EVENT) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) .expect("sending test event failed"); event_sender - .send_no_data(TEST_EVENT) + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) .expect("sending test event failed"); - let error = event_sender.send_no_data(TEST_EVENT); + let error = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)); if let Err(e) = error { assert!(matches!(e, GenericSendError::QueueFull(Some(3)))); } else { @@ -775,7 +844,7 @@ mod tests { let (event_sender, event_receiver) = mpsc::sync_channel(3); let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); drop(event_receiver); - if let Err(e) = event_sender.send_no_data(TEST_EVENT) { + if let Err(e) = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) { assert!(matches!(e, GenericSendError::RxDisconnected)); } else { panic!("Expected error"); diff --git a/satrs/src/events.rs b/satrs/src/events.rs index 2732726..032322a 100644 --- a/satrs/src/events.rs +++ b/satrs/src/events.rs @@ -80,7 +80,7 @@ impl HasSeverity for SeverityHigh { const SEVERITY: Severity = Severity::HIGH; } -pub trait GenericEvent: EcssEnumeration { +pub trait GenericEvent: EcssEnumeration + Copy + Clone { type Raw; type GroupId; type UniqueId; diff --git a/satrs/src/pus/event_man.rs b/satrs/src/pus/event_man.rs index c907f94..43c1c31 100644 --- a/satrs/src/pus/event_man.rs +++ b/satrs/src/pus/event_man.rs @@ -157,8 +157,8 @@ pub mod alloc_mod { phantom: PhantomData<(E, EV)>, } - impl, EV: GenericEvent, E> - PusEventDispatcher + impl, Event: GenericEvent, E> + PusEventDispatcher { pub fn new(reporter: EventReporter, backend: B) -> Self { Self { @@ -168,11 +168,11 @@ pub mod alloc_mod { } } - pub fn enable_tm_for_event(&mut self, event: &EV) -> Result { + pub fn enable_tm_for_event(&mut self, event: &Event) -> Result { self.backend.enable_event_reporting(event) } - pub fn disable_tm_for_event(&mut self, event: &EV) -> Result { + pub fn disable_tm_for_event(&mut self, event: &Event) -> Result { self.backend.disable_event_reporting(event) } @@ -180,8 +180,8 @@ pub mod alloc_mod { &self, sender: &(impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], - event: EV, - aux_data: Option<&[u8]>, + event: Event, + params: Option<&[u8]>, ) -> Result { if !self.backend.event_enabled(&event) { return Ok(false); @@ -189,22 +189,22 @@ pub mod alloc_mod { match event.severity() { Severity::INFO => self .reporter - .event_info(sender, time_stamp, event, aux_data) + .event_info(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), Severity::LOW => self .reporter - .event_low_severity(sender, time_stamp, event, aux_data) + .event_low_severity(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), Severity::MEDIUM => self .reporter - .event_medium_severity(sender, time_stamp, event, aux_data) + .event_medium_severity(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), Severity::HIGH => self .reporter - .event_high_severity(sender, time_stamp, event, aux_data) + .event_high_severity(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), } diff --git a/satrs/tests/pus_events.rs b/satrs/tests/pus_events.rs index d9c87fe..be4e9ea 100644 --- a/satrs/tests/pus_events.rs +++ b/satrs/tests/pus_events.rs @@ -1,10 +1,12 @@ use satrs::event_man::{ - EventManagerWithMpsc, EventSendProvider, EventU32SenderMpsc, MpscEventU32Receiver, + EventManagerWithMpsc, EventMessage, EventRoutingError, EventSendProvider, EventU32SenderMpsc, + MpscEventU32Receiver, }; use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs::params::U32Pair; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; +use satrs::pus::test_util::TEST_COMPONENT_ID_0; use satrs::pus::PusTmAsVec; use satrs::request::UniqueApidTargetId; use spacepackets::ecss::tm::PusTmReader; @@ -39,24 +41,26 @@ fn test_threaded_usage() { let reporter = EventReporter::new(TEST_ID.raw(), 0x02, 128).expect("Creating event reporter failed"); let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); + let error_handler = |error: EventRoutingError| { + panic!("received routing error {error:?}"); + }; // PUS + Generic event manager thread let jh0 = thread::spawn(move || { let mut event_cnt = 0; let mut params_array: [u8; 128] = [0; 128]; loop { - let res = event_man.try_event_handling(); - assert!(res.is_ok()); + event_man.try_event_handling(error_handler); match pus_event_man_rx.try_recv() { - Ok((event, aux_data)) => { + Ok(event_msg) => { let gen_event = |aux_data| { pus_event_man.generate_pus_event_tm_generic( &event_tx, &EMPTY_STAMP, - event, + event_msg.event(), aux_data, ) }; - let res = if let Some(aux_data) = aux_data { + let res = if let Some(aux_data) = event_msg.params() { match aux_data { Params::Heapless(heapless) => match heapless { ParamsHeapless::Raw(raw) => { @@ -97,7 +101,10 @@ fn test_threaded_usage() { // Event sender and TM checker thread let jh1 = thread::spawn(move || { event_sender - .send((INFO_EVENT.into(), None)) + .send(EventMessage::new( + TEST_COMPONENT_ID_0.id(), + INFO_EVENT.into(), + )) .expect("Sending info event failed"); loop { match event_rx.try_recv() { @@ -123,7 +130,11 @@ fn test_threaded_usage() { } } event_sender - .send((LOW_SEV_EVENT, Some(Params::Heapless((2_u32, 3_u32).into())))) + .send(EventMessage::new_with_params( + TEST_COMPONENT_ID_0.id(), + LOW_SEV_EVENT, + Params::Heapless((2_u32, 3_u32).into()), + )) .expect("Sending low severity event failed"); loop { match event_rx.try_recv() {