From a690c7720d949495f1b0aeb408b3c7cd5e833446 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 23 Feb 2024 14:19:30 +0100 Subject: [PATCH] Refactored event manager --- satrs-example/Cargo.toml | 4 +- satrs-example/src/events.rs | 34 +- satrs/CHANGELOG.md | 8 + satrs/src/event_man.rs | 655 +++++++++++-------- satrs/src/hal/std/tcp_cobs_server.rs | 2 +- satrs/src/hal/std/tcp_spacepackets_server.rs | 2 +- satrs/src/lib.rs | 2 - satrs/src/params.rs | 95 +-- satrs/src/pus/action.rs | 2 +- satrs/src/pus/event_man.rs | 106 +-- satrs/src/pus/hk.rs | 6 +- satrs/src/pus/mod.rs | 4 +- satrs/tests/pus_events.rs | 20 +- 13 files changed, 525 insertions(+), 415 deletions(-) diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index ba70684..27ffee9 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -20,8 +20,8 @@ thiserror = "1" derive-new = "0.5" [dependencies.satrs] -version = "0.2.0-rc.0" -# path = "../satrs" +# version = "0.2.0-rc.0" +path = "../satrs" [dependencies.satrs-mib] version = "0.1.1" diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index 46ba6bb..dbe7c8d 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -1,16 +1,15 @@ -use std::sync::mpsc::{self, SendError}; +use std::sync::mpsc::{self}; use satrs::{ event_man::{ - EventManager, EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, - SendEventProvider, + EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded, + MpscEventReceiver, }, events::EventU32, params::Params, pus::{ event_man::{ - DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, - PusEventDispatcher, + DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, }, verification::{ TcStateStarted, VerificationReporterWithSender, VerificationReportingProvider, @@ -24,11 +23,9 @@ use satrs_example::config::PUS_APID; use crate::update_time; -pub type MpscEventManager = EventManager)>>; - pub struct PusEventHandler { event_request_rx: mpsc::Receiver, - pus_event_dispatcher: PusEventDispatcher<(), EventU32>, + pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, pus_event_man_rx: mpsc::Receiver<(EventU32, Option)>, tm_sender: Box, time_provider: TimeProvider, @@ -41,21 +38,22 @@ pub struct PusEventHandler { impl PusEventHandler { pub fn new( verif_handler: VerificationReporterWithSender, - event_manager: &mut MpscEventManager, + event_manager: &mut EventManagerWithBoundedMpsc, event_request_rx: mpsc::Receiver, tm_sender: impl EcssTmSender, ) -> Self { - let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); + let event_queue_cap = 30; + let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(event_queue_cap); // All events sent to the manager are routed to the PUS event manager, which generates PUS event // telemetry for each event. let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); - let pus_tm_backend = DefaultPusMgmtBackendProvider::::default(); let pus_event_dispatcher = - PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); - let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); + DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter); + let pus_event_man_send_provider = + EventU32SenderMpscBounded::new(1, pus_event_man_tx, event_queue_cap); - event_manager.subscribe_all(pus_event_man_send_provider.id()); + event_manager.subscribe_all(pus_event_man_send_provider.channel_id()); event_manager.add_sender(pus_event_man_send_provider); Self { @@ -117,7 +115,7 @@ impl PusEventHandler { } pub struct EventManagerWrapper { - event_manager: MpscEventManager, + event_manager: EventManagerWithBoundedMpsc, event_sender: mpsc::Sender<(EventU32, Option)>, } @@ -128,7 +126,7 @@ impl EventManagerWrapper { let (event_sender, event_man_rx) = mpsc::channel(); let event_recv = MpscEventReceiver::::new(event_man_rx); Self { - event_manager: EventManagerWithMpscQueue::new(Box::new(event_recv)), + event_manager: EventManagerWithBoundedMpsc::new(event_recv), event_sender, } } @@ -137,7 +135,7 @@ impl EventManagerWrapper { self.event_sender.clone() } - pub fn event_manager(&mut self) -> &mut MpscEventManager { + pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { &mut self.event_manager } @@ -178,7 +176,7 @@ impl EventHandler { } #[allow(dead_code)] - pub fn event_manager(&mut self) -> &mut MpscEventManager { + pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { self.event_man_wrapper.event_manager() } diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index 0566562..861c79b 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +## Changed + +- Refactored `EventManager` to heavily use generics instead of trait objects. + - `SendEventProvider` -> `EventSendProvider`. `id` trait method renamed to `channel_id`. + - `ListenerTable` -> `ListenerMapProvider` + - `SenderTable` -> `SenderMapProvider` + - There is an `EventManagerWithMpsc` and a `EventManagerWithBoundedMpsc` helper type now. + # [v0.2.0-rc.0] 2024-02-21 ## Added diff --git a/satrs/src/event_man.rs b/satrs/src/event_man.rs index 8cb549f..b0470dd 100644 --- a/satrs/src/event_man.rs +++ b/satrs/src/event_man.rs @@ -10,27 +10,27 @@ //! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html) //! about events first: //! -//! The event manager has a listener table abstracted by the [ListenerTable], which maps +//! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps //! listener groups identified by [ListenerKey]s to a [sender ID][ChannelId]. -//! It also contains a sender table abstracted by the [SenderTable] which maps these sender IDs -//! to a concrete [SendEventProvider]s. A simple approach would be to use one send event provider +//! It also contains a sender table abstracted by the [SenderMapProvider] which maps these sender +//! IDs to concrete [EventSendProvider]s. A simple approach would be to use one send event provider //! for each OBSW thread and then subscribe for all interesting events for a particular thread //! using the send event provider ID. //! //! This can be done with the [EventManager] like this: //! -//! 1. Provide a concrete [EventReceiver] implementation. This abstraction allow to use different +//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different //! message queue backends. A straightforward implementation where dynamic memory allocation is //! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in //! form of the [MpscEventReceiver]. //! 2. To set up event creators, create channel pairs using some message queue implementation. //! Each event creator gets a (cloned) sender component which allows it to send events to the //! manager. -//! 3. The event manager receives the receiver component as part of a [EventReceiver] +//! 3. The event manager receives the receiver component as part of a [EventReceiveProvider] //! implementation so all events are routed to the manager. -//! 4. Create the [send event providers][SendEventProvider]s which allow routing events to -//! subscribers. You can now use their [sender IDs][SendEventProvider::id] to subscribe for -//! event groups, for example by using the [EventManager::subscribe_single] method. +//! 4. Create the [send event providers][EventSendProvider]s which allow routing events to +//! subscribers. You can now use their [sender IDs][EventSendProvider::channel_id] to subscribe +//! for event groups, for example by using the [EventManager::subscribe_single] method. //! 5. Add the send provider as well using the [EventManager::add_sender] call so the event //! manager can route listener groups to a the send provider. //! @@ -41,24 +41,22 @@ //! //! # Examples //! -//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/pus_events.rs) +//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs) //! for a concrete example using multi-threading where events are routed to //! different threads. use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw}; use crate::params::{Params, ParamsHeapless}; -#[cfg(feature = "alloc")] -use alloc::boxed::Box; -#[cfg(feature = "alloc")] -use alloc::vec; -#[cfg(feature = "alloc")] -use alloc::vec::Vec; +use crate::queue::GenericSendError; +use core::marker::PhantomData; use core::slice::Iter; -#[cfg(feature = "alloc")] -use hashbrown::HashMap; use crate::ChannelId; + +#[cfg(feature = "alloc")] +pub use alloc_mod::*; + #[cfg(feature = "std")] -pub use stdmod::*; +pub use std_mod::*; #[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)] pub enum ListenerKey { @@ -75,108 +73,114 @@ pub type EventWithAuxData = (Event, Option); pub type EventU32WithAuxData = EventWithAuxData; pub type EventU16WithAuxData = EventWithAuxData; -pub trait SendEventProvider { - type Error; +pub trait EventSendProvider { + fn channel_id(&self) -> ChannelId; - fn id(&self) -> ChannelId; - fn send_no_data(&self, event: Provider) -> Result<(), Self::Error> { + fn send_no_data(&self, event: EV) -> Result<(), GenericSendError> { self.send(event, None) } - fn send(&self, event: Provider, aux_data: Option) -> Result<(), Self::Error>; + + fn send(&self, event: EV, aux_data: Option) -> Result<(), GenericSendError>; } /// Generic abstraction for an event receiver. -pub trait EventReceiver { +pub trait EventReceiveProvider { /// This function has to be provided by any event receiver. A receive call may or may not return /// an event. /// /// To allow returning arbitrary additional auxiliary data, a mutable slice is passed to the /// [Self::receive] call as well. Receivers can write data to this slice, but care must be taken /// to avoid panics due to size missmatches or out of bound writes. - fn receive(&self) -> Option<(Event, Option)>; + fn try_recv_event(&self) -> Option<(Event, Option)>; } -pub trait ListenerTable { - fn get_listeners(&self) -> Vec; +pub trait ListenerMapProvider { + #[cfg(feature = "alloc")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] + fn get_listeners(&self) -> alloc::vec::Vec; fn contains_listener(&self, key: &ListenerKey) -> bool; fn get_listener_ids(&self, key: &ListenerKey) -> Option>; fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool; fn remove_duplicates(&mut self, key: &ListenerKey); } -pub trait SenderTable { +pub trait SenderMapProvider< + SP: EventSendProvider, + EV: GenericEvent = EventU32, + AUX = Params, +> +{ fn contains_send_event_provider(&self, id: &ChannelId) -> bool; - fn get_send_event_provider( - &self, - id: &ChannelId, - ) -> Option<&dyn SendEventProvider>; - fn add_send_event_provider( - &mut self, - send_provider: Box< - dyn SendEventProvider, - >, - ) -> bool; + + fn get_send_event_provider(&self, id: &ChannelId) -> Option<&SP>; + fn add_send_event_provider(&mut self, send_provider: SP) -> bool; } /// Generic event manager implementation. /// /// # Generics /// -/// * `SendProviderError`: [SendEventProvider] error type -/// * `Event`: Concrete event provider, currently either [EventU32] or [EventU16] -/// * `AuxDataProvider`: Concrete auxiliary data provider, currently either [Params] or -/// [ParamsHeapless] -pub struct EventManager -{ - listener_table: Box, - sender_table: Box>, - event_receiver: Box>, +/// * `ERP`: [EventReceiveProvider] used to receive all events. +/// * `SMP`: [SenderMapProvider] which maps channel IDs to send providers. +/// * `LTR`: [ListenerMapProvider] which maps listener keys to channel IDs. +/// * `SP`: [EventSendProvider] contained within the sender map which sends the events. +/// * `EV`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] +/// and [EventU16] are supported. +/// * `AUX`: Auxiliary data which is sent with the event to provide optional context information +pub struct EventManager< + ERP: EventReceiveProvider, + SMP: SenderMapProvider, + LTR: ListenerMapProvider, + SP: EventSendProvider, + EV: GenericEvent = EventU32, + AUX = Params, +> { + event_receiver: ERP, + sender_map: SMP, + listener_map: LTR, + phantom: core::marker::PhantomData<(SP, EV, AUX)>, } -/// Safety: It is safe to implement [Send] because all fields in the [EventManager] are [Send] -/// as well -#[cfg(feature = "std")] -unsafe impl Send - for EventManager -{ -} - -#[cfg(feature = "std")] -pub type EventManagerWithMpscQueue = EventManager< - std::sync::mpsc::SendError<(Event, Option)>, - Event, - AuxDataProvider, ->; - #[derive(Debug)] -pub enum EventRoutingResult { +pub enum EventRoutingResult { /// No event was received Empty, - /// An event was received and routed. - /// The first tuple entry will contain the number of recipients. - Handled(u32, Event, Option), + /// An event was received and routed to listeners. + Handled { + num_recipients: u32, + event: EV, + aux_data: Option, + }, } #[derive(Debug)] -pub enum EventRoutingError { - SendError(E), +pub enum EventRoutingError { + Send(GenericSendError), NoSendersForKey(ListenerKey), NoSenderForId(ChannelId), } #[derive(Debug)] -pub struct EventRoutingErrorsWithResult { - pub result: EventRoutingResult, - pub errors: [Option>; 3], +pub struct EventRoutingErrorsWithResult { + pub result: EventRoutingResult, + pub errors: [Option; 3], } -impl EventManager { +impl< + ER: EventReceiveProvider, + S: SenderMapProvider, + L: ListenerMapProvider, + SP: EventSendProvider, + EV: GenericEvent + Copy, + AUX: Clone, + > EventManager +{ pub fn remove_duplicates(&mut self, key: &ListenerKey) { - self.listener_table.remove_duplicates(key) + self.listener_map.remove_duplicates(key) } /// Subscribe for a unique event. - pub fn subscribe_single(&mut self, event: &Event, sender_id: ChannelId) { + pub fn subscribe_single(&mut self, event: &EV, sender_id: ChannelId) { self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id); } @@ -194,49 +198,37 @@ impl EventManager { } } -impl - EventManager +impl< + ERP: EventReceiveProvider, + SMP: SenderMapProvider, + LTR: ListenerMapProvider, + SP: EventSendProvider, + EV: GenericEvent + Copy, + AUX: Clone, + > EventManager { - /// Create an event manager where the sender table will be the [DefaultSenderTableProvider] - /// and the listener table will be the [DefaultListenerTableProvider]. - pub fn new(event_receiver: Box>) -> Self { - let listener_table: Box = Box::default(); - let sender_table: Box> = - Box::default(); - Self::new_custom_tables(listener_table, sender_table, event_receiver) - } -} - -impl - EventManager -{ - pub fn new_custom_tables( - listener_table: Box, - sender_table: Box>, - event_receiver: Box>, - ) -> Self { + pub fn new_with_custom_maps(event_receiver: ERP, sender_map: SMP, listener_map: LTR) -> Self { EventManager { - listener_table, - sender_table, + listener_map, + sender_map, event_receiver, + phantom: PhantomData, } } - pub fn add_sender( - &mut self, - send_provider: impl SendEventProvider + 'static, - ) { + /// Add a new sender component which can be used to send events to subscribers. + pub fn add_sender(&mut self, send_provider: SP) { if !self - .sender_table - .contains_send_event_provider(&send_provider.id()) + .sender_map + .contains_send_event_provider(&send_provider.channel_id()) { - self.sender_table - .add_send_event_provider(Box::new(send_provider)); + self.sender_map.add_send_event_provider(send_provider); } } + /// Generic function to update the event subscribers. fn update_listeners(&mut self, key: ListenerKey, sender_id: ChannelId) { - self.listener_table.add_listener(key, sender_id); + self.listener_map.add_listener(key, sender_id); } /// This function will use the cached event receiver and try to receive one event. @@ -248,40 +240,36 @@ impl /// [EventRoutingErrorsWithResult] error struct. pub fn try_event_handling( &self, - ) -> Result< - EventRoutingResult, - EventRoutingErrorsWithResult, - > { + ) -> Result, EventRoutingErrorsWithResult> { let mut err_idx = 0; let mut err_slice = [None, None, None]; let mut num_recipients = 0; - let mut add_error = |error: EventRoutingError| { + let mut add_error = |error: EventRoutingError| { if err_idx < 3 { err_slice[err_idx] = Some(error); err_idx += 1; } }; - let mut send_handler = - |key: &ListenerKey, event: Event, aux_data: &Option| { - if self.listener_table.contains_listener(key) { - if let Some(ids) = self.listener_table.get_listener_ids(key) { - for id in ids { - if let Some(sender) = self.sender_table.get_send_event_provider(id) { - if let Err(e) = sender.send(event, aux_data.clone()) { - add_error(EventRoutingError::SendError(e)); - } else { - num_recipients += 1; - } + let mut send_handler = |key: &ListenerKey, event: EV, aux_data: &Option| { + if self.listener_map.contains_listener(key) { + if let Some(ids) = self.listener_map.get_listener_ids(key) { + for id in ids { + if let Some(sender) = self.sender_map.get_send_event_provider(id) { + if let Err(e) = sender.send(event, aux_data.clone()) { + add_error(EventRoutingError::Send(e)); } else { - add_error(EventRoutingError::NoSenderForId(*id)); + num_recipients += 1; } + } else { + add_error(EventRoutingError::NoSenderForId(*id)); } - } else { - add_error(EventRoutingError::NoSendersForKey(*key)); } + } else { + add_error(EventRoutingError::NoSendersForKey(*key)); } - }; - if let Some((event, aux_data)) = self.event_receiver.receive() { + } + }; + if let Some((event, aux_data)) = self.event_receiver.try_recv_event() { let single_key = ListenerKey::Single(event.raw_as_largest_type()); send_handler(&single_key, event, &aux_data); let group_key = ListenerKey::Group(event.group_id_as_largest_type()); @@ -289,130 +277,180 @@ impl send_handler(&ListenerKey::All, event, &aux_data); if err_idx > 0 { return Err(EventRoutingErrorsWithResult { - result: EventRoutingResult::Handled(num_recipients, event, aux_data), + result: EventRoutingResult::Handled { + num_recipients, + event, + aux_data, + }, errors: err_slice, }); } - return Ok(EventRoutingResult::Handled(num_recipients, event, aux_data)); + return Ok(EventRoutingResult::Handled { + num_recipients, + event, + aux_data, + }); } Ok(EventRoutingResult::Empty) } } -#[derive(Default)] -pub struct DefaultListenerTableProvider { - listeners: HashMap>, -} +#[cfg(feature = "alloc")] +pub mod alloc_mod { + use alloc::vec::Vec; + use hashbrown::HashMap; -pub struct DefaultSenderTableProvider< - SendProviderError, - Event: GenericEvent = EventU32, - AuxDataProvider = Params, -> { - senders: HashMap< - ChannelId, - Box>, - >, -} + use super::*; -impl Default - for DefaultSenderTableProvider -{ - fn default() -> Self { - Self { - senders: HashMap::new(), + /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] + /// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend. + pub type EventManagerWithMpsc = EventManager< + MpscEventReceiver, + DefaultSenderMap, EV, AUX>, + DefaultListenerMap, + EventSenderMpsc, + >; + + /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] + /// and the [DefaultListenerMap]. It uses + /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the + /// message queue backend. + pub type EventManagerWithBoundedMpsc = EventManager< + MpscEventReceiver, + DefaultSenderMap, EV, AUX>, + DefaultListenerMap, + EventSenderMpscBounded, + >; + + impl< + ER: EventReceiveProvider, + SP: EventSendProvider, + EV: GenericEvent + Copy, + AUX: 'static, + > EventManager, DefaultListenerMap, SP, EV, AUX> + { + /// Create an event manager where the sender table will be the [DefaultSenderMap] + /// and the listener table will be the [DefaultListenerMap]. + pub fn new(event_receiver: ER) -> Self { + Self { + listener_map: DefaultListenerMap::default(), + sender_map: DefaultSenderMap::default(), + event_receiver, + phantom: PhantomData, + } } } -} -impl ListenerTable for DefaultListenerTableProvider { - fn get_listeners(&self) -> Vec { - let mut key_list = Vec::new(); - for key in self.listeners.keys() { - key_list.push(*key); + /// Default listener map. + /// + /// Simple implementation which uses a [HashMap] and a [Vec] internally. + #[derive(Default)] + pub struct DefaultListenerMap { + listeners: HashMap>, + } + + impl ListenerMapProvider for DefaultListenerMap { + fn get_listeners(&self) -> Vec { + let mut key_list = Vec::new(); + for key in self.listeners.keys() { + key_list.push(*key); + } + key_list } - key_list - } - fn contains_listener(&self, key: &ListenerKey) -> bool { - self.listeners.contains_key(key) - } - - fn get_listener_ids(&self, key: &ListenerKey) -> Option> { - self.listeners.get(key).map(|vec| vec.iter()) - } - - fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool { - if let Some(existing_list) = self.listeners.get_mut(&key) { - existing_list.push(sender_id); - } else { - let new_list = vec![sender_id]; - self.listeners.insert(key, new_list); + fn contains_listener(&self, key: &ListenerKey) -> bool { + self.listeners.contains_key(key) } - true - } - fn remove_duplicates(&mut self, key: &ListenerKey) { - if let Some(list) = self.listeners.get_mut(key) { - list.sort_unstable(); - list.dedup(); + fn get_listener_ids(&self, key: &ListenerKey) -> Option> { + self.listeners.get(key).map(|vec| vec.iter()) + } + + fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool { + if let Some(existing_list) = self.listeners.get_mut(&key) { + existing_list.push(sender_id); + } else { + let new_list = alloc::vec![sender_id]; + self.listeners.insert(key, new_list); + } + true + } + + fn remove_duplicates(&mut self, key: &ListenerKey) { + if let Some(list) = self.listeners.get_mut(key) { + list.sort_unstable(); + list.dedup(); + } } } -} -impl - SenderTable - for DefaultSenderTableProvider -{ - fn contains_send_event_provider(&self, id: &ChannelId) -> bool { - self.senders.contains_key(id) + /// Default sender map. + /// + /// Simple implementation which uses a [HashMap] internally. + pub struct DefaultSenderMap< + SP: EventSendProvider, + EV: GenericEvent = EventU32, + AUX = Params, + > { + senders: HashMap, + phantom: PhantomData<(EV, AUX)>, } - fn get_send_event_provider( - &self, - id: &ChannelId, - ) -> Option<&dyn SendEventProvider> { - self.senders - .get(id) - .filter(|sender| sender.id() == *id) - .map(|v| v.as_ref()) - } - - fn add_send_event_provider( - &mut self, - send_provider: Box< - dyn SendEventProvider, - >, - ) -> bool { - let id = send_provider.id(); - if self.senders.contains_key(&id) { - return false; + impl, EV: GenericEvent, AUX> Default + for DefaultSenderMap + { + fn default() -> Self { + Self { + senders: Default::default(), + phantom: Default::default(), + } + } + } + + impl, EV: GenericEvent, AUX> SenderMapProvider + for DefaultSenderMap + { + fn contains_send_event_provider(&self, id: &ChannelId) -> bool { + self.senders.contains_key(id) + } + + fn get_send_event_provider(&self, id: &ChannelId) -> Option<&SP> { + self.senders + .get(id) + .filter(|sender| sender.channel_id() == *id) + } + + fn add_send_event_provider(&mut self, send_provider: SP) -> bool { + let id = send_provider.channel_id(); + if self.senders.contains_key(&id) { + return false; + } + self.senders.insert(id, send_provider).is_none() } - self.senders.insert(id, send_provider).is_none() } } #[cfg(feature = "std")] -pub mod stdmod { +pub mod std_mod { use super::*; - use crate::event_man::{EventReceiver, EventWithAuxData}; + use crate::event_man::{EventReceiveProvider, EventWithAuxData}; use crate::events::{EventU16, EventU32, GenericEvent}; use crate::params::Params; - use std::sync::mpsc::{Receiver, SendError, Sender}; + use std::sync::mpsc::{self}; pub struct MpscEventReceiver { - mpsc_receiver: Receiver<(Event, Option)>, + mpsc_receiver: mpsc::Receiver<(Event, Option)>, } impl MpscEventReceiver { - pub fn new(receiver: Receiver<(Event, Option)>) -> Self { + pub fn new(receiver: mpsc::Receiver<(Event, Option)>) -> Self { Self { mpsc_receiver: receiver, } } } - impl EventReceiver for MpscEventReceiver { - fn receive(&self) -> Option> { + impl EventReceiveProvider for MpscEventReceiver { + fn try_recv_event(&self) -> Option> { if let Ok(event_and_data) = self.mpsc_receiver.try_recv() { return Some(event_and_data); } @@ -423,31 +461,75 @@ pub mod stdmod { pub type MpscEventU32Receiver = MpscEventReceiver; pub type MpscEventU16Receiver = MpscEventReceiver; + /// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to + /// send events. #[derive(Clone)] - pub struct MpscEventSendProvider { + pub struct EventSenderMpsc { id: u32, - sender: Sender<(Event, Option)>, + sender: mpsc::Sender<(Event, Option)>, } - impl MpscEventSendProvider { - pub fn new(id: u32, sender: Sender<(Event, Option)>) -> Self { + impl EventSenderMpsc { + pub fn new(id: u32, sender: mpsc::Sender<(Event, Option)>) -> Self { Self { id, sender } } } - impl SendEventProvider for MpscEventSendProvider { - type Error = SendError<(Event, Option)>; - - fn id(&self) -> u32 { + impl EventSendProvider for EventSenderMpsc { + fn channel_id(&self) -> u32 { self.id } - fn send(&self, event: Event, aux_data: Option) -> Result<(), Self::Error> { - self.sender.send((event, aux_data)) + fn send(&self, event: Event, aux_data: Option) -> Result<(), GenericSendError> { + self.sender + .send((event, aux_data)) + .map_err(|_| GenericSendError::RxDisconnected) } } - pub type MpscEventU32SendProvider = MpscEventSendProvider; - pub type MpscEventU16SendProvider = MpscEventSendProvider; + /// Generic event sender which uses the [mpsc::SyncSender] as the messaging backend to send + /// events. This has the advantage that the channel is bounded and thus more deterministic. + #[derive(Clone)] + pub struct EventSenderMpscBounded { + channel_id: u32, + sender: mpsc::SyncSender<(Event, Option)>, + capacity: usize, + } + + impl EventSenderMpscBounded { + pub fn new( + channel_id: u32, + sender: mpsc::SyncSender<(Event, Option)>, + capacity: usize, + ) -> Self { + Self { + channel_id, + sender, + capacity, + } + } + } + + impl EventSendProvider for EventSenderMpscBounded { + fn channel_id(&self) -> u32 { + self.channel_id + } + fn send(&self, event: Event, aux_data: Option) -> Result<(), GenericSendError> { + if let Err(e) = self.sender.try_send((event, aux_data)) { + return match e { + mpsc::TrySendError::Full(_) => { + Err(GenericSendError::QueueFull(Some(self.capacity as u32))) + } + mpsc::TrySendError::Disconnected(_) => Err(GenericSendError::RxDisconnected), + }; + } + Ok(()) + } + } + + pub type EventU32SenderMpsc = EventSenderMpsc; + pub type EventU16SenderMpsc = EventSenderMpsc; + pub type EventU32SenderMpscBounded = EventSenderMpscBounded; + pub type EventU16SenderMpscBounded = EventSenderMpscBounded; } #[cfg(test)] @@ -456,32 +538,10 @@ mod tests { use crate::event_man::EventManager; use crate::events::{EventU32, GenericEvent, Severity}; use crate::params::ParamsRaw; - use alloc::boxed::Box; use std::format; - use std::sync::mpsc::{channel, Receiver, SendError, Sender}; + use std::sync::mpsc::{self, channel, Receiver, Sender}; - #[derive(Clone)] - struct MpscEventSenderQueue { - id: u32, - mpsc_sender: Sender, - } - - impl MpscEventSenderQueue { - fn new(id: u32, mpsc_sender: Sender) -> Self { - Self { id, mpsc_sender } - } - } - - impl SendEventProvider for MpscEventSenderQueue { - type Error = SendError; - - fn id(&self) -> u32 { - self.id - } - fn send(&self, event: EventU32, aux_data: Option) -> Result<(), Self::Error> { - self.mpsc_sender.send((event, aux_data)) - } - } + const TEST_EVENT: EventU32 = EventU32::const_new(Severity::INFO, 0, 5); fn check_next_event( expected: EventU32, @@ -500,22 +560,21 @@ mod tests { expected_num_sent: u32, ) { assert!(matches!(res, EventRoutingResult::Handled { .. })); - if let EventRoutingResult::Handled(num_recipients, event, _aux_data) = res { + if let EventRoutingResult::Handled { + num_recipients, + event, + .. + } = res + { assert_eq!(event, expected); assert_eq!(num_recipients, expected_num_sent); } } - fn generic_event_man() -> ( - Sender, - EventManager>, - ) { + fn generic_event_man() -> (Sender, EventManagerWithMpsc) { let (event_sender, manager_queue) = channel(); let event_man_receiver = MpscEventReceiver::new(manager_queue); - ( - event_sender, - EventManager::new(Box::new(event_man_receiver)), - ) + (event_sender, EventManager::new(event_man_receiver)) } #[test] @@ -524,15 +583,12 @@ mod tests { let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (single_event_sender, single_event_receiver) = channel(); - let single_event_listener = MpscEventSenderQueue::new(0, single_event_sender); - event_man.subscribe_single(&event_grp_0, single_event_listener.id()); + let single_event_listener = EventSenderMpsc::new(0, single_event_sender); + event_man.subscribe_single(&event_grp_0, single_event_listener.channel_id()); event_man.add_sender(single_event_listener); let (group_event_sender_0, group_event_receiver_0) = channel(); - let group_event_listener = MpscEventSenderQueue { - id: 1, - mpsc_sender: group_event_sender_0, - }; - event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.id()); + let group_event_listener = EventU32SenderMpsc::new(1, group_event_sender_0); + event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.channel_id()); event_man.add_sender(group_event_listener); // Test event with one listener @@ -559,8 +615,8 @@ mod tests { let (event_sender, mut event_man) = generic_event_man(); let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let (single_event_sender, single_event_receiver) = channel(); - let single_event_listener = MpscEventSenderQueue::new(0, single_event_sender); - event_man.subscribe_single(&event_grp_0, single_event_listener.id()); + let single_event_listener = EventSenderMpsc::new(0, single_event_sender); + event_man.subscribe_single(&event_grp_0, single_event_listener.channel_id()); event_man.add_sender(single_event_listener); event_sender .send((event_grp_0, Some(Params::Heapless((2_u32, 3_u32).into())))) @@ -591,12 +647,15 @@ mod tests { let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (event_grp_0_sender, event_grp_0_receiver) = channel(); - let event_grp_0_and_1_listener = MpscEventSenderQueue { - id: 0, - mpsc_sender: event_grp_0_sender, - }; - event_man.subscribe_group(event_grp_0.group_id(), event_grp_0_and_1_listener.id()); - event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener.id()); + let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender); + event_man.subscribe_group( + event_grp_0.group_id(), + event_grp_0_and_1_listener.channel_id(), + ); + event_man.subscribe_group( + event_grp_1_0.group_id(), + event_grp_0_and_1_listener.channel_id(), + ); event_man.add_sender(event_grp_0_and_1_listener); event_sender @@ -625,18 +684,12 @@ mod tests { let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (event_0_tx_0, event_0_rx_0) = channel(); let (event_0_tx_1, event_0_rx_1) = channel(); - let event_listener_0 = MpscEventSenderQueue { - id: 0, - mpsc_sender: event_0_tx_0, - }; - let event_listener_1 = MpscEventSenderQueue { - id: 1, - mpsc_sender: event_0_tx_1, - }; - let event_listener_0_sender_id = event_listener_0.id(); + let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0); + let event_listener_1 = EventU32SenderMpsc::new(1, event_0_tx_1); + let event_listener_0_sender_id = event_listener_0.channel_id(); event_man.subscribe_single(&event_0, event_listener_0_sender_id); event_man.add_sender(event_listener_0); - let event_listener_1_sender_id = event_listener_1.id(); + let event_listener_1_sender_id = event_listener_1.channel_id(); event_man.subscribe_single(&event_0, event_listener_1_sender_id); event_man.add_sender(event_listener_1); event_sender @@ -681,16 +734,12 @@ mod tests { fn test_all_events_listener() { let (event_sender, manager_queue) = channel(); let event_man_receiver = MpscEventReceiver::new(manager_queue); - let mut event_man: EventManager> = - EventManager::new(Box::new(event_man_receiver)); + let mut event_man = EventManagerWithMpsc::new(event_man_receiver); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let (event_0_tx_0, all_events_rx) = channel(); - let all_events_listener = MpscEventSenderQueue { - id: 0, - mpsc_sender: event_0_tx_0, - }; - event_man.subscribe_all(all_events_listener.id()); + let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0); + event_man.subscribe_all(all_events_listener.channel_id()); event_man.add_sender(all_events_listener); event_sender .send((event_0, None)) @@ -707,4 +756,36 @@ mod tests { check_next_event(event_0, &all_events_rx); check_next_event(event_1, &all_events_rx); } + + #[test] + fn test_bounded_event_sender_queue_full() { + let (event_sender, _event_receiver) = mpsc::sync_channel(3); + let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); + event_sender + .send_no_data(TEST_EVENT) + .expect("sending test event failed"); + event_sender + .send_no_data(TEST_EVENT) + .expect("sending test event failed"); + event_sender + .send_no_data(TEST_EVENT) + .expect("sending test event failed"); + let error = event_sender.send_no_data(TEST_EVENT); + if let Err(e) = error { + assert!(matches!(e, GenericSendError::QueueFull(Some(3)))); + } else { + panic!("unexpected error {error:?}"); + } + } + #[test] + fn test_bounded_event_sender_rx_dropped() { + let (event_sender, event_receiver) = mpsc::sync_channel(3); + let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); + drop(event_receiver); + if let Err(e) = event_sender.send_no_data(TEST_EVENT) { + assert!(matches!(e, GenericSendError::RxDisconnected)); + } else { + panic!("Expected error"); + } + } } diff --git a/satrs/src/hal/std/tcp_cobs_server.rs b/satrs/src/hal/std/tcp_cobs_server.rs index 2d14589..7e7036f 100644 --- a/satrs/src/hal/std/tcp_cobs_server.rs +++ b/satrs/src/hal/std/tcp_cobs_server.rs @@ -107,7 +107,7 @@ impl TcpTmSender for CobsTmSender { /// /// ## Example /// -/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) +/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs) /// test also serves as the example application for this module. pub struct TcpTmtcInCobsServer< TmError, diff --git a/satrs/src/hal/std/tcp_spacepackets_server.rs b/satrs/src/hal/std/tcp_spacepackets_server.rs index 6ade0d1..257f0c1 100644 --- a/satrs/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs/src/hal/std/tcp_spacepackets_server.rs @@ -88,7 +88,7 @@ impl TcpTmSender for SpacepacketsTmSender { /// [spacepackets::PacketId]s as part of the server configuration for that purpose. /// /// ## Example -/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) +/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs) /// also serves as the example application for this module. pub struct TcpSpacepacketsServer< TmError, diff --git a/satrs/src/lib.rs b/satrs/src/lib.rs index c31df8d..5040d58 100644 --- a/satrs/src/lib.rs +++ b/satrs/src/lib.rs @@ -26,8 +26,6 @@ extern crate std; #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub mod cfdp; pub mod encoding; -#[cfg(feature = "alloc")] -#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub mod event_man; pub mod events; #[cfg(feature = "std")] diff --git a/satrs/src/params.rs b/satrs/src/params.rs index 5cad28b..1279015 100644 --- a/satrs/src/params.rs +++ b/satrs/src/params.rs @@ -43,22 +43,19 @@ //! This includes the [ParamsHeapless] enumeration for contained values which do not require heap //! allocation, and the [Params] which enumerates [ParamsHeapless] and some additional types which //! require [alloc] support but allow for more flexbility. -#[cfg(feature = "alloc")] use crate::pool::StoreAddr; -#[cfg(feature = "alloc")] -use alloc::string::{String, ToString}; -#[cfg(feature = "alloc")] -use alloc::vec::Vec; use core::fmt::Debug; use core::mem::size_of; use paste::paste; use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU64, EcssEnumU8}; +pub use spacepackets::util::ToBeBytes; use spacepackets::util::UnsignedEnum; use spacepackets::ByteConversionError; #[cfg(feature = "alloc")] -pub use alloc_mod::*; -pub use spacepackets::util::ToBeBytes; +use alloc::string::{String, ToString}; +#[cfg(feature = "alloc")] +use alloc::vec::Vec; /// Generic trait which is used for objects which can be converted into a raw network (big) endian /// byte format. @@ -560,56 +557,64 @@ from_conversions_for_raw!( (f64, Self::F64), ); -#[cfg(feature = "alloc")] -mod alloc_mod { - use super::*; - /// Generic enumeration for additional parameters, including parameters which rely on heap - /// allocations. +/// Generic enumeration for additional parameters, including parameters which rely on heap +/// allocations. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum Params { + Heapless(ParamsHeapless), + Store(StoreAddr), + #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] - #[derive(Debug, Clone)] - pub enum Params { - Heapless(ParamsHeapless), - Store(StoreAddr), - Vec(Vec), - String(String), - } + Vec(Vec), + #[cfg(feature = "alloc")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] + String(String), +} - impl From for Params { - fn from(x: StoreAddr) -> Self { - Self::Store(x) - } +impl From for Params { + fn from(x: StoreAddr) -> Self { + Self::Store(x) } +} - impl From for Params { - fn from(x: ParamsHeapless) -> Self { - Self::Heapless(x) - } +impl From for Params { + fn from(x: ParamsHeapless) -> Self { + Self::Heapless(x) } +} - impl From> for Params { - fn from(val: Vec) -> Self { - Self::Vec(val) - } +#[cfg(feature = "alloc")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] +impl From> for Params { + fn from(val: Vec) -> Self { + Self::Vec(val) } +} - /// Converts a byte slice into the [Params::Vec] variant - impl From<&[u8]> for Params { - fn from(val: &[u8]) -> Self { - Self::Vec(val.to_vec()) - } +/// Converts a byte slice into the [Params::Vec] variant +#[cfg(feature = "alloc")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] +impl From<&[u8]> for Params { + fn from(val: &[u8]) -> Self { + Self::Vec(val.to_vec()) } +} - impl From for Params { - fn from(val: String) -> Self { - Self::String(val) - } +#[cfg(feature = "alloc")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] +impl From for Params { + fn from(val: String) -> Self { + Self::String(val) } +} - /// Converts a string slice into the [Params::String] variant - impl From<&str> for Params { - fn from(val: &str) -> Self { - Self::String(val.to_string()) - } +#[cfg(feature = "alloc")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] +/// Converts a string slice into the [Params::String] variant +impl From<&str> for Params { + fn from(val: &str) -> Self { + Self::String(val.to_string()) } } diff --git a/satrs/src/pus/action.rs b/satrs/src/pus/action.rs index cd6de15..6fc7629 100644 --- a/satrs/src/pus/action.rs +++ b/satrs/src/pus/action.rs @@ -44,7 +44,7 @@ pub mod alloc_mod { /// - Checking the validity of the APID, service ID, subservice ID. /// - Checking the validity of the user data. /// - /// A [VerificationReporterWithSender] instance is passed to the user to also allow handling + /// A [VerificationReportingProvider] instance is passed to the user to also allow handling /// of the verification process as part of the PUS standard requirements. pub trait PusActionToRequestConverter { type Error; diff --git a/satrs/src/pus/event_man.rs b/satrs/src/pus/event_man.rs index d51135a..720d787 100644 --- a/satrs/src/pus/event_man.rs +++ b/satrs/src/pus/event_man.rs @@ -2,8 +2,6 @@ use crate::events::{EventU32, GenericEvent, Severity}; #[cfg(feature = "alloc")] use crate::events::{EventU32TypedSev, HasSeverity}; #[cfg(feature = "alloc")] -use alloc::boxed::Box; -#[cfg(feature = "alloc")] use core::hash::Hash; #[cfg(feature = "alloc")] use hashbrown::HashSet; @@ -32,12 +30,12 @@ pub use heapless_mod::*; /// structure to track disabled events. A more primitive and embedded friendly /// solution could track this information in a static or pre-allocated list which contains /// the disabled events. -pub trait PusEventMgmtBackendProvider { +pub trait PusEventMgmtBackendProvider { type Error; - fn event_enabled(&self, event: &Provider) -> bool; - fn enable_event_reporting(&mut self, event: &Provider) -> Result; - fn disable_event_reporting(&mut self, event: &Provider) -> Result; + fn event_enabled(&self, event: &Event) -> bool; + fn enable_event_reporting(&mut self, event: &Event) -> Result; + fn disable_event_reporting(&mut self, event: &Event) -> Result; } #[cfg(feature = "heapless")] @@ -108,6 +106,10 @@ impl From for EventManError { #[cfg(feature = "alloc")] pub mod alloc_mod { + use core::marker::PhantomData; + + use crate::events::EventU16; + use super::*; /// Default backend provider which uses a hash set as the event reporting status container @@ -115,14 +117,11 @@ pub mod alloc_mod { /// /// This provider is a good option for host systems or larger embedded systems where /// the expected occasional memory allocation performed by the [HashSet] is not an issue. - pub struct DefaultPusMgmtBackendProvider { + pub struct DefaultPusEventMgmtBackend { disabled: HashSet, } - /// Safety: All contained field are [Send] as well - unsafe impl Send for DefaultPusMgmtBackendProvider {} - - impl Default for DefaultPusMgmtBackendProvider { + impl Default for DefaultPusEventMgmtBackend { fn default() -> Self { Self { disabled: HashSet::default(), @@ -130,46 +129,50 @@ pub mod alloc_mod { } } - impl - PusEventMgmtBackendProvider for DefaultPusMgmtBackendProvider + impl PusEventMgmtBackendProvider + for DefaultPusEventMgmtBackend { type Error = (); - fn event_enabled(&self, event: &Provider) -> bool { + + fn event_enabled(&self, event: &EV) -> bool { !self.disabled.contains(event) } - fn enable_event_reporting(&mut self, event: &Provider) -> Result { + fn enable_event_reporting(&mut self, event: &EV) -> Result { Ok(self.disabled.remove(event)) } - fn disable_event_reporting(&mut self, event: &Provider) -> Result { + fn disable_event_reporting(&mut self, event: &EV) -> Result { Ok(self.disabled.insert(*event)) } } - pub struct PusEventDispatcher { + pub struct PusEventDispatcher< + B: PusEventMgmtBackendProvider, + EV: GenericEvent, + E, + > { reporter: EventReporter, - backend: Box>, + backend: B, + phantom: PhantomData<(E, EV)>, } - /// Safety: All contained fields are send as well. - unsafe impl Send for PusEventDispatcher {} - - impl PusEventDispatcher { - pub fn new( - reporter: EventReporter, - backend: Box>, - ) -> Self { - Self { reporter, backend } + impl, EV: GenericEvent, E> + PusEventDispatcher + { + pub fn new(reporter: EventReporter, backend: B) -> Self { + Self { + reporter, + backend, + phantom: PhantomData, + } } - } - impl PusEventDispatcher { - pub fn enable_tm_for_event(&mut self, event: &Event) -> Result { + pub fn enable_tm_for_event(&mut self, event: &EV) -> Result { self.backend.enable_event_reporting(event) } - pub fn disable_tm_for_event(&mut self, event: &Event) -> Result { + pub fn disable_tm_for_event(&mut self, event: &EV) -> Result { self.backend.disable_event_reporting(event) } @@ -177,7 +180,7 @@ pub mod alloc_mod { &mut self, sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], - event: Event, + event: EV, aux_data: Option<&[u8]>, ) -> Result { if !self.backend.event_enabled(&event) { @@ -208,18 +211,30 @@ pub mod alloc_mod { } } - impl PusEventDispatcher { + impl + PusEventDispatcher, EV, ()> + { + pub fn new_with_default_backend(reporter: EventReporter) -> Self { + Self { + reporter, + backend: DefaultPusEventMgmtBackend::default(), + phantom: PhantomData, + } + } + } + + impl, E> PusEventDispatcher { pub fn enable_tm_for_event_with_sev( &mut self, event: &EventU32TypedSev, - ) -> Result { + ) -> Result { self.backend.enable_event_reporting(event.as_ref()) } pub fn disable_tm_for_event_with_sev( &mut self, event: &EventU32TypedSev, - ) -> Result { + ) -> Result { self.backend.disable_event_reporting(event.as_ref()) } @@ -233,6 +248,11 @@ pub mod alloc_mod { self.generate_pus_event_tm_generic(sender, time_stamp, event.into(), aux_data) } } + + pub type DefaultPusEventU16Dispatcher = + PusEventDispatcher, EventU16, E>; + pub type DefaultPusEventU32Dispatcher = + PusEventDispatcher, EventU32, E>; } #[cfg(test)] mod tests { @@ -246,15 +266,19 @@ mod tests { const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; - fn create_basic_man() -> PusEventDispatcher<(), EventU32> { + fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> { let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed"); - let backend = DefaultPusMgmtBackendProvider::::default(); - PusEventDispatcher::new(reporter, Box::new(backend)) + PusEventDispatcher::new_with_default_backend(reporter) + } + fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> { + let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed"); + let backend = DefaultPusEventMgmtBackend::default(); + PusEventDispatcher::new(reporter, backend) } #[test] fn test_basic() { - let mut event_man = create_basic_man(); + let mut event_man = create_basic_man_1(); let (event_tx, event_rx) = channel(); let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx); let event_sent = event_man @@ -268,7 +292,7 @@ mod tests { #[test] fn test_disable_event() { - let mut event_man = create_basic_man(); + let mut event_man = create_basic_man_2(); let (event_tx, event_rx) = channel(); let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); @@ -291,7 +315,7 @@ mod tests { #[test] fn test_reenable_event() { - let mut event_man = create_basic_man(); + let mut event_man = create_basic_man_1(); let (event_tx, event_rx) = channel(); let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); diff --git a/satrs/src/pus/hk.rs b/satrs/src/pus/hk.rs index a2a5354..b371f03 100644 --- a/satrs/src/pus/hk.rs +++ b/satrs/src/pus/hk.rs @@ -46,7 +46,7 @@ pub mod alloc_mod { /// - Checking the validity of the APID, service ID, subservice ID. /// - Checking the validity of the user data. /// - /// A [VerificationReporterWithSender] instance is passed to the user to also allow handling + /// A [VerificationReportingProvider] is passed to the user to also allow handling /// of the verification process as part of the PUS standard requirements. pub trait PusHkToRequestConverter { type Error; @@ -78,9 +78,9 @@ pub mod std_mod { /// 1. Retrieve the next TC packet from the [PusServiceHelper]. The [EcssTcInMemConverter] /// allows to configure the used telecommand memory backend. /// 2. Convert the TC to a targeted action request using the provided - /// [PusActionToRequestConverter]. The generic error type is constrained to the + /// [PusHkToRequestConverter]. The generic error type is constrained to the /// [PusPacketHandlerResult] for the concrete implementation which offers a packet handler. - /// 3. Route the action request using the provided [PusActionRequestRouter]. The generic error + /// 3. Route the action request using the provided [PusHkRequestRouter]. The generic error /// type is constrained to the [GenericRoutingError] for the concrete implementation. /// 4. Handle all routing errors using the provided [PusRoutingErrorHandler]. The generic error /// type is constrained to the [GenericRoutingError] for the concrete implementation. diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index 91be068..24fc17c 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -815,8 +815,6 @@ pub mod std_mod { pub tc_receiver: Box, pub tm_sender: Box, pub tm_apid: u16, - /// The verification handler is wrapped in a [RefCell] to allow the interior mutability - /// pattern. This makes writing methods which are not mutable a lot easier. pub verification_handler: VerificationReporter, } @@ -886,7 +884,7 @@ pub mod std_mod { /// This function can be used to poll the internal [EcssTcReceiver] object for the next /// telecommand packet. It will return `Ok(None)` if there are not packets available. /// In any other case, it will perform the acceptance of the ECSS TC packet using the - /// internal [VerificationReporterWithSender] object. It will then return the telecommand + /// internal [VerificationReportingProvider] object. It will then return the telecommand /// and the according accepted token. pub fn retrieve_and_accept_next_packet( &mut self, diff --git a/satrs/tests/pus_events.rs b/satrs/tests/pus_events.rs index dbd778c..4c6cc9a 100644 --- a/satrs/tests/pus_events.rs +++ b/satrs/tests/pus_events.rs @@ -1,10 +1,10 @@ use satrs::event_man::{ - EventManagerWithMpscQueue, MpscEventU32Receiver, MpscEventU32SendProvider, SendEventProvider, + EventManagerWithMpsc, EventSendProvider, EventU32SenderMpsc, MpscEventU32Receiver, }; use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs::params::U32Pair; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; -use satrs::pus::event_man::{DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher}; +use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; use satrs::pus::MpscTmAsVecSender; use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::{PusError, PusPacket}; @@ -26,16 +26,16 @@ pub enum CustomTmSenderError { fn test_threaded_usage() { let (event_sender, event_man_receiver) = channel(); let event_receiver = MpscEventU32Receiver::new(event_man_receiver); - let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_receiver)); + let mut event_man = EventManagerWithMpsc::new(event_receiver); let (pus_event_man_tx, pus_event_man_rx) = channel(); - let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); - event_man.subscribe_all(pus_event_man_send_provider.id()); + let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); + event_man.subscribe_all(pus_event_man_send_provider.channel_id()); event_man.add_sender(pus_event_man_send_provider); let (event_tx, event_rx) = channel(); let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed"); - let backend = DefaultPusMgmtBackendProvider::::default(); - let mut pus_event_man = PusEventDispatcher::new(reporter, Box::new(backend)); + let mut pus_event_man = + PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); // PUS + Generic event manager thread let jh0 = thread::spawn(move || { let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx); @@ -71,6 +71,7 @@ fn test_threaded_usage() { Params::Vec(vec) => gen_event(Some(vec.as_slice())), Params::String(str) => gen_event(Some(str.as_bytes())), Params::Store(_) => gen_event(None), + _ => panic!("unsupported parameter type"), } } else { gen_event(None) @@ -120,10 +121,7 @@ fn test_threaded_usage() { } } event_sender - .send(( - LOW_SEV_EVENT.into(), - Some(Params::Heapless((2_u32, 3_u32).into())), - )) + .send((LOW_SEV_EVENT, Some(Params::Heapless((2_u32, 3_u32).into())))) .expect("Sending low severity event failed"); loop { match event_rx.try_recv() { -- 2.34.1