From fe7e4fac59cbd93e6ce389983aad6a58a542eb40 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 16 Oct 2025 12:53:06 +0200 Subject: [PATCH] rework event management module --- satrs/Cargo.toml | 6 +- satrs/src/event_man_2.rs | 876 +++++++++++++++++++++++++++++++++++++++ satrs/src/events.rs | 2 +- satrs/src/events2.rs | 222 ++++++++++ satrs/src/lib.rs | 3 + 5 files changed, 1105 insertions(+), 4 deletions(-) create mode 100644 satrs/src/event_man_2.rs create mode 100644 satrs/src/events2.rs diff --git a/satrs/Cargo.toml b/satrs/Cargo.toml index 28b64b1..58755cd 100644 --- a/satrs/Cargo.toml +++ b/satrs/Cargo.toml @@ -24,9 +24,9 @@ cobs = { version = "0.4", default-features = false } thiserror = { version = "2", default-features = false } hashbrown = { version = ">=0.14, <=0.15", optional = true } -static_cell = { version = "2", optional = true } +static_cell = { version = "2" } +heapless = { version = "0.9" } dyn-clone = { version = "1", optional = true } -heapless = { version = "0.9", optional = true } downcast-rs = { version = "2", default-features = false, optional = true } bus = { version = "2.2", optional = true } crossbeam-channel = { version = "0.5", default-features = false, optional = true } @@ -70,7 +70,7 @@ alloc = [ ] serde = ["dep:serde", "spacepackets/serde", "satrs-shared/serde"] crossbeam = ["crossbeam-channel"] -heapless = ["dep:heapless", "static_cell"] +# heapless = ["dep:heapless", "static_cell"] defmt = ["dep:defmt", "spacepackets/defmt"] test_util = [] diff --git a/satrs/src/event_man_2.rs b/satrs/src/event_man_2.rs new file mode 100644 index 0000000..10bfbcd --- /dev/null +++ b/satrs/src/event_man_2.rs @@ -0,0 +1,876 @@ +//! Event management and forwarding +//! +//! It is recommended to read the +//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html) +//! about events first. +//! +//! This module provides components to perform event routing. The most important component for this +//! task is the [EventManager]. It receives all events and then routes them to event subscribers +//! where appropriate. +//! +//! The event manager has a listener table abstracted by the [ListenerMap], which maps +//! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId]. +//! It also contains a sender table abstracted by the [SenderMap] which maps these sender +//! IDs to concrete [EventSender]s. A simple approach would be to use one send event provider +//! for each OBSW thread and then subscribe for all interesting events for a particular thread +//! using the send event provider ID. +//! +//! This can be done with the [EventManager] like this: +//! +//! 1. Provide a concrete [EventReceiver] implementation. This abstraction allow to use different +//! message queue backends. A straightforward implementation where dynamic memory allocation is +//! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is +//! already implemented for this type. +//! 2. To set up event creators, create channel pairs using some message queue implementation. +//! Each event creator gets a (cloned) sender component which allows it to send events to the +//! manager. +//! 3. The event manager receives the receiver component as part of a [EventReceiver] +//! implementation so all events are routed to the manager. +//! 4. Create the [event sender map][SenderMap]s which allow routing events to +//! subscribers. You can now use the subscriber component IDs to subscribe +//! for event groups, for example by using the [EventManager::subscribe_single] method. +//! 5. Add the send provider as well using the [EventManager::add_sender] call so the event +//! manager can route listener groups to a the send provider. +//! +//! Some components like a PUS Event Service or PUS Event Action Service might require all +//! events to package them as telemetry or start actions where applicable. +//! Other components might only be interested in certain events. For example, a thermal system +//! handler might only be interested in temperature events generated by a thermal sensor component. +//! +//! # Examples +//! +//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs) +//! for a concrete example using multi-threading where events are routed to +//! different threads. +//! +//! The [satrs-example](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example) +//! also contains a full event manager instance and exposes a test event via the PUS test service. +//! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs) +//! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs) +//! show how the event management modules can be integrated into a more complex software. +use core::{marker::PhantomData, option::Iter}; + +use crate::{ + ComponentId, + events2::{Event, EventDynParam, EventId, GroupId}, + queue::GenericSendError, +}; + +#[cfg(feature = "alloc")] +pub use alloc_mod::*; +#[cfg(feature = "std")] +pub use std_mod::*; + +#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum ListenerKey { + Single(EventId), + Group(GroupId), + All, +} + +#[derive(Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct EventMessage { + sender_id: ComponentId, + event: EventInstance, +} + +impl EventMessage { + pub fn new(sender_id: ComponentId, event: &EventInstance) -> Self { + EventMessage { sender_id, event: event.clone() } + } + + pub fn sender_id(&self) -> ComponentId { + self.sender_id + } + + pub fn event(&self) -> &EventInstance { + &self.event + } +} + +/// Generic abstraction for event senders. +pub trait EventSender { + type Error; + + fn target_id(&self) -> ComponentId; + + fn send(&self, message: EventMessage) -> Result<(), Self::Error>; +} + +pub trait ListenerMap { + #[cfg(feature = "alloc")] + fn get_listeners(&self) -> alloc::vec::Vec; + fn contains_listener(&self, key: &ListenerKey) -> bool; + fn get_listener_ids(&self, key: &ListenerKey) -> Option>; + fn add_listener(&mut self, key: ListenerKey, listener_id: ComponentId) -> bool; + fn remove_duplicates(&mut self, key: &ListenerKey); +} + +/// Generic abstraction for an event receiver. +pub trait EventReceiver { + type Error; + + /// This function has to be provided by any event receiver. A call may or may not return + /// an event. + fn try_recv_event(&self) -> Result>, Self::Error>; +} + +pub trait SenderMap, EventInstance: Event> { + fn contains_send_event_provider(&self, target_id: &ComponentId) -> bool; + + fn get_send_event_provider(&self, target_id: &ComponentId) -> Option<&EventSenderInstance>; + fn add_send_event_provider(&mut self, send_provider: EventSenderInstance) -> bool; +} + +/// Generic event manager implementation. +/// +/// # Generics +/// +/// * `EventReceiver`: [EventReceiveProvider] used to receive all events. +/// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers. +/// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs. +/// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events. +/// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] +/// and [EventU16] are supported. +/// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context +/// information +pub struct EventManager< + EventReceiverInstance: EventReceiver, + SenderMapInstance: SenderMap, + ListenerMapInstance: ListenerMap, + EventSenderInstance: EventSender, + EventInstance: Event, +> { + event_receiver: EventReceiverInstance, + sender_map: SenderMapInstance, + listener_map: ListenerMapInstance, + phantom: core::marker::PhantomData<(EventSenderInstance, EventInstance)>, +} + +#[derive(Debug)] +pub enum EventRoutingResult { + /// No event was received + Empty, + /// An event was received and routed to listeners. + Handled { + num_recipients: u32, + event_msg: EventMessage, + }, +} + +#[derive(Debug)] +pub enum EventRoutingError { + Send(GenericSendError), + NoSendersForKey(ListenerKey), + NoSenderForId(ComponentId), +} + +impl< + EventReceiverInstance: EventReceiver, + SenderMapInstance: SenderMap, + ListenerMapInstance: ListenerMap, + EventSenderInstance: EventSender, + EventInstance: Event, +> + EventManager< + EventReceiverInstance, + SenderMapInstance, + ListenerMapInstance, + EventSenderInstance, + EventInstance, + > +{ + pub fn remove_duplicates(&mut self, key: &ListenerKey) { + self.listener_map.remove_duplicates(key) + } + + /// Subscribe for a unique event. + pub fn subscribe_single(&mut self, event: EventId, sender_id: ComponentId) { + self.update_listeners(ListenerKey::Single(event), sender_id); + } + + /// Subscribe for an event group. + pub fn subscribe_group(&mut self, group_id: GroupId, sender_id: ComponentId) { + self.update_listeners(ListenerKey::Group(group_id), sender_id); + } + + /// Subscribe for all events received by the manager. + /// + /// For example, this can be useful for a handler component which sends every event as + /// a telemetry packet. + pub fn subscribe_all(&mut self, sender_id: ComponentId) { + self.update_listeners(ListenerKey::All, sender_id); + } +} +impl< + EventReceiverInstance: EventReceiver, + SenderMapInstance: SenderMap, + ListenerMapInstance: ListenerMap, + EventSenderInstance: EventSender, + EventInstance: Event, +> + EventManager< + EventReceiverInstance, + SenderMapInstance, + ListenerMapInstance, + EventSenderInstance, + EventInstance, + > +{ + pub fn new_with_custom_maps( + event_receiver: EventReceiverInstance, + sender_map: SenderMapInstance, + listener_map: ListenerMapInstance, + ) -> Self { + EventManager { + listener_map, + sender_map, + event_receiver, + phantom: PhantomData, + } + } + + /// Add a new sender component which can be used to send events to subscribers. + pub fn add_sender(&mut self, send_provider: EventSenderInstance) { + if !self + .sender_map + .contains_send_event_provider(&send_provider.target_id()) + { + self.sender_map.add_send_event_provider(send_provider); + } + } + + /// Generic function to update the event subscribers. + fn update_listeners(&mut self, key: ListenerKey, sender_id: ComponentId) { + self.listener_map.add_listener(key, sender_id); + } +} + +impl< + EventReceiverInstance: EventReceiver, + SenderMapInstance: SenderMap, + ListenerMapInstance: ListenerMap, + EventSenderInstance: EventSender, + EventInstance: Event, +> + EventManager< + EventReceiverInstance, + SenderMapInstance, + ListenerMapInstance, + EventSenderInstance, + EventInstance, + > +{ + /// This function will use the cached event receiver and try to receive one event. + /// If an event was received, it will try to route that event to all subscribed event listeners. + /// If this works without any issues, the [EventRoutingResult] will contain context information + /// about the routed event. + /// + /// If an error occurs during the routing, the error handler will be called. The error handler + /// should take a reference to the event message as the first argument, and the routing error + /// as the second argument. + pub fn try_event_handling, EventRoutingError)>( + &self, + mut error_handler: E, + ) -> EventRoutingResult { + let mut num_recipients = 0; + let mut send_handler = |key: &ListenerKey, event_msg: &EventMessage| { + if self.listener_map.contains_listener(key) { + if let Some(ids) = self.listener_map.get_listener_ids(key) { + for id in ids { + if let Some(sender) = self.sender_map.get_send_event_provider(id) { + if let Err(e) = sender.send(EventMessage::new( + event_msg.sender_id, + event_msg.event.clone(), + )) { + error_handler(event_msg, EventRoutingError::Send(e)); + } else { + num_recipients += 1; + } + } else { + error_handler(event_msg, EventRoutingError::NoSenderForId(*id)); + } + } + } else { + error_handler(event_msg, EventRoutingError::NoSendersForKey(*key)); + } + } + }; + if let Ok(Some(event_msg)) = self.event_receiver.try_recv_event() { + let single_key = ListenerKey::Single(event_msg.event().id()); + send_handler(&single_key, &event_msg); + let group_key = ListenerKey::Group(event_msg.event().id().group_id()); + send_handler(&group_key, &event_msg); + send_handler(&ListenerKey::All, &event_msg); + return EventRoutingResult::Handled { + num_recipients, + event_msg, + }; + } + EventRoutingResult::Empty + } +} + +#[cfg(feature = "alloc")] +pub mod alloc_mod { + use alloc::vec::Vec; + use hashbrown::HashMap; + + use super::*; + + /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] + /// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend. + pub type EventManagerWithMpsc = EventManager< + EventDynParamReceiverMpsc, + DefaultSenderMap, EventInstance>, + DefaultListenerMap, + EventSenderMpsc, + EventInstance, + >; + + /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] + /// and the [DefaultListenerMap]. It uses + /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the + /// message queue backend. + pub type EventManagerWithBoundedMpsc = EventManager< + EventDynParamReceiverMpsc, + DefaultSenderMap, EventInstance>, + DefaultListenerMap, + EventSenderMpscBounded, + EventInstance, + >; + + impl< + EventReceiverInstance: EventReceiver, + EventSenderInstance: EventSender, + EventInstance: Event, + > + EventManager< + EventReceiverInstance, + DefaultSenderMap, + DefaultListenerMap, + EventSenderInstance, + EventInstance, + > + { + /// Create an event manager where the sender table will be the [DefaultSenderMap] + /// and the listener table will be the [DefaultListenerMap]. + pub fn new(event_receiver: EventReceiverInstance) -> Self { + Self { + listener_map: DefaultListenerMap::default(), + sender_map: DefaultSenderMap::default(), + event_receiver, + phantom: PhantomData, + } + } + } + + /// Default listener map. + /// + /// Simple implementation which uses a [HashMap] and a [Vec] internally. + #[derive(Default)] + pub struct DefaultListenerMap { + listeners: HashMap>, + } + + impl ListenerMap for DefaultListenerMap { + fn get_listeners(&self) -> Vec { + let mut key_list = Vec::new(); + for key in self.listeners.keys() { + key_list.push(*key); + } + key_list + } + + fn contains_listener(&self, key: &ListenerKey) -> bool { + self.listeners.contains_key(key) + } + + fn get_listener_ids(&self, key: &ListenerKey) -> Option> { + self.listeners.get(key).map(|vec| vec.iter()) + } + + fn add_listener(&mut self, key: ListenerKey, sender_id: ComponentId) -> bool { + if let Some(existing_list) = self.listeners.get_mut(&key) { + existing_list.push(sender_id); + } else { + let new_list = alloc::vec![sender_id]; + self.listeners.insert(key, new_list); + } + true + } + + fn remove_duplicates(&mut self, key: &ListenerKey) { + if let Some(list) = self.listeners.get_mut(key) { + list.sort_unstable(); + list.dedup(); + } + } + } + + /// Default sender map. + /// + /// Simple implementation which uses a [HashMap] internally. + pub struct DefaultSenderMap< + EventSenderInstance: EventSender, + EventInstance: Event = EventDynParam, + > { + senders: HashMap, + phantom: PhantomData, + } + + impl, EventInstance: Event> Default + for DefaultSenderMap + { + fn default() -> Self { + Self { + senders: Default::default(), + phantom: Default::default(), + } + } + } + + impl, EventInstance: Event> + SenderMap + for DefaultSenderMap + { + fn contains_send_event_provider(&self, id: &ComponentId) -> bool { + self.senders.contains_key(id) + } + + fn get_send_event_provider(&self, id: &ComponentId) -> Option<&EventSenderInstance> { + self.senders + .get(id) + .filter(|sender| sender.target_id() == *id) + } + + fn add_send_event_provider(&mut self, send_provider: EventSenderInstance) -> bool { + let id = send_provider.target_id(); + if self.senders.contains_key(&id) { + return false; + } + self.senders.insert(id, send_provider).is_none() + } + } +} + +#[cfg(feature = "std")] +pub mod std_mod { + use crate::{events2::EventHeapless, queue::GenericReceiveError}; + + use super::*; + use std::sync::mpsc; + + impl EventReceiver + for mpsc::Receiver> + { + type Error = GenericReceiveError; + + fn try_recv_event(&self) -> Result>, Self::Error> { + match self.try_recv() { + Ok(msg) => Ok(Some(msg)), + Err(e) => match e { + mpsc::TryRecvError::Empty => Ok(None), + mpsc::TryRecvError::Disconnected => { + Err(GenericReceiveError::TxDisconnected(None)) + } + }, + } + } + } + + pub type EventDynParamReceiverMpsc = mpsc::Receiver>; + pub type EventHeaplessReceiverMpsc = + mpsc::Receiver>>; + + /// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to + /// send events. + #[derive(Clone)] + pub struct EventSenderMpsc { + target_id: ComponentId, + sender: mpsc::Sender>, + } + + impl EventSenderMpsc { + pub fn new( + target_id: ComponentId, + sender: mpsc::Sender>, + ) -> Self { + Self { target_id, sender } + } + } + + impl EventSender for EventSenderMpsc { + type Error = GenericSendError; + + fn target_id(&self) -> ComponentId { + self.target_id + } + + fn send(&self, event_msg: EventMessage) -> Result<(), GenericSendError> { + self.sender + .send(event_msg) + .map_err(|_| GenericSendError::RxDisconnected) + } + } + + /// Generic event sender which uses the [mpsc::SyncSender] as the messaging backend to send + /// events. This has the advantage that the channel is bounded and thus more deterministic. + #[derive(Clone)] + pub struct EventSenderMpscBounded { + target_id: ComponentId, + sender: mpsc::SyncSender>, + capacity: usize, + } + + impl EventSenderMpscBounded { + pub fn new( + target_id: ComponentId, + sender: mpsc::SyncSender>, + capacity: usize, + ) -> Self { + Self { + target_id, + sender, + capacity, + } + } + } + + impl EventSender + for EventSenderMpscBounded + { + type Error = GenericSendError; + + fn target_id(&self) -> ComponentId { + self.target_id + } + + fn send(&self, event_msg: EventMessage) -> Result<(), Self::Error> { + if let Err(e) = self.sender.try_send(event_msg) { + return match e { + mpsc::TrySendError::Full(_) => { + Err(GenericSendError::QueueFull(Some(self.capacity as u32))) + } + mpsc::TrySendError::Disconnected(_) => Err(GenericSendError::RxDisconnected), + }; + } + Ok(()) + } + } + + pub type EventDynParamSenderMpsc = EventSenderMpsc; + pub type EventHeaplessSenderMpsc = EventSenderMpsc>; + pub type EventDynParamSenderMpscBounded = EventSenderMpscBounded; + pub type EventHeaplessSenderMpscBounded = + EventSenderMpscBounded>; +} + +#[cfg(test)] +mod tests { + use arbitrary_int::u14; + + use super::*; + use crate::events2::Severity; + use crate::params::{ParamsHeapless, ParamsRaw}; + use crate::pus::test_util::{TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1}; + use std::format; + use std::sync::mpsc; + use std::vec::Vec; + + const TEST_GROUP_ID_0: u14 = u14::new(0); + + #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] + pub enum TestEventIds { + TestInfo, + TestError, + } + + impl Event for TestEventIds { + fn id(&self) -> EventId { + match self { + TestEventIds::TestInfo => EventId::new( + Severity::Info, + TEST_GROUP_ID_0, + TestEventIds::TestInfo as u16, + ), + TestEventIds::TestError => EventId::new( + Severity::High, + TEST_GROUP_ID_0, + TestEventIds::TestError as u16, + ), + } + } + } + + fn check_next_event( + expected: EventDynParam, + receiver: &mpsc::Receiver>, + ) -> Option> { + if let Ok(event_msg) = receiver.try_recv() { + assert_eq!(event_msg.event, expected); + return event_msg.event.parameters().map(|p| p.to_vec()); + } + None + } + + fn check_handled_event( + res: EventRoutingResult, + expected: &EventDynParam, + expected_num_sent: u32, + expected_sender_id: ComponentId, + ) { + assert!(matches!(res, EventRoutingResult::Handled { .. })); + if let EventRoutingResult::Handled { + num_recipients, + event_msg, + } = res + { + assert_eq!(event_msg.event, *expected); + assert_eq!(event_msg.sender_id, expected_sender_id); + assert_eq!(num_recipients, expected_num_sent); + } + } + + fn generic_event_man() -> ( + mpsc::Sender>, + EventManagerWithMpsc, + ) { + let (event_sender, event_receiver) = mpsc::channel(); + (event_sender, EventManager::new(event_receiver)) + } + + #[test] + fn test_basic() { + let (event_sender, mut event_man) = generic_event_man(); + //let event_grp_0 = EventU::new(Severity::Info, 0, 0); + //let event_grp_1_0 = EventU32::new(Severity::High, 1, 0); + let (single_event_sender, single_event_receiver) = mpsc::channel(); + let single_event_listener = EventSenderMpsc::new(0, single_event_sender); + event_man.subscribe_single( + TestEventIds::TestInfo.id(), + single_event_listener.target_id(), + ); + event_man.add_sender(single_event_listener); + let (group_event_sender_0, group_event_receiver_0) = mpsc::channel(); + let group_event_listener = EventDynParamSenderMpsc::new(1, group_event_sender_0); + event_man.subscribe_group( + TestEventIds::TestError.id().group_id(), + group_event_listener.target_id(), + ); + event_man.add_sender(group_event_listener); + + let error_handler = |event_msg: &EventMessage, e: EventRoutingError| { + panic!("routing error occurred for event {:?}: {:?}", event_msg, e); + }; + let event_grp_0 = + EventDynParam::new_no_params(TestEventIds::TestInfo.id()); + let event_grp_1 = + EventDynParam::new_no_params(TestEventIds::TestError.id()); + // Test event with one listener + event_sender + .send(EventMessage::new( + TEST_COMPONENT_ID_0.id(), + &event_grp_0 + )) + .expect("Sending single error failed"); + let res = event_man.try_event_handling(&error_handler); + check_handled_event(res, &event_grp_0, 1, TEST_COMPONENT_ID_0.id()); + check_next_event(event_grp_0, &single_event_receiver); + + // Test event which is sent to all group listeners + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), &event_grp_1)) + .expect("Sending group error failed"); + let res = event_man.try_event_handling(&error_handler); + check_handled_event(res, &event_grp_1, 1, TEST_COMPONENT_ID_1.id()); + check_next_event(event_grp_1, &group_event_receiver_0); + } + + #[test] + fn test_with_basic_params() { + let error_handler = |event_msg: &EventMessage, e: EventRoutingError| { + panic!("routing error occurred for event {:?}: {:?}", event_msg, e); + }; + let (event_sender, mut event_man) = generic_event_man(); + let event_grp_0 = EventU32::new(Severity::Info, 0, 0); + let (single_event_sender, single_event_receiver) = mpsc::channel(); + let single_event_listener = EventSenderMpsc::new(0, single_event_sender); + event_man.subscribe_single(&event_grp_0, single_event_listener.target_id()); + event_man.add_sender(single_event_listener); + event_sender + .send(EventMessage::new_with_params( + TEST_COMPONENT_ID_0.id(), + event_grp_0, + &Params::Heapless((2_u32, 3_u32).into()), + )) + .expect("Sending group error failed"); + let res = event_man.try_event_handling(&error_handler); + check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); + let aux = check_next_event(event_grp_0, &single_event_receiver); + assert!(aux.is_some()); + let aux = aux.unwrap(); + if let Params::Heapless(ParamsHeapless::Raw(ParamsRaw::U32Pair(pair))) = aux { + assert_eq!(pair.0, 2); + assert_eq!(pair.1, 3); + } else { + panic!("{}", format!("Unexpected auxiliary value type {:?}", aux)); + } + } + + /// Test listening for multiple groups + #[test] + fn test_multi_group() { + let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { + panic!("routing error occurred for event {:?}: {:?}", event_msg, e); + }; + let (event_sender, mut event_man) = generic_event_man(); + let res = event_man.try_event_handling(error_handler); + assert!(matches!(res, EventRoutingResult::Empty)); + + let event_grp_0 = EventU32::new(Severity::Info, 0, 0); + let event_grp_1_0 = EventU32::new(Severity::High, 1, 0); + let (event_grp_0_sender, event_grp_0_receiver) = mpsc::channel(); + let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender); + event_man.subscribe_group( + event_grp_0.group_id(), + event_grp_0_and_1_listener.target_id(), + ); + event_man.subscribe_group( + event_grp_1_0.group_id(), + event_grp_0_and_1_listener.target_id(), + ); + event_man.add_sender(event_grp_0_and_1_listener); + + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_grp_0)) + .expect("Sending Event Group 0 failed"); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_grp_1_0)) + .expect("Sendign Event Group 1 failed"); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_grp_1_0, 1, TEST_COMPONENT_ID_1.id()); + + check_next_event(event_grp_0, &event_grp_0_receiver); + check_next_event(event_grp_1_0, &event_grp_0_receiver); + } + + /// Test listening to the same event from multiple listeners. Also test listening + /// to both group and single events from one listener + #[test] + fn test_listening_to_same_event_and_multi_type() { + let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { + panic!("routing error occurred for event {:?}: {:?}", event_msg, e); + }; + let (event_sender, mut event_man) = generic_event_man(); + let event_0 = EventU32::new(Severity::Info, 0, 5); + let event_1 = EventU32::new(Severity::High, 1, 0); + let (event_0_tx_0, event_0_rx_0) = mpsc::channel(); + let (event_0_tx_1, event_0_rx_1) = mpsc::channel(); + let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0); + let event_listener_1 = EventU32SenderMpsc::new(1, event_0_tx_1); + let event_listener_0_sender_id = event_listener_0.target_id(); + event_man.subscribe_single(&event_0, event_listener_0_sender_id); + event_man.add_sender(event_listener_0); + let event_listener_1_sender_id = event_listener_1.target_id(); + event_man.subscribe_single(&event_0, event_listener_1_sender_id); + event_man.add_sender(event_listener_1); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) + .expect("Triggering Event 0 failed"); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_0, 2, TEST_COMPONENT_ID_0.id()); + check_next_event(event_0, &event_0_rx_0); + check_next_event(event_0, &event_0_rx_1); + event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) + .expect("Triggering Event 0 failed"); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) + .expect("Triggering Event 1 failed"); + + // 3 Events messages will be sent now + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_0, 2, TEST_COMPONENT_ID_0.id()); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_1.id()); + // Both the single event and the group event should arrive now + check_next_event(event_0, &event_0_rx_0); + check_next_event(event_1, &event_0_rx_0); + + // Do double insertion and then remove duplicates + event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); + event_man.remove_duplicates(&ListenerKey::Group(event_1.group_id())); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_1)) + .expect("Triggering Event 1 failed"); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_0.id()); + } + + #[test] + fn test_all_events_listener() { + let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { + panic!("routing error occurred for event {:?}: {:?}", event_msg, e); + }; + let (event_sender, event_receiver) = mpsc::channel(); + let mut event_man = EventManagerWithMpsc::new(event_receiver); + let event_0 = EventDynParam::new_no_params(TestEventIds::TestInfo.id()); + let event_1 = EventDynParam::new_no_params(TestEventIds::TestError.id()); + let (event_0_tx_0, all_events_rx) = mpsc::channel(); + let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0); + event_man.subscribe_all(all_events_listener.target_id()); + event_man.add_sender(all_events_listener); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) + .expect("Triggering event 0 failed"); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) + .expect("Triggering event 1 failed"); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_0, 1, TEST_COMPONENT_ID_0.id()); + let res = event_man.try_event_handling(error_handler); + check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_1.id()); + check_next_event(event_0, &all_events_rx); + check_next_event(event_1, &all_events_rx); + } + + #[test] + fn test_bounded_event_sender_queue_full() { + let (event_sender, _event_receiver) = mpsc::sync_channel(3); + let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) + .expect("sending test event failed"); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) + .expect("sending test event failed"); + event_sender + .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) + .expect("sending test event failed"); + let error = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)); + if let Err(e) = error { + assert!(matches!(e, GenericSendError::QueueFull(Some(3)))); + } else { + panic!("unexpected error {error:?}"); + } + } + #[test] + fn test_bounded_event_sender_rx_dropped() { + let (event_sender, event_receiver) = mpsc::sync_channel(3); + let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); + drop(event_receiver); + if let Err(e) = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) { + assert!(matches!(e, GenericSendError::RxDisconnected)); + } else { + panic!("Expected error"); + } + } +} diff --git a/satrs/src/events.rs b/satrs/src/events.rs index 1194087..f3fc88d 100644 --- a/satrs/src/events.rs +++ b/satrs/src/events.rs @@ -85,7 +85,7 @@ impl HasSeverity for SeverityHigh { const SEVERITY: Severity = Severity::High; } -pub trait GenericEvent: EcssEnumeration + Copy + Clone { +pub trait GenericEvent: Copy + Clone { type Raw; type GroupId; type UniqueId; diff --git a/satrs/src/events2.rs b/satrs/src/events2.rs new file mode 100644 index 0000000..53f7d12 --- /dev/null +++ b/satrs/src/events2.rs @@ -0,0 +1,222 @@ +//! Event support module +//! +//! This module includes the basic event structs [EventU32] and [EventU16] and versions with the +//! ECSS severity levels as a type parameter. These structs are simple abstractions on top of the +//! [u32] and [u16] types where the raw value is the unique identifier for a particular event. +//! The abstraction also allows to group related events using a group ID, and the severity +//! of an event is encoded inside the raw value itself with four possible [Severity] levels: +//! +//! - INFO +//! - LOW +//! - MEDIUM +//! - HIGH +//! +//! All event structs implement the [EcssEnumeration] trait and can be created as constants. +//! This allows to easily create a static list of constant events which can then be used to generate +//! event telemetry using the PUS event manager modules. +//! +//! # Examples +//! +//! ``` +//! use satrs::events::{EventU16, EventU32, EventU32TypedSev, Severity, SeverityHigh, SeverityInfo}; +//! +//! const MSG_RECVD: EventU32TypedSev = EventU32TypedSev::new(1, 0); +//! const MSG_FAILED: EventU32 = EventU32::new(Severity::Low, 1, 1); +//! +//! const TEMPERATURE_HIGH: EventU32TypedSev = EventU32TypedSev::new(2, 0); +//! +//! let small_event = EventU16::new(Severity::Info, 3, 0); +//! ``` +use core::fmt::Debug; +use core::hash::Hash; +use core::marker::PhantomData; +use core::option::Iter; + +use arbitrary_int::{prelude::*, u14}; + +use crate::ComponentId; +use crate::queue::GenericSendError; + +/// Using a type definition allows to change this to u64 in the future more easily +pub type LargestEventRaw = u32; +/// Using a type definition allows to change this to u32 in the future more easily +pub type LargestGroupIdRaw = u16; + +pub const MAX_GROUP_ID_U32_EVENT: u16 = u14::MAX.value(); + +#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum Severity { + Info = 0, + Low = 1, + Medium = 2, + High = 3, +} + +pub trait HasSeverity: Debug + PartialEq + Eq + Copy + Clone { + const SEVERITY: Severity; +} + +/// Type level support struct +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct SeverityInfo {} +impl HasSeverity for SeverityInfo { + const SEVERITY: Severity = Severity::Info; +} + +/// Type level support struct +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct SeverityLow {} +impl HasSeverity for SeverityLow { + const SEVERITY: Severity = Severity::Low; +} + +/// Type level support struct +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct SeverityMedium {} +impl HasSeverity for SeverityMedium { + const SEVERITY: Severity = Severity::Medium; +} + +/// Type level support struct +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct SeverityHigh {} +impl HasSeverity for SeverityHigh { + const SEVERITY: Severity = Severity::High; +} + +impl TryFrom for Severity { + type Error = (); + + fn try_from(value: u8) -> Result { + match value { + x if x == Severity::Info as u8 => Ok(Severity::Info), + x if x == Severity::Low as u8 => Ok(Severity::Low), + x if x == Severity::Medium as u8 => Ok(Severity::Medium), + x if x == Severity::High as u8 => Ok(Severity::High), + _ => Err(()), + } + } +} + +pub trait Event: Clone { + fn id(&self) -> EventId; + fn parameters(&self) -> Option<&[u8]> { + None + } +} + +pub type GroupId = u14; + +#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct EventId { + group_id: GroupId, + unique_id: u16, + severity: Severity, +} + +impl EventId { + pub fn new(severity: Severity, group_id: u14, unique_id: u16) -> Self { + Self { + severity, + group_id, + unique_id, + } + } + + #[inline] + pub fn unique_id(&self) -> u16 { + self.unique_id + } + + #[inline] + pub fn severity(&self) -> Severity { + self.severity + } + + #[inline] + pub fn group_id(&self) -> u14 { + self.group_id + } + + pub fn raw(&self) -> u32 { + ((self.severity as u32) << 30) + | ((self.group_id.as_u16() as u32) << 16) + | (self.unique_id as u32) + } +} + +impl From for EventId { + fn from(raw: u32) -> Self { + // Severity conversion from u8 should never fail + let severity = Severity::try_from(((raw >> 30) & 0b11) as u8).unwrap(); + let group_id = u14::new(((raw >> 16) & 0x3FFF) as u16); + let unique_id = (raw & 0xFFFF) as u16; + // Sanitized input, should never fail + Self::new(severity, group_id, unique_id) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg(feature = "alloc")] +pub struct EventDynParam { + id: EventId, + parameters: Option>, +} + +#[cfg(feature = "alloc")] +impl EventDynParam { + pub fn new(id: EventId, parameters: Option>) -> Self { + Self { id, parameters } + } + + pub fn new_no_params(id: EventId) -> Self { + Self { + id, + parameters: None, + } + } +} + +#[cfg(feature = "alloc")] +impl Event for EventDynParam { + fn id(&self) -> EventId { + self.id + } + + fn parameters(&self) -> Option<&[u8]> { + self.parameters.as_deref() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EventHeapless { + id: EventId, + parameters: Option>, +} + +impl Event for EventHeapless { + fn id(&self) -> EventId { + self.id + } + + fn parameters(&self) -> Option<&[u8]> { + self.parameters.as_deref() + } +} + +impl EventHeapless { + pub fn new(id: EventId, parameters: Option>) -> Self { + Self { id, parameters } + } + + pub fn new_no_params(id: EventId) -> Self { + Self { + id, + parameters: None, + } + } +} diff --git a/satrs/src/lib.rs b/satrs/src/lib.rs index 5b81fe2..c3e8f99 100644 --- a/satrs/src/lib.rs +++ b/satrs/src/lib.rs @@ -27,7 +27,10 @@ pub mod action; pub mod dev_mgmt; pub mod encoding; pub mod event_man; +pub mod event_man_2; +// pub mod event_man_new; pub mod events; +pub mod events2; #[cfg(feature = "std")] pub mod executable; pub mod hal;