From 10849229e6237b5746f9c1ad2eb437100ac6a762 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 28 Oct 2022 02:02:28 +0200 Subject: [PATCH] more docs for event manager --- fsrc-core/src/event_man.rs | 170 +++++++++++++++++++++++-------------- 1 file changed, 107 insertions(+), 63 deletions(-) diff --git a/fsrc-core/src/event_man.rs b/fsrc-core/src/event_man.rs index fec1b6c..cce5e12 100644 --- a/fsrc-core/src/event_man.rs +++ b/fsrc-core/src/event_man.rs @@ -1,5 +1,25 @@ //! Event management and forwarding -use crate::events::{EventU16TypedSev, EventU32, GenericEvent, HasSeverity}; +//! +//! This module provides components to perform event routing. The most important component for this +//! task is the [EventManager]. It has a map of event listeners and uses a dynamic [EventReceiver] +//! instance to receive all events and then route them to event subscribers where appropriate. +//! +//! One common use case for satellite systems is to offer a light-weight publish-subscribe mechanism +//! and IPC mechanism for software and hardware events which are also packaged as telemetry. +//! This can be done with the [EventManager] like this: +//! +//! 1. Provide a concrete [SendEventProvider] implementation and a concrete [EventReceiver] +//! implementation. These abstraction allow to use different message queue backends. +//! A straightforward implementation where dynamic memory allocation is not a big concern could +//! use [std::sync::mpsc::channel] to do this. It is recommended that these implementations +//! derive [Clone]. +//! 2. Each event creator gets a sender component which allows it to send events to the manager. +//! 3. The event manager receives all receiver ends so all events are routed to the +//! manager. +//! 4. Each event receiver and/or subscriber gets a receiver component. The sender component is +//! used with the [SendEventProvider] trait and the subscription API provided by the +//! [EventManager] to subscribe for individual events or whole group of events. +use crate::events::{GenericEvent, LargestEventRaw, LargestGroupIdRaw}; use alloc::boxed::Box; use alloc::vec; use alloc::vec::Vec; @@ -7,32 +27,42 @@ use hashbrown::HashMap; #[derive(PartialEq, Eq, Hash, Copy, Clone)] enum ListenerType { - Single(u32), - Group(u16), + Single(LargestEventRaw), + Group(LargestGroupIdRaw), + All, } -pub trait EventListener { +pub trait SendEventProvider { type Error; fn id(&self) -> u32; - fn send_to_no_data(&mut self, event: Provider) -> Result<(), Self::Error> { - self.send_to(event, None) + fn send_no_data(&mut self, event: Provider) -> Result<(), Self::Error> { + self.send(event, None) } - fn send_to(&mut self, event: Provider, aux_data: Option<&[u8]>) -> Result<(), Self::Error>; + fn send(&mut self, event: Provider, aux_data: Option<&[u8]>) -> Result<(), Self::Error>; } -struct Listener { +struct Listener { ltype: ListenerType, - dest: Box>, + send_provider: Box>, } -pub trait ReceivesAllEvent { - fn receive(&mut self) -> Option<(Provider, Option<&[u8]>)>; +/// Generic abstraction for an event receiver. +pub trait EventReceiver { + /// This function has to be provided by any event receiver. A receive call may or may not return + /// an event. + /// + /// To allow returning arbitrary additional auxiliary data, a mutable slice is passed to the + /// [Self::receive] call as well. Receivers can write data to this slice, but care must be taken + /// to avoid panics due to size missmatches or out of bound writes. + fn receive(&mut self, aux_data: &mut [u8]) -> Option; } -pub struct EventManager { - listeners: HashMap>>, - event_receiver: Box>, +/// Generic event manager implementation. +pub struct EventManager { + aux_data_buf: Vec, + listeners: HashMap>>, + event_receiver: Box>, } pub enum HandlerResult { @@ -40,48 +70,50 @@ pub enum HandlerResult { Handled(u32, Provider), } -impl EventManager { - pub fn new(event_receiver: Box>) -> Self { +impl EventManager { + pub fn new(event_receiver: Box>, buf_len_aux_data: usize) -> Self { EventManager { + aux_data_buf: vec![0; buf_len_aux_data], listeners: HashMap::new(), event_receiver, } } -} - -impl EventManager { pub fn subscribe_single( &mut self, - event: EventU32, - dest: impl EventListener + 'static, + event: Event, + dest: impl SendEventProvider + 'static, ) { self.update_listeners(ListenerType::Single(event.raw_as_largest_type()), dest); } pub fn subscribe_group( &mut self, - group_id: ::GroupId, - dest: impl EventListener + 'static, + group_id: LargestGroupIdRaw, + dest: impl SendEventProvider + 'static, ) { self.update_listeners(ListenerType::Group(group_id), dest); } -} -impl EventManager> { - pub fn subscribe_single( - &mut self, - event: EventU16TypedSev, - dest: impl EventListener, Error = E> + 'static, - ) { - self.update_listeners(ListenerType::Single(event.raw_as_largest_type()), dest); + pub fn subscribe_all(&mut self, dest: impl SendEventProvider + 'static) { + self.update_listeners(ListenerType::All, dest); } - pub fn subscribe_group( + /// Helper function which removes single subscriptions for which a group subscription already + /// exists. + pub fn remove_single_subscriptions_for_group( &mut self, - group_id: as GenericEvent>::GroupId, - dest: impl EventListener, Error = E> + 'static, + group_id: LargestGroupIdRaw, + dest: impl SendEventProvider + 'static ) { - self.update_listeners(ListenerType::Group(group_id.into()), dest); + if self.listeners.contains_key(&ListenerType::Group(group_id)) { + for (ltype, listeners) in &mut self.listeners { + if let ListenerType::Single(_) = ltype { + listeners.retain(|f| { + f.send_provider.id() != dest.id() + }); + } + } + } } } @@ -89,27 +121,27 @@ impl EventManager { fn update_listeners( &mut self, key: ListenerType, - dest: impl EventListener + 'static, + dest: impl SendEventProvider + 'static, ) { if !self.listeners.contains_key(&key) { self.listeners.insert( key, vec![Listener { ltype: key, - dest: Box::new(dest), + send_provider: Box::new(dest), }], ); } else { let vec = self.listeners.get_mut(&key).unwrap(); // To prevent double insertions for entry in vec.iter() { - if entry.ltype == key && entry.dest.id() == dest.id() { + if entry.ltype == key && entry.send_provider.id() == dest.id() { return; } } vec.push(Listener { ltype: key, - dest: Box::new(dest), + send_provider: Box::new(dest), }); } } @@ -120,25 +152,35 @@ impl EventManager { let mut send_handler = |event: Provider, aux_data: Option<&[u8]>, llist: &mut Vec>| { for listener in llist.iter_mut() { - if let Err(e) = listener.dest.send_to(event, aux_data) { + if let Err(e) = listener.send_provider.send(event, aux_data) { err_status = Some(Err(e)); } else { num_recipients += 1; } } }; - if let Some((event, aux_data)) = self.event_receiver.receive() { + if let Some(event) = self + .event_receiver + .receive(self.aux_data_buf.as_mut_slice()) + { let single_key = ListenerType::Single(event.raw_as_largest_type()); if self.listeners.contains_key(&single_key) { send_handler( event, - aux_data, + Some(self.aux_data_buf.as_slice()), self.listeners.get_mut(&single_key).unwrap(), ); } let group_key = ListenerType::Group(event.group_id_as_largest_type()); if self.listeners.contains_key(&group_key) { - send_handler(event, aux_data, self.listeners.get_mut(&group_key).unwrap()); + send_handler( + event, + Some(self.aux_data_buf.as_slice()), + self.listeners.get_mut(&group_key).unwrap(), + ); + } + if let Some(all_receivers) = self.listeners.get_mut(&ListenerType::All) { + send_handler(event, Some(self.aux_data_buf.as_slice()), all_receivers); } if let Some(err) = err_status { return err; @@ -151,24 +193,30 @@ impl EventManager { #[cfg(test)] mod tests { - use super::{EventListener, HandlerResult, ReceivesAllEvent}; + use super::{EventReceiver, HandlerResult, SendEventProvider}; use crate::event_man::EventManager; use crate::events::{EventU32, GenericEvent, Severity}; use alloc::boxed::Box; use std::sync::mpsc::{channel, Receiver, SendError, Sender}; - use std::thread; use std::time::Duration; + use std::{thread, vec}; + use vec::Vec; type EventAndParams<'a> = (EventU32, Option<&'a [u8]>); - struct EventReceiver<'a> { - mpsc_receiver: Receiver>, + struct MpscEventReceiver { + mpsc_receiver: Receiver<(EventU32, Option>)>, } - impl ReceivesAllEvent for EventReceiver<'_> { - fn receive(&mut self) -> Option<(EventU32, Option<&[u8]>)> { - if let Some((event, _params)) = self.mpsc_receiver.try_recv().ok() { - return Some((event, None)); + impl EventReceiver for MpscEventReceiver { + fn receive(&mut self, aux_data: &mut [u8]) -> Option { + if let Some((event, params)) = self.mpsc_receiver.try_recv().ok() { + if let Some(params) = params { + if params.len() < aux_data.len() { + aux_data[0..params.len()].copy_from_slice(params.as_slice()) + } + } + return Some(event); } None } @@ -180,17 +228,13 @@ mod tests { mpsc_sender: Sender>, } - impl<'a> EventListener for MpscEventSenderQueue<'a> { + impl<'a> SendEventProvider for MpscEventSenderQueue<'a> { type Error = SendError>; fn id(&self) -> u32 { self.id } - fn send_to( - &mut self, - event: EventU32, - _aux_data: Option<&[u8]>, - ) -> Result<(), Self::Error> { + fn send(&mut self, event: EventU32, _aux_data: Option<&[u8]>) -> Result<(), Self::Error> { self.mpsc_sender.send((event, None)) } } @@ -220,11 +264,11 @@ mod tests { #[test] fn test_basic() { let (event_sender, manager_queue) = channel(); - let event_man_receiver = EventReceiver { + let event_man_receiver = MpscEventReceiver { mpsc_receiver: manager_queue, }; let mut event_man: EventManager, EventU32> = - EventManager::new(Box::new(event_man_receiver)); + EventManager::new(Box::new(event_man_receiver), 128); let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (single_event_sender, single_event_receiver) = channel(); @@ -263,11 +307,11 @@ mod tests { #[test] fn test_multi_group() { let (event_sender, manager_queue) = channel(); - let event_man_receiver = EventReceiver { + let event_man_receiver = MpscEventReceiver { mpsc_receiver: manager_queue, }; let mut event_man: EventManager, EventU32> = - EventManager::new(Box::new(event_man_receiver)); + EventManager::new(Box::new(event_man_receiver), 128); let res = event_man.try_event_handling(); assert!(res.is_ok()); let hres = res.unwrap(); @@ -305,11 +349,11 @@ mod tests { #[test] fn test_listening_to_same_event_and_multi_type() { let (event_sender, manager_queue) = channel(); - let event_man_receiver = EventReceiver { + let event_man_receiver = MpscEventReceiver { mpsc_receiver: manager_queue, }; let mut event_man: EventManager, EventU32> = - EventManager::new(Box::new(event_man_receiver)); + EventManager::new(Box::new(event_man_receiver), 128); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (event_0_tx_0, event_0_rx_0) = channel();