refactored event manager
Some checks failed
Rust/sat-rs/pipeline/head There was a failure building this commit
Some checks failed
Rust/sat-rs/pipeline/head There was a failure building this commit
This commit is contained in:
parent
b48b5b8caa
commit
bb7caed1d5
@ -20,8 +20,8 @@ thiserror = "1"
|
|||||||
derive-new = "0.5"
|
derive-new = "0.5"
|
||||||
|
|
||||||
[dependencies.satrs]
|
[dependencies.satrs]
|
||||||
version = "0.2.0-rc.0"
|
# version = "0.2.0-rc.0"
|
||||||
# path = "../satrs"
|
path = "../satrs"
|
||||||
|
|
||||||
[dependencies.satrs-mib]
|
[dependencies.satrs-mib]
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
|
@ -1,16 +1,15 @@
|
|||||||
use std::sync::mpsc::{self, SendError};
|
use std::sync::mpsc::{self};
|
||||||
|
|
||||||
use satrs::{
|
use satrs::{
|
||||||
event_man::{
|
event_man::{
|
||||||
EventManager, EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider,
|
EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded,
|
||||||
SendEventProvider,
|
MpscEventReceiver,
|
||||||
},
|
},
|
||||||
events::EventU32,
|
events::EventU32,
|
||||||
params::Params,
|
params::Params,
|
||||||
pus::{
|
pus::{
|
||||||
event_man::{
|
event_man::{
|
||||||
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
|
DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken,
|
||||||
PusEventDispatcher,
|
|
||||||
},
|
},
|
||||||
verification::{
|
verification::{
|
||||||
TcStateStarted, VerificationReporterWithSender, VerificationReportingProvider,
|
TcStateStarted, VerificationReporterWithSender, VerificationReportingProvider,
|
||||||
@ -24,11 +23,9 @@ use satrs_example::config::PUS_APID;
|
|||||||
|
|
||||||
use crate::update_time;
|
use crate::update_time;
|
||||||
|
|
||||||
pub type MpscEventManager = EventManager<SendError<(EventU32, Option<Params>)>>;
|
|
||||||
|
|
||||||
pub struct PusEventHandler {
|
pub struct PusEventHandler {
|
||||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
pus_event_dispatcher: PusEventDispatcher<(), EventU32>,
|
pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>,
|
||||||
pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>,
|
pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>,
|
||||||
tm_sender: Box<dyn EcssTmSender>,
|
tm_sender: Box<dyn EcssTmSender>,
|
||||||
time_provider: TimeProvider,
|
time_provider: TimeProvider,
|
||||||
@ -41,19 +38,18 @@ pub struct PusEventHandler {
|
|||||||
impl PusEventHandler {
|
impl PusEventHandler {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
verif_handler: VerificationReporterWithSender,
|
verif_handler: VerificationReporterWithSender,
|
||||||
event_manager: &mut MpscEventManager,
|
event_manager: &mut EventManagerWithBoundedMpsc,
|
||||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
tm_sender: impl EcssTmSender,
|
tm_sender: impl EcssTmSender,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
|
let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(30);
|
||||||
|
|
||||||
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
|
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
|
||||||
// telemetry for each event.
|
// telemetry for each event.
|
||||||
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
|
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
|
||||||
let pus_tm_backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
|
|
||||||
let pus_event_dispatcher =
|
let pus_event_dispatcher =
|
||||||
PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend));
|
DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter);
|
||||||
let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx);
|
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(1, pus_event_man_tx);
|
||||||
|
|
||||||
event_manager.subscribe_all(pus_event_man_send_provider.id());
|
event_manager.subscribe_all(pus_event_man_send_provider.id());
|
||||||
event_manager.add_sender(pus_event_man_send_provider);
|
event_manager.add_sender(pus_event_man_send_provider);
|
||||||
@ -117,7 +113,7 @@ impl PusEventHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct EventManagerWrapper {
|
pub struct EventManagerWrapper {
|
||||||
event_manager: MpscEventManager,
|
event_manager: EventManagerWithBoundedMpsc,
|
||||||
event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
|
event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,7 +124,7 @@ impl EventManagerWrapper {
|
|||||||
let (event_sender, event_man_rx) = mpsc::channel();
|
let (event_sender, event_man_rx) = mpsc::channel();
|
||||||
let event_recv = MpscEventReceiver::<EventU32>::new(event_man_rx);
|
let event_recv = MpscEventReceiver::<EventU32>::new(event_man_rx);
|
||||||
Self {
|
Self {
|
||||||
event_manager: EventManagerWithMpscQueue::new(Box::new(event_recv)),
|
event_manager: EventManagerWithBoundedMpsc::new(event_recv),
|
||||||
event_sender,
|
event_sender,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -137,7 +133,7 @@ impl EventManagerWrapper {
|
|||||||
self.event_sender.clone()
|
self.event_sender.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn event_manager(&mut self) -> &mut MpscEventManager {
|
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
||||||
&mut self.event_manager
|
&mut self.event_manager
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,7 +174,7 @@ impl EventHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn event_manager(&mut self) -> &mut MpscEventManager {
|
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
||||||
self.event_man_wrapper.event_manager()
|
self.event_man_wrapper.event_manager()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,6 +8,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
|
|||||||
|
|
||||||
# [unreleased]
|
# [unreleased]
|
||||||
|
|
||||||
|
## Changed
|
||||||
|
|
||||||
|
- Refactored `EventManager` to heavily use generics instead of trait objects.
|
||||||
|
- `SendEventProvider` -> `EventSendProvider`
|
||||||
|
- `ListenerTable` -> `ListenerMapProvider`
|
||||||
|
- `SenderTable` -> `SenderTableProvider`
|
||||||
|
- There is an `EventManagerWithMpsc` and a `EventManagerWithBoundedMpsc` helper type now.
|
||||||
|
|
||||||
# [v0.2.0-rc.0] 2024-02-21
|
# [v0.2.0-rc.0] 2024-02-21
|
||||||
|
|
||||||
## Added
|
## Added
|
||||||
|
@ -10,26 +10,26 @@
|
|||||||
//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html)
|
//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html)
|
||||||
//! about events first:
|
//! about events first:
|
||||||
//!
|
//!
|
||||||
//! The event manager has a listener table abstracted by the [ListenerTable], which maps
|
//! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps
|
||||||
//! listener groups identified by [ListenerKey]s to a [sender ID][ChannelId].
|
//! listener groups identified by [ListenerKey]s to a [sender ID][ChannelId].
|
||||||
//! It also contains a sender table abstracted by the [SenderTable] which maps these sender IDs
|
//! It also contains a sender table abstracted by the [SenderMapProvider] which maps these sender
|
||||||
//! to a concrete [SendEventProvider]s. A simple approach would be to use one send event provider
|
//! IDs to concrete [EventSendProvider]s. A simple approach would be to use one send event provider
|
||||||
//! for each OBSW thread and then subscribe for all interesting events for a particular thread
|
//! for each OBSW thread and then subscribe for all interesting events for a particular thread
|
||||||
//! using the send event provider ID.
|
//! using the send event provider ID.
|
||||||
//!
|
//!
|
||||||
//! This can be done with the [EventManager] like this:
|
//! This can be done with the [EventManager] like this:
|
||||||
//!
|
//!
|
||||||
//! 1. Provide a concrete [EventReceiver] implementation. This abstraction allow to use different
|
//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different
|
||||||
//! message queue backends. A straightforward implementation where dynamic memory allocation is
|
//! message queue backends. A straightforward implementation where dynamic memory allocation is
|
||||||
//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in
|
//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in
|
||||||
//! form of the [MpscEventReceiver].
|
//! form of the [MpscEventReceiver].
|
||||||
//! 2. To set up event creators, create channel pairs using some message queue implementation.
|
//! 2. To set up event creators, create channel pairs using some message queue implementation.
|
||||||
//! Each event creator gets a (cloned) sender component which allows it to send events to the
|
//! Each event creator gets a (cloned) sender component which allows it to send events to the
|
||||||
//! manager.
|
//! manager.
|
||||||
//! 3. The event manager receives the receiver component as part of a [EventReceiver]
|
//! 3. The event manager receives the receiver component as part of a [EventReceiveProvider]
|
||||||
//! implementation so all events are routed to the manager.
|
//! implementation so all events are routed to the manager.
|
||||||
//! 4. Create the [send event providers][SendEventProvider]s which allow routing events to
|
//! 4. Create the [send event providers][EventSendProvider]s which allow routing events to
|
||||||
//! subscribers. You can now use their [sender IDs][SendEventProvider::id] to subscribe for
|
//! subscribers. You can now use their [sender IDs][EventSendProvider::id] to subscribe for
|
||||||
//! event groups, for example by using the [EventManager::subscribe_single] method.
|
//! event groups, for example by using the [EventManager::subscribe_single] method.
|
||||||
//! 5. Add the send provider as well using the [EventManager::add_sender] call so the event
|
//! 5. Add the send provider as well using the [EventManager::add_sender] call so the event
|
||||||
//! manager can route listener groups to a the send provider.
|
//! manager can route listener groups to a the send provider.
|
||||||
@ -41,24 +41,22 @@
|
|||||||
//!
|
//!
|
||||||
//! # Examples
|
//! # Examples
|
||||||
//!
|
//!
|
||||||
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/pus_events.rs)
|
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs)
|
||||||
//! for a concrete example using multi-threading where events are routed to
|
//! for a concrete example using multi-threading where events are routed to
|
||||||
//! different threads.
|
//! different threads.
|
||||||
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};
|
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};
|
||||||
use crate::params::{Params, ParamsHeapless};
|
use crate::params::{Params, ParamsHeapless};
|
||||||
#[cfg(feature = "alloc")]
|
use crate::queue::GenericSendError;
|
||||||
use alloc::boxed::Box;
|
use core::marker::PhantomData;
|
||||||
#[cfg(feature = "alloc")]
|
|
||||||
use alloc::vec;
|
|
||||||
#[cfg(feature = "alloc")]
|
|
||||||
use alloc::vec::Vec;
|
|
||||||
use core::slice::Iter;
|
use core::slice::Iter;
|
||||||
#[cfg(feature = "alloc")]
|
|
||||||
use hashbrown::HashMap;
|
|
||||||
|
|
||||||
use crate::ChannelId;
|
use crate::ChannelId;
|
||||||
|
|
||||||
|
#[cfg(feature = "alloc")]
|
||||||
|
pub use alloc_mod::*;
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub use stdmod::*;
|
pub use std_mod::*;
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
|
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
|
||||||
pub enum ListenerKey {
|
pub enum ListenerKey {
|
||||||
@ -75,18 +73,18 @@ pub type EventWithAuxData<Event> = (Event, Option<Params>);
|
|||||||
pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
|
pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
|
||||||
pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
|
pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
|
||||||
|
|
||||||
pub trait SendEventProvider<Provider: GenericEvent, AuxDataProvider = Params> {
|
pub trait EventSendProvider<EV: GenericEvent, AuxDataProvider = Params> {
|
||||||
type Error;
|
// type Error;
|
||||||
|
|
||||||
fn id(&self) -> ChannelId;
|
fn id(&self) -> ChannelId;
|
||||||
fn send_no_data(&self, event: Provider) -> Result<(), Self::Error> {
|
fn send_no_data(&self, event: EV) -> Result<(), GenericSendError> {
|
||||||
self.send(event, None)
|
self.send(event, None)
|
||||||
}
|
}
|
||||||
fn send(&self, event: Provider, aux_data: Option<AuxDataProvider>) -> Result<(), Self::Error>;
|
fn send(&self, event: EV, aux_data: Option<AuxDataProvider>) -> Result<(), GenericSendError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generic abstraction for an event receiver.
|
/// Generic abstraction for an event receiver.
|
||||||
pub trait EventReceiver<Event: GenericEvent, AuxDataProvider = Params> {
|
pub trait EventReceiveProvider<Event: GenericEvent, AuxDataProvider = Params> {
|
||||||
/// This function has to be provided by any event receiver. A receive call may or may not return
|
/// This function has to be provided by any event receiver. A receive call may or may not return
|
||||||
/// an event.
|
/// an event.
|
||||||
///
|
///
|
||||||
@ -96,87 +94,113 @@ pub trait EventReceiver<Event: GenericEvent, AuxDataProvider = Params> {
|
|||||||
fn receive(&self) -> Option<(Event, Option<AuxDataProvider>)>;
|
fn receive(&self) -> Option<(Event, Option<AuxDataProvider>)>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ListenerTable {
|
pub trait ListenerMapProvider {
|
||||||
fn get_listeners(&self) -> Vec<ListenerKey>;
|
#[cfg(feature = "alloc")]
|
||||||
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
|
fn get_listeners(&self) -> alloc::vec::Vec<ListenerKey>;
|
||||||
fn contains_listener(&self, key: &ListenerKey) -> bool;
|
fn contains_listener(&self, key: &ListenerKey) -> bool;
|
||||||
fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<ChannelId>>;
|
fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<ChannelId>>;
|
||||||
fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool;
|
fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool;
|
||||||
fn remove_duplicates(&mut self, key: &ListenerKey);
|
fn remove_duplicates(&mut self, key: &ListenerKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait SenderTable<SendProviderError, Event: GenericEvent = EventU32, AuxDataProvider = Params> {
|
pub trait SenderMapProvider<
|
||||||
|
SP: EventSendProvider<EV, AUX>,
|
||||||
|
EV: GenericEvent = EventU32,
|
||||||
|
AUX = Params,
|
||||||
|
>
|
||||||
|
{
|
||||||
fn contains_send_event_provider(&self, id: &ChannelId) -> bool;
|
fn contains_send_event_provider(&self, id: &ChannelId) -> bool;
|
||||||
fn get_send_event_provider(
|
|
||||||
&self,
|
fn get_send_event_provider(&self, id: &ChannelId) -> Option<&SP>;
|
||||||
id: &ChannelId,
|
fn add_send_event_provider(&mut self, send_provider: SP) -> bool;
|
||||||
) -> Option<&dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>;
|
|
||||||
fn add_send_event_provider(
|
|
||||||
&mut self,
|
|
||||||
send_provider: Box<
|
|
||||||
dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>,
|
|
||||||
>,
|
|
||||||
) -> bool;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generic event manager implementation.
|
/// Generic event manager implementation.
|
||||||
///
|
///
|
||||||
/// # Generics
|
/// # Generics
|
||||||
///
|
///
|
||||||
/// * `SendProviderError`: [SendEventProvider] error type
|
/// * `ERP`: [EventReceiveProvider] used to receive all events.
|
||||||
/// * `Event`: Concrete event provider, currently either [EventU32] or [EventU16]
|
/// * `SMP`: [SenderMapProvider] which maps channel IDs to send providers.
|
||||||
/// * `AuxDataProvider`: Concrete auxiliary data provider, currently either [Params] or
|
/// * `LTR`: [ListenerMapProvider] which maps listener keys to channel IDs.
|
||||||
/// [ParamsHeapless]
|
/// * `SP`: [EventSendProvider] contained within the sender map which sends the events.
|
||||||
pub struct EventManager<SendProviderError, Event: GenericEvent = EventU32, AuxDataProvider = Params>
|
/// * `EV`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
|
||||||
{
|
/// and [EventU16] are supported.
|
||||||
listener_table: Box<dyn ListenerTable>,
|
/// * `AUX`: Auxiliary data which is sent with the event to provide optional context information
|
||||||
sender_table: Box<dyn SenderTable<SendProviderError, Event, AuxDataProvider>>,
|
pub struct EventManager<
|
||||||
event_receiver: Box<dyn EventReceiver<Event, AuxDataProvider>>,
|
ERP: EventReceiveProvider<EV, AUX>,
|
||||||
|
SMP: SenderMapProvider<SP, EV, AUX>,
|
||||||
|
LTR: ListenerMapProvider,
|
||||||
|
SP: EventSendProvider<EV, AUX>,
|
||||||
|
EV: GenericEvent = EventU32,
|
||||||
|
AUX = Params,
|
||||||
|
> {
|
||||||
|
event_receiver: ERP,
|
||||||
|
sender_map: SMP,
|
||||||
|
listener_map: LTR,
|
||||||
|
phantom: core::marker::PhantomData<(SP, EV, AUX)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Safety: It is safe to implement [Send] because all fields in the [EventManager] are [Send]
|
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
|
||||||
/// as well
|
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
|
||||||
#[cfg(feature = "std")]
|
pub type EventManagerWithMpsc<EV = EventU32, AUX = Params> = EventManager<
|
||||||
unsafe impl<E, Event: GenericEvent + Send, AuxDataProvider: Send> Send
|
MpscEventReceiver,
|
||||||
for EventManager<E, Event, AuxDataProvider>
|
DefaultSenderMap<EventSenderMpsc<EV>, EV, AUX>,
|
||||||
{
|
DefaultListenerMap,
|
||||||
}
|
EventSenderMpsc<EV>,
|
||||||
|
>;
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
|
||||||
pub type EventManagerWithMpscQueue<Event, AuxDataProvider> = EventManager<
|
/// and the [DefaultListenerMap]. It uses
|
||||||
std::sync::mpsc::SendError<(Event, Option<AuxDataProvider>)>,
|
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
|
||||||
Event,
|
/// message queue backend.
|
||||||
AuxDataProvider,
|
pub type EventManagerWithBoundedMpsc<EV = EventU32, AUX = Params> = EventManager<
|
||||||
|
MpscEventReceiver,
|
||||||
|
DefaultSenderMap<EventSenderMpscBounded<EV>, EV, AUX>,
|
||||||
|
DefaultListenerMap,
|
||||||
|
EventSenderMpscBounded<EV>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum EventRoutingResult<Event: GenericEvent, AuxDataProvider> {
|
pub enum EventRoutingResult<EV: GenericEvent, AUX> {
|
||||||
/// No event was received
|
/// No event was received
|
||||||
Empty,
|
Empty,
|
||||||
/// An event was received and routed.
|
/// An event was received and routed to listeners.
|
||||||
/// The first tuple entry will contain the number of recipients.
|
Handled {
|
||||||
Handled(u32, Event, Option<AuxDataProvider>),
|
num_recipients: u32,
|
||||||
|
event: EV,
|
||||||
|
aux_data: Option<AUX>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum EventRoutingError<E> {
|
pub enum EventRoutingError {
|
||||||
SendError(E),
|
Send(GenericSendError),
|
||||||
NoSendersForKey(ListenerKey),
|
NoSendersForKey(ListenerKey),
|
||||||
NoSenderForId(ChannelId),
|
NoSenderForId(ChannelId),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct EventRoutingErrorsWithResult<Event: GenericEvent, AuxDataProvider, E> {
|
pub struct EventRoutingErrorsWithResult<EV: GenericEvent, AUX> {
|
||||||
pub result: EventRoutingResult<Event, AuxDataProvider>,
|
pub result: EventRoutingResult<EV, AUX>,
|
||||||
pub errors: [Option<EventRoutingError<E>>; 3],
|
pub errors: [Option<EventRoutingError>; 3],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E, Event: GenericEvent + Copy> EventManager<E, Event> {
|
impl<
|
||||||
|
ER: EventReceiveProvider<EV, AUX>,
|
||||||
|
S: SenderMapProvider<SP, EV, AUX>,
|
||||||
|
L: ListenerMapProvider,
|
||||||
|
SP: EventSendProvider<EV, AUX>,
|
||||||
|
EV: GenericEvent + Copy,
|
||||||
|
AUX: Clone,
|
||||||
|
> EventManager<ER, S, L, SP, EV, AUX>
|
||||||
|
{
|
||||||
pub fn remove_duplicates(&mut self, key: &ListenerKey) {
|
pub fn remove_duplicates(&mut self, key: &ListenerKey) {
|
||||||
self.listener_table.remove_duplicates(key)
|
self.listener_map.remove_duplicates(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribe for a unique event.
|
/// Subscribe for a unique event.
|
||||||
pub fn subscribe_single(&mut self, event: &Event, sender_id: ChannelId) {
|
pub fn subscribe_single(&mut self, event: &EV, sender_id: ChannelId) {
|
||||||
self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id);
|
self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,49 +218,35 @@ impl<E, Event: GenericEvent + Copy> EventManager<E, Event> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: 'static, Event: GenericEvent + Copy + 'static, AuxDataProvider: Clone + 'static>
|
impl<
|
||||||
EventManager<E, Event, AuxDataProvider>
|
ERP: EventReceiveProvider<EV, AUX>,
|
||||||
|
SMP: SenderMapProvider<SP, EV, AUX>,
|
||||||
|
LTR: ListenerMapProvider,
|
||||||
|
SP: EventSendProvider<EV, AUX>,
|
||||||
|
EV: GenericEvent + Copy,
|
||||||
|
AUX: Clone,
|
||||||
|
> EventManager<ERP, SMP, LTR, SP, EV, AUX>
|
||||||
{
|
{
|
||||||
/// Create an event manager where the sender table will be the [DefaultSenderTableProvider]
|
pub fn new_with_custom_maps(event_receiver: ERP, sender_map: SMP, listener_map: LTR) -> Self {
|
||||||
/// and the listener table will be the [DefaultListenerTableProvider].
|
|
||||||
pub fn new(event_receiver: Box<dyn EventReceiver<Event, AuxDataProvider>>) -> Self {
|
|
||||||
let listener_table: Box<DefaultListenerTableProvider> = Box::default();
|
|
||||||
let sender_table: Box<DefaultSenderTableProvider<E, Event, AuxDataProvider>> =
|
|
||||||
Box::default();
|
|
||||||
Self::new_custom_tables(listener_table, sender_table, event_receiver)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E, Event: GenericEvent + Copy, AuxDataProvider: Clone>
|
|
||||||
EventManager<E, Event, AuxDataProvider>
|
|
||||||
{
|
|
||||||
pub fn new_custom_tables(
|
|
||||||
listener_table: Box<dyn ListenerTable>,
|
|
||||||
sender_table: Box<dyn SenderTable<E, Event, AuxDataProvider>>,
|
|
||||||
event_receiver: Box<dyn EventReceiver<Event, AuxDataProvider>>,
|
|
||||||
) -> Self {
|
|
||||||
EventManager {
|
EventManager {
|
||||||
listener_table,
|
listener_map,
|
||||||
sender_table,
|
sender_map,
|
||||||
event_receiver,
|
event_receiver,
|
||||||
|
phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_sender(
|
pub fn add_sender(&mut self, send_provider: SP) {
|
||||||
&mut self,
|
|
||||||
send_provider: impl SendEventProvider<Event, AuxDataProvider, Error = E> + 'static,
|
|
||||||
) {
|
|
||||||
if !self
|
if !self
|
||||||
.sender_table
|
.sender_map
|
||||||
.contains_send_event_provider(&send_provider.id())
|
.contains_send_event_provider(&send_provider.id())
|
||||||
{
|
{
|
||||||
self.sender_table
|
self.sender_map.add_send_event_provider(send_provider);
|
||||||
.add_send_event_provider(Box::new(send_provider));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_listeners(&mut self, key: ListenerKey, sender_id: ChannelId) {
|
fn update_listeners(&mut self, key: ListenerKey, sender_id: ChannelId) {
|
||||||
self.listener_table.add_listener(key, sender_id);
|
self.listener_map.add_listener(key, sender_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This function will use the cached event receiver and try to receive one event.
|
/// This function will use the cached event receiver and try to receive one event.
|
||||||
@ -248,39 +258,35 @@ impl<E, Event: GenericEvent + Copy, AuxDataProvider: Clone>
|
|||||||
/// [EventRoutingErrorsWithResult] error struct.
|
/// [EventRoutingErrorsWithResult] error struct.
|
||||||
pub fn try_event_handling(
|
pub fn try_event_handling(
|
||||||
&self,
|
&self,
|
||||||
) -> Result<
|
) -> Result<EventRoutingResult<EV, AUX>, EventRoutingErrorsWithResult<EV, AUX>> {
|
||||||
EventRoutingResult<Event, AuxDataProvider>,
|
|
||||||
EventRoutingErrorsWithResult<Event, AuxDataProvider, E>,
|
|
||||||
> {
|
|
||||||
let mut err_idx = 0;
|
let mut err_idx = 0;
|
||||||
let mut err_slice = [None, None, None];
|
let mut err_slice = [None, None, None];
|
||||||
let mut num_recipients = 0;
|
let mut num_recipients = 0;
|
||||||
let mut add_error = |error: EventRoutingError<E>| {
|
let mut add_error = |error: EventRoutingError| {
|
||||||
if err_idx < 3 {
|
if err_idx < 3 {
|
||||||
err_slice[err_idx] = Some(error);
|
err_slice[err_idx] = Some(error);
|
||||||
err_idx += 1;
|
err_idx += 1;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut send_handler =
|
let mut send_handler = |key: &ListenerKey, event: EV, aux_data: &Option<AUX>| {
|
||||||
|key: &ListenerKey, event: Event, aux_data: &Option<AuxDataProvider>| {
|
if self.listener_map.contains_listener(key) {
|
||||||
if self.listener_table.contains_listener(key) {
|
if let Some(ids) = self.listener_map.get_listener_ids(key) {
|
||||||
if let Some(ids) = self.listener_table.get_listener_ids(key) {
|
for id in ids {
|
||||||
for id in ids {
|
if let Some(sender) = self.sender_map.get_send_event_provider(id) {
|
||||||
if let Some(sender) = self.sender_table.get_send_event_provider(id) {
|
if let Err(e) = sender.send(event, aux_data.clone()) {
|
||||||
if let Err(e) = sender.send(event, aux_data.clone()) {
|
add_error(EventRoutingError::Send(e));
|
||||||
add_error(EventRoutingError::SendError(e));
|
|
||||||
} else {
|
|
||||||
num_recipients += 1;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
add_error(EventRoutingError::NoSenderForId(*id));
|
num_recipients += 1;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
add_error(EventRoutingError::NoSenderForId(*id));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
add_error(EventRoutingError::NoSendersForKey(*key));
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
add_error(EventRoutingError::NoSendersForKey(*key));
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
};
|
||||||
if let Some((event, aux_data)) = self.event_receiver.receive() {
|
if let Some((event, aux_data)) = self.event_receiver.receive() {
|
||||||
let single_key = ListenerKey::Single(event.raw_as_largest_type());
|
let single_key = ListenerKey::Single(event.raw_as_largest_type());
|
||||||
send_handler(&single_key, event, &aux_data);
|
send_handler(&single_key, event, &aux_data);
|
||||||
@ -289,129 +295,151 @@ impl<E, Event: GenericEvent + Copy, AuxDataProvider: Clone>
|
|||||||
send_handler(&ListenerKey::All, event, &aux_data);
|
send_handler(&ListenerKey::All, event, &aux_data);
|
||||||
if err_idx > 0 {
|
if err_idx > 0 {
|
||||||
return Err(EventRoutingErrorsWithResult {
|
return Err(EventRoutingErrorsWithResult {
|
||||||
result: EventRoutingResult::Handled(num_recipients, event, aux_data),
|
result: EventRoutingResult::Handled {
|
||||||
|
num_recipients,
|
||||||
|
event,
|
||||||
|
aux_data,
|
||||||
|
},
|
||||||
errors: err_slice,
|
errors: err_slice,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return Ok(EventRoutingResult::Handled(num_recipients, event, aux_data));
|
return Ok(EventRoutingResult::Handled {
|
||||||
|
num_recipients,
|
||||||
|
event,
|
||||||
|
aux_data,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
Ok(EventRoutingResult::Empty)
|
Ok(EventRoutingResult::Empty)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[cfg(feature = "alloc")]
|
||||||
pub struct DefaultListenerTableProvider {
|
pub mod alloc_mod {
|
||||||
listeners: HashMap<ListenerKey, Vec<ChannelId>>,
|
use alloc::vec::Vec;
|
||||||
}
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
pub struct DefaultSenderTableProvider<
|
use super::*;
|
||||||
SendProviderError,
|
|
||||||
Event: GenericEvent = EventU32,
|
|
||||||
AuxDataProvider = Params,
|
|
||||||
> {
|
|
||||||
senders: HashMap<
|
|
||||||
ChannelId,
|
|
||||||
Box<dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>>,
|
|
||||||
>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<SendProviderError, Event: GenericEvent, AuxDataProvider> Default
|
impl<
|
||||||
for DefaultSenderTableProvider<SendProviderError, Event, AuxDataProvider>
|
ER: EventReceiveProvider<EV, AUX>,
|
||||||
{
|
SP: EventSendProvider<EV, AUX>,
|
||||||
fn default() -> Self {
|
EV: GenericEvent + Copy,
|
||||||
Self {
|
AUX: 'static,
|
||||||
senders: HashMap::new(),
|
> EventManager<ER, DefaultSenderMap<SP, EV, AUX>, DefaultListenerMap, SP, EV, AUX>
|
||||||
|
{
|
||||||
|
/// Create an event manager where the sender table will be the [DefaultSenderMap]
|
||||||
|
/// and the listener table will be the [DefaultListenerMap].
|
||||||
|
pub fn new(event_receiver: ER) -> Self {
|
||||||
|
Self {
|
||||||
|
listener_map: DefaultListenerMap::default(),
|
||||||
|
sender_map: DefaultSenderMap::default(),
|
||||||
|
event_receiver,
|
||||||
|
phantom: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl ListenerTable for DefaultListenerTableProvider {
|
#[derive(Default)]
|
||||||
fn get_listeners(&self) -> Vec<ListenerKey> {
|
pub struct DefaultListenerMap {
|
||||||
let mut key_list = Vec::new();
|
listeners: HashMap<ListenerKey, Vec<ChannelId>>,
|
||||||
for key in self.listeners.keys() {
|
|
||||||
key_list.push(*key);
|
|
||||||
}
|
|
||||||
key_list
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn contains_listener(&self, key: &ListenerKey) -> bool {
|
pub struct DefaultSenderMap<
|
||||||
self.listeners.contains_key(key)
|
SP: EventSendProvider<EV, AUX>,
|
||||||
|
EV: GenericEvent = EventU32,
|
||||||
|
AUX = Params,
|
||||||
|
> {
|
||||||
|
senders: HashMap<ChannelId, SP>,
|
||||||
|
phantom: PhantomData<(EV, AUX)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<ChannelId>> {
|
impl<SP: EventSendProvider<EV, AUX>, EV: GenericEvent, AUX> Default
|
||||||
self.listeners.get(key).map(|vec| vec.iter())
|
for DefaultSenderMap<SP, EV, AUX>
|
||||||
}
|
{
|
||||||
|
fn default() -> Self {
|
||||||
fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool {
|
Self {
|
||||||
if let Some(existing_list) = self.listeners.get_mut(&key) {
|
senders: Default::default(),
|
||||||
existing_list.push(sender_id);
|
phantom: Default::default(),
|
||||||
} else {
|
}
|
||||||
let new_list = vec![sender_id];
|
|
||||||
self.listeners.insert(key, new_list);
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_duplicates(&mut self, key: &ListenerKey) {
|
|
||||||
if let Some(list) = self.listeners.get_mut(key) {
|
|
||||||
list.sort_unstable();
|
|
||||||
list.dedup();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<SendProviderError, Event: GenericEvent, AuxDataProvider>
|
impl ListenerMapProvider for DefaultListenerMap {
|
||||||
SenderTable<SendProviderError, Event, AuxDataProvider>
|
fn get_listeners(&self) -> Vec<ListenerKey> {
|
||||||
for DefaultSenderTableProvider<SendProviderError, Event, AuxDataProvider>
|
let mut key_list = Vec::new();
|
||||||
{
|
for key in self.listeners.keys() {
|
||||||
fn contains_send_event_provider(&self, id: &ChannelId) -> bool {
|
key_list.push(*key);
|
||||||
self.senders.contains_key(id)
|
}
|
||||||
}
|
key_list
|
||||||
|
}
|
||||||
fn get_send_event_provider(
|
|
||||||
&self,
|
fn contains_listener(&self, key: &ListenerKey) -> bool {
|
||||||
id: &ChannelId,
|
self.listeners.contains_key(key)
|
||||||
) -> Option<&dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>> {
|
}
|
||||||
self.senders
|
|
||||||
.get(id)
|
fn get_listener_ids(&self, key: &ListenerKey) -> Option<Iter<ChannelId>> {
|
||||||
.filter(|sender| sender.id() == *id)
|
self.listeners.get(key).map(|vec| vec.iter())
|
||||||
.map(|v| v.as_ref())
|
}
|
||||||
}
|
|
||||||
|
fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool {
|
||||||
fn add_send_event_provider(
|
if let Some(existing_list) = self.listeners.get_mut(&key) {
|
||||||
&mut self,
|
existing_list.push(sender_id);
|
||||||
send_provider: Box<
|
} else {
|
||||||
dyn SendEventProvider<Event, AuxDataProvider, Error = SendProviderError>,
|
let new_list = alloc::vec![sender_id];
|
||||||
>,
|
self.listeners.insert(key, new_list);
|
||||||
) -> bool {
|
}
|
||||||
let id = send_provider.id();
|
true
|
||||||
if self.senders.contains_key(&id) {
|
}
|
||||||
return false;
|
|
||||||
|
fn remove_duplicates(&mut self, key: &ListenerKey) {
|
||||||
|
if let Some(list) = self.listeners.get_mut(key) {
|
||||||
|
list.sort_unstable();
|
||||||
|
list.dedup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<SP: EventSendProvider<EV, AUX>, EV: GenericEvent, AUX> SenderMapProvider<SP, EV, AUX>
|
||||||
|
for DefaultSenderMap<SP, EV, AUX>
|
||||||
|
{
|
||||||
|
fn contains_send_event_provider(&self, id: &ChannelId) -> bool {
|
||||||
|
self.senders.contains_key(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_send_event_provider(&self, id: &ChannelId) -> Option<&SP> {
|
||||||
|
self.senders.get(id).filter(|sender| sender.id() == *id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_send_event_provider(&mut self, send_provider: SP) -> bool {
|
||||||
|
let id = send_provider.id();
|
||||||
|
if self.senders.contains_key(&id) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.senders.insert(id, send_provider).is_none()
|
||||||
}
|
}
|
||||||
self.senders.insert(id, send_provider).is_none()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub mod stdmod {
|
pub mod std_mod {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::event_man::{EventReceiver, EventWithAuxData};
|
use crate::event_man::{EventReceiveProvider, EventWithAuxData};
|
||||||
use crate::events::{EventU16, EventU32, GenericEvent};
|
use crate::events::{EventU16, EventU32, GenericEvent};
|
||||||
use crate::params::Params;
|
use crate::params::Params;
|
||||||
use std::sync::mpsc::{Receiver, SendError, Sender};
|
use std::sync::mpsc::{self};
|
||||||
|
|
||||||
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> {
|
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> {
|
||||||
mpsc_receiver: Receiver<(Event, Option<Params>)>,
|
mpsc_receiver: mpsc::Receiver<(Event, Option<Params>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Event: GenericEvent + Send> MpscEventReceiver<Event> {
|
impl<Event: GenericEvent + Send> MpscEventReceiver<Event> {
|
||||||
pub fn new(receiver: Receiver<(Event, Option<Params>)>) -> Self {
|
pub fn new(receiver: mpsc::Receiver<(Event, Option<Params>)>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
mpsc_receiver: receiver,
|
mpsc_receiver: receiver,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<Event: GenericEvent + Send> EventReceiver<Event> for MpscEventReceiver<Event> {
|
impl<Event: GenericEvent + Send> EventReceiveProvider<Event> for MpscEventReceiver<Event> {
|
||||||
fn receive(&self) -> Option<EventWithAuxData<Event>> {
|
fn receive(&self) -> Option<EventWithAuxData<Event>> {
|
||||||
if let Ok(event_and_data) = self.mpsc_receiver.try_recv() {
|
if let Ok(event_and_data) = self.mpsc_receiver.try_recv() {
|
||||||
return Some(event_and_data);
|
return Some(event_and_data);
|
||||||
@ -424,30 +452,59 @@ pub mod stdmod {
|
|||||||
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>;
|
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MpscEventSendProvider<Event: GenericEvent + Send> {
|
pub struct EventSenderMpsc<Event: GenericEvent + Send> {
|
||||||
id: u32,
|
id: u32,
|
||||||
sender: Sender<(Event, Option<Params>)>,
|
sender: mpsc::Sender<(Event, Option<Params>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Event: GenericEvent + Send> MpscEventSendProvider<Event> {
|
impl<Event: GenericEvent + Send> EventSenderMpsc<Event> {
|
||||||
pub fn new(id: u32, sender: Sender<(Event, Option<Params>)>) -> Self {
|
pub fn new(id: u32, sender: mpsc::Sender<(Event, Option<Params>)>) -> Self {
|
||||||
Self { id, sender }
|
Self { id, sender }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Event: GenericEvent + Send> SendEventProvider<Event> for MpscEventSendProvider<Event> {
|
impl<Event: GenericEvent + Send> EventSendProvider<Event> for EventSenderMpsc<Event> {
|
||||||
type Error = SendError<(Event, Option<Params>)>;
|
|
||||||
|
|
||||||
fn id(&self) -> u32 {
|
fn id(&self) -> u32 {
|
||||||
self.id
|
self.id
|
||||||
}
|
}
|
||||||
fn send(&self, event: Event, aux_data: Option<Params>) -> Result<(), Self::Error> {
|
fn send(&self, event: Event, aux_data: Option<Params>) -> Result<(), GenericSendError> {
|
||||||
self.sender.send((event, aux_data))
|
self.sender
|
||||||
|
.send((event, aux_data))
|
||||||
|
.map_err(|_| GenericSendError::RxDisconnected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type MpscEventU32SendProvider = MpscEventSendProvider<EventU32>;
|
#[derive(Clone)]
|
||||||
pub type MpscEventU16SendProvider = MpscEventSendProvider<EventU16>;
|
pub struct EventSenderMpscBounded<Event: GenericEvent + Send> {
|
||||||
|
id: u32,
|
||||||
|
sender: mpsc::SyncSender<(Event, Option<Params>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Event: GenericEvent + Send> EventSenderMpscBounded<Event> {
|
||||||
|
pub fn new(id: u32, sender: mpsc::SyncSender<(Event, Option<Params>)>) -> Self {
|
||||||
|
Self { id, sender }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Event: GenericEvent + Send> EventSendProvider<Event> for EventSenderMpscBounded<Event> {
|
||||||
|
fn id(&self) -> u32 {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
fn send(&self, event: Event, aux_data: Option<Params>) -> Result<(), GenericSendError> {
|
||||||
|
if let Err(e) = self.sender.try_send((event, aux_data)) {
|
||||||
|
return match e {
|
||||||
|
mpsc::TrySendError::Full(_) => Err(GenericSendError::QueueFull(None)),
|
||||||
|
mpsc::TrySendError::Disconnected(_) => Err(GenericSendError::RxDisconnected),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type EventU32SenderMpsc = EventSenderMpsc<EventU32>;
|
||||||
|
pub type EventU16SenderMpsc = EventSenderMpsc<EventU16>;
|
||||||
|
pub type EventU32SenderMpscBounded = EventSenderMpscBounded<EventU32>;
|
||||||
|
pub type EventU16SenderMpscBounded = EventSenderMpscBounded<EventU16>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -456,32 +513,8 @@ mod tests {
|
|||||||
use crate::event_man::EventManager;
|
use crate::event_man::EventManager;
|
||||||
use crate::events::{EventU32, GenericEvent, Severity};
|
use crate::events::{EventU32, GenericEvent, Severity};
|
||||||
use crate::params::ParamsRaw;
|
use crate::params::ParamsRaw;
|
||||||
use alloc::boxed::Box;
|
|
||||||
use std::format;
|
use std::format;
|
||||||
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct MpscEventSenderQueue {
|
|
||||||
id: u32,
|
|
||||||
mpsc_sender: Sender<EventU32WithAuxData>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MpscEventSenderQueue {
|
|
||||||
fn new(id: u32, mpsc_sender: Sender<EventU32WithAuxData>) -> Self {
|
|
||||||
Self { id, mpsc_sender }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendEventProvider<EventU32> for MpscEventSenderQueue {
|
|
||||||
type Error = SendError<EventU32WithAuxData>;
|
|
||||||
|
|
||||||
fn id(&self) -> u32 {
|
|
||||||
self.id
|
|
||||||
}
|
|
||||||
fn send(&self, event: EventU32, aux_data: Option<Params>) -> Result<(), Self::Error> {
|
|
||||||
self.mpsc_sender.send((event, aux_data))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_next_event(
|
fn check_next_event(
|
||||||
expected: EventU32,
|
expected: EventU32,
|
||||||
@ -500,22 +533,21 @@ mod tests {
|
|||||||
expected_num_sent: u32,
|
expected_num_sent: u32,
|
||||||
) {
|
) {
|
||||||
assert!(matches!(res, EventRoutingResult::Handled { .. }));
|
assert!(matches!(res, EventRoutingResult::Handled { .. }));
|
||||||
if let EventRoutingResult::Handled(num_recipients, event, _aux_data) = res {
|
if let EventRoutingResult::Handled {
|
||||||
|
num_recipients,
|
||||||
|
event,
|
||||||
|
..
|
||||||
|
} = res
|
||||||
|
{
|
||||||
assert_eq!(event, expected);
|
assert_eq!(event, expected);
|
||||||
assert_eq!(num_recipients, expected_num_sent);
|
assert_eq!(num_recipients, expected_num_sent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generic_event_man() -> (
|
fn generic_event_man() -> (Sender<EventU32WithAuxData>, EventManagerWithMpsc) {
|
||||||
Sender<EventU32WithAuxData>,
|
|
||||||
EventManager<SendError<EventU32WithAuxData>>,
|
|
||||||
) {
|
|
||||||
let (event_sender, manager_queue) = channel();
|
let (event_sender, manager_queue) = channel();
|
||||||
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
||||||
(
|
(event_sender, EventManager::new(event_man_receiver))
|
||||||
event_sender,
|
|
||||||
EventManager::new(Box::new(event_man_receiver)),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -524,14 +556,11 @@ mod tests {
|
|||||||
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
||||||
let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
||||||
let (single_event_sender, single_event_receiver) = channel();
|
let (single_event_sender, single_event_receiver) = channel();
|
||||||
let single_event_listener = MpscEventSenderQueue::new(0, single_event_sender);
|
let single_event_listener = EventSenderMpsc::new(0, single_event_sender);
|
||||||
event_man.subscribe_single(&event_grp_0, single_event_listener.id());
|
event_man.subscribe_single(&event_grp_0, single_event_listener.id());
|
||||||
event_man.add_sender(single_event_listener);
|
event_man.add_sender(single_event_listener);
|
||||||
let (group_event_sender_0, group_event_receiver_0) = channel();
|
let (group_event_sender_0, group_event_receiver_0) = channel();
|
||||||
let group_event_listener = MpscEventSenderQueue {
|
let group_event_listener = EventU32SenderMpsc::new(1, group_event_sender_0);
|
||||||
id: 1,
|
|
||||||
mpsc_sender: group_event_sender_0,
|
|
||||||
};
|
|
||||||
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.id());
|
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.id());
|
||||||
event_man.add_sender(group_event_listener);
|
event_man.add_sender(group_event_listener);
|
||||||
|
|
||||||
@ -559,7 +588,7 @@ mod tests {
|
|||||||
let (event_sender, mut event_man) = generic_event_man();
|
let (event_sender, mut event_man) = generic_event_man();
|
||||||
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
||||||
let (single_event_sender, single_event_receiver) = channel();
|
let (single_event_sender, single_event_receiver) = channel();
|
||||||
let single_event_listener = MpscEventSenderQueue::new(0, single_event_sender);
|
let single_event_listener = EventSenderMpsc::new(0, single_event_sender);
|
||||||
event_man.subscribe_single(&event_grp_0, single_event_listener.id());
|
event_man.subscribe_single(&event_grp_0, single_event_listener.id());
|
||||||
event_man.add_sender(single_event_listener);
|
event_man.add_sender(single_event_listener);
|
||||||
event_sender
|
event_sender
|
||||||
@ -591,10 +620,7 @@ mod tests {
|
|||||||
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
||||||
let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
||||||
let (event_grp_0_sender, event_grp_0_receiver) = channel();
|
let (event_grp_0_sender, event_grp_0_receiver) = channel();
|
||||||
let event_grp_0_and_1_listener = MpscEventSenderQueue {
|
let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender);
|
||||||
id: 0,
|
|
||||||
mpsc_sender: event_grp_0_sender,
|
|
||||||
};
|
|
||||||
event_man.subscribe_group(event_grp_0.group_id(), event_grp_0_and_1_listener.id());
|
event_man.subscribe_group(event_grp_0.group_id(), event_grp_0_and_1_listener.id());
|
||||||
event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener.id());
|
event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener.id());
|
||||||
event_man.add_sender(event_grp_0_and_1_listener);
|
event_man.add_sender(event_grp_0_and_1_listener);
|
||||||
@ -625,14 +651,8 @@ mod tests {
|
|||||||
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
||||||
let (event_0_tx_0, event_0_rx_0) = channel();
|
let (event_0_tx_0, event_0_rx_0) = channel();
|
||||||
let (event_0_tx_1, event_0_rx_1) = channel();
|
let (event_0_tx_1, event_0_rx_1) = channel();
|
||||||
let event_listener_0 = MpscEventSenderQueue {
|
let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0);
|
||||||
id: 0,
|
let event_listener_1 = EventU32SenderMpsc::new(1, event_0_tx_1);
|
||||||
mpsc_sender: event_0_tx_0,
|
|
||||||
};
|
|
||||||
let event_listener_1 = MpscEventSenderQueue {
|
|
||||||
id: 1,
|
|
||||||
mpsc_sender: event_0_tx_1,
|
|
||||||
};
|
|
||||||
let event_listener_0_sender_id = event_listener_0.id();
|
let event_listener_0_sender_id = event_listener_0.id();
|
||||||
event_man.subscribe_single(&event_0, event_listener_0_sender_id);
|
event_man.subscribe_single(&event_0, event_listener_0_sender_id);
|
||||||
event_man.add_sender(event_listener_0);
|
event_man.add_sender(event_listener_0);
|
||||||
@ -681,15 +701,11 @@ mod tests {
|
|||||||
fn test_all_events_listener() {
|
fn test_all_events_listener() {
|
||||||
let (event_sender, manager_queue) = channel();
|
let (event_sender, manager_queue) = channel();
|
||||||
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
||||||
let mut event_man: EventManager<SendError<EventU32WithAuxData>> =
|
let mut event_man = EventManagerWithMpsc::new(event_man_receiver);
|
||||||
EventManager::new(Box::new(event_man_receiver));
|
|
||||||
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
|
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
|
||||||
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
||||||
let (event_0_tx_0, all_events_rx) = channel();
|
let (event_0_tx_0, all_events_rx) = channel();
|
||||||
let all_events_listener = MpscEventSenderQueue {
|
let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0);
|
||||||
id: 0,
|
|
||||||
mpsc_sender: event_0_tx_0,
|
|
||||||
};
|
|
||||||
event_man.subscribe_all(all_events_listener.id());
|
event_man.subscribe_all(all_events_listener.id());
|
||||||
event_man.add_sender(all_events_listener);
|
event_man.add_sender(all_events_listener);
|
||||||
event_sender
|
event_sender
|
||||||
@ -707,4 +723,9 @@ mod tests {
|
|||||||
check_next_event(event_0, &all_events_rx);
|
check_next_event(event_0, &all_events_rx);
|
||||||
check_next_event(event_1, &all_events_rx);
|
check_next_event(event_1, &all_events_rx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_bounded_event_sender() {
|
||||||
|
// TODO: Test with bounded sender
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
///
|
///
|
||||||
/// ## Example
|
/// ## Example
|
||||||
///
|
///
|
||||||
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
|
||||||
/// test also serves as the example application for this module.
|
/// test also serves as the example application for this module.
|
||||||
pub struct TcpTmtcInCobsServer<
|
pub struct TcpTmtcInCobsServer<
|
||||||
TmError,
|
TmError,
|
||||||
|
@ -88,7 +88,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
|||||||
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
|
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
|
||||||
///
|
///
|
||||||
/// ## Example
|
/// ## Example
|
||||||
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
|
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
|
||||||
/// also serves as the example application for this module.
|
/// also serves as the example application for this module.
|
||||||
pub struct TcpSpacepacketsServer<
|
pub struct TcpSpacepacketsServer<
|
||||||
TmError,
|
TmError,
|
||||||
|
@ -26,8 +26,6 @@ extern crate std;
|
|||||||
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
pub mod cfdp;
|
pub mod cfdp;
|
||||||
pub mod encoding;
|
pub mod encoding;
|
||||||
#[cfg(feature = "alloc")]
|
|
||||||
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
|
||||||
pub mod event_man;
|
pub mod event_man;
|
||||||
pub mod events;
|
pub mod events;
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
|
@ -56,8 +56,6 @@ use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU64, EcssEnumU8};
|
|||||||
use spacepackets::util::UnsignedEnum;
|
use spacepackets::util::UnsignedEnum;
|
||||||
use spacepackets::ByteConversionError;
|
use spacepackets::ByteConversionError;
|
||||||
|
|
||||||
#[cfg(feature = "alloc")]
|
|
||||||
pub use alloc_mod::*;
|
|
||||||
pub use spacepackets::util::ToBeBytes;
|
pub use spacepackets::util::ToBeBytes;
|
||||||
|
|
||||||
/// Generic trait which is used for objects which can be converted into a raw network (big) endian
|
/// Generic trait which is used for objects which can be converted into a raw network (big) endian
|
||||||
@ -560,56 +558,64 @@ from_conversions_for_raw!(
|
|||||||
(f64, Self::F64),
|
(f64, Self::F64),
|
||||||
);
|
);
|
||||||
|
|
||||||
#[cfg(feature = "alloc")]
|
/// Generic enumeration for additional parameters, including parameters which rely on heap
|
||||||
mod alloc_mod {
|
/// allocations.
|
||||||
use super::*;
|
#[derive(Debug, Clone)]
|
||||||
/// Generic enumeration for additional parameters, including parameters which rely on heap
|
#[non_exhaustive]
|
||||||
/// allocations.
|
pub enum Params {
|
||||||
|
Heapless(ParamsHeapless),
|
||||||
|
Store(StoreAddr),
|
||||||
|
#[cfg(feature = "alloc")]
|
||||||
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
#[derive(Debug, Clone)]
|
Vec(Vec<u8>),
|
||||||
pub enum Params {
|
#[cfg(feature = "alloc")]
|
||||||
Heapless(ParamsHeapless),
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
Store(StoreAddr),
|
String(String),
|
||||||
Vec(Vec<u8>),
|
}
|
||||||
String(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<StoreAddr> for Params {
|
impl From<StoreAddr> for Params {
|
||||||
fn from(x: StoreAddr) -> Self {
|
fn from(x: StoreAddr) -> Self {
|
||||||
Self::Store(x)
|
Self::Store(x)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<ParamsHeapless> for Params {
|
impl From<ParamsHeapless> for Params {
|
||||||
fn from(x: ParamsHeapless) -> Self {
|
fn from(x: ParamsHeapless) -> Self {
|
||||||
Self::Heapless(x)
|
Self::Heapless(x)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<Vec<u8>> for Params {
|
#[cfg(feature = "alloc")]
|
||||||
fn from(val: Vec<u8>) -> Self {
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
Self::Vec(val)
|
impl From<Vec<u8>> for Params {
|
||||||
}
|
fn from(val: Vec<u8>) -> Self {
|
||||||
|
Self::Vec(val)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Converts a byte slice into the [Params::Vec] variant
|
/// Converts a byte slice into the [Params::Vec] variant
|
||||||
impl From<&[u8]> for Params {
|
#[cfg(feature = "alloc")]
|
||||||
fn from(val: &[u8]) -> Self {
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
Self::Vec(val.to_vec())
|
impl From<&[u8]> for Params {
|
||||||
}
|
fn from(val: &[u8]) -> Self {
|
||||||
|
Self::Vec(val.to_vec())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<String> for Params {
|
#[cfg(feature = "alloc")]
|
||||||
fn from(val: String) -> Self {
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
Self::String(val)
|
impl From<String> for Params {
|
||||||
}
|
fn from(val: String) -> Self {
|
||||||
|
Self::String(val)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Converts a string slice into the [Params::String] variant
|
#[cfg(feature = "alloc")]
|
||||||
impl From<&str> for Params {
|
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
|
||||||
fn from(val: &str) -> Self {
|
/// Converts a string slice into the [Params::String] variant
|
||||||
Self::String(val.to_string())
|
impl From<&str> for Params {
|
||||||
}
|
fn from(val: &str) -> Self {
|
||||||
|
Self::String(val.to_string())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ pub mod alloc_mod {
|
|||||||
/// - Checking the validity of the APID, service ID, subservice ID.
|
/// - Checking the validity of the APID, service ID, subservice ID.
|
||||||
/// - Checking the validity of the user data.
|
/// - Checking the validity of the user data.
|
||||||
///
|
///
|
||||||
/// A [VerificationReporterWithSender] instance is passed to the user to also allow handling
|
/// A [VerificationReportingProvider] instance is passed to the user to also allow handling
|
||||||
/// of the verification process as part of the PUS standard requirements.
|
/// of the verification process as part of the PUS standard requirements.
|
||||||
pub trait PusActionToRequestConverter {
|
pub trait PusActionToRequestConverter {
|
||||||
type Error;
|
type Error;
|
||||||
|
@ -2,8 +2,6 @@ use crate::events::{EventU32, GenericEvent, Severity};
|
|||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
use crate::events::{EventU32TypedSev, HasSeverity};
|
use crate::events::{EventU32TypedSev, HasSeverity};
|
||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
use alloc::boxed::Box;
|
|
||||||
#[cfg(feature = "alloc")]
|
|
||||||
use core::hash::Hash;
|
use core::hash::Hash;
|
||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
use hashbrown::HashSet;
|
use hashbrown::HashSet;
|
||||||
@ -32,12 +30,12 @@ pub use heapless_mod::*;
|
|||||||
/// structure to track disabled events. A more primitive and embedded friendly
|
/// structure to track disabled events. A more primitive and embedded friendly
|
||||||
/// solution could track this information in a static or pre-allocated list which contains
|
/// solution could track this information in a static or pre-allocated list which contains
|
||||||
/// the disabled events.
|
/// the disabled events.
|
||||||
pub trait PusEventMgmtBackendProvider<Provider: GenericEvent> {
|
pub trait PusEventMgmtBackendProvider<Event: GenericEvent> {
|
||||||
type Error;
|
type Error;
|
||||||
|
|
||||||
fn event_enabled(&self, event: &Provider) -> bool;
|
fn event_enabled(&self, event: &Event) -> bool;
|
||||||
fn enable_event_reporting(&mut self, event: &Provider) -> Result<bool, Self::Error>;
|
fn enable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error>;
|
||||||
fn disable_event_reporting(&mut self, event: &Provider) -> Result<bool, Self::Error>;
|
fn disable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "heapless")]
|
#[cfg(feature = "heapless")]
|
||||||
@ -108,6 +106,10 @@ impl From<EcssTmtcError> for EventManError {
|
|||||||
|
|
||||||
#[cfg(feature = "alloc")]
|
#[cfg(feature = "alloc")]
|
||||||
pub mod alloc_mod {
|
pub mod alloc_mod {
|
||||||
|
use core::marker::PhantomData;
|
||||||
|
|
||||||
|
use crate::events::EventU16;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// Default backend provider which uses a hash set as the event reporting status container
|
/// Default backend provider which uses a hash set as the event reporting status container
|
||||||
@ -115,14 +117,11 @@ pub mod alloc_mod {
|
|||||||
///
|
///
|
||||||
/// This provider is a good option for host systems or larger embedded systems where
|
/// This provider is a good option for host systems or larger embedded systems where
|
||||||
/// the expected occasional memory allocation performed by the [HashSet] is not an issue.
|
/// the expected occasional memory allocation performed by the [HashSet] is not an issue.
|
||||||
pub struct DefaultPusMgmtBackendProvider<Event: GenericEvent = EventU32> {
|
pub struct DefaultPusEventMgmtBackend<Event: GenericEvent = EventU32> {
|
||||||
disabled: HashSet<Event>,
|
disabled: HashSet<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Safety: All contained field are [Send] as well
|
impl<Event: GenericEvent> Default for DefaultPusEventMgmtBackend<Event> {
|
||||||
unsafe impl<Event: GenericEvent + Send> Send for DefaultPusMgmtBackendProvider<Event> {}
|
|
||||||
|
|
||||||
impl<Event: GenericEvent> Default for DefaultPusMgmtBackendProvider<Event> {
|
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
disabled: HashSet::default(),
|
disabled: HashSet::default(),
|
||||||
@ -130,46 +129,50 @@ pub mod alloc_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Provider: GenericEvent + PartialEq + Eq + Hash + Copy + Clone>
|
impl<EV: GenericEvent + PartialEq + Eq + Hash + Copy + Clone> PusEventMgmtBackendProvider<EV>
|
||||||
PusEventMgmtBackendProvider<Provider> for DefaultPusMgmtBackendProvider<Provider>
|
for DefaultPusEventMgmtBackend<EV>
|
||||||
{
|
{
|
||||||
type Error = ();
|
type Error = ();
|
||||||
fn event_enabled(&self, event: &Provider) -> bool {
|
|
||||||
|
fn event_enabled(&self, event: &EV) -> bool {
|
||||||
!self.disabled.contains(event)
|
!self.disabled.contains(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn enable_event_reporting(&mut self, event: &Provider) -> Result<bool, Self::Error> {
|
fn enable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> {
|
||||||
Ok(self.disabled.remove(event))
|
Ok(self.disabled.remove(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn disable_event_reporting(&mut self, event: &Provider) -> Result<bool, Self::Error> {
|
fn disable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> {
|
||||||
Ok(self.disabled.insert(*event))
|
Ok(self.disabled.insert(*event))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PusEventDispatcher<BackendError, Provider: GenericEvent> {
|
pub struct PusEventDispatcher<
|
||||||
|
B: PusEventMgmtBackendProvider<EV, Error = E>,
|
||||||
|
EV: GenericEvent,
|
||||||
|
E,
|
||||||
|
> {
|
||||||
reporter: EventReporter,
|
reporter: EventReporter,
|
||||||
backend: Box<dyn PusEventMgmtBackendProvider<Provider, Error = BackendError>>,
|
backend: B,
|
||||||
|
phantom: PhantomData<(E, EV)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Safety: All contained fields are send as well.
|
impl<B: PusEventMgmtBackendProvider<EV, Error = E>, EV: GenericEvent, E>
|
||||||
unsafe impl<E: Send, Event: GenericEvent + Send> Send for PusEventDispatcher<E, Event> {}
|
PusEventDispatcher<B, EV, E>
|
||||||
|
{
|
||||||
impl<BackendError, Provider: GenericEvent> PusEventDispatcher<BackendError, Provider> {
|
pub fn new(reporter: EventReporter, backend: B) -> Self {
|
||||||
pub fn new(
|
Self {
|
||||||
reporter: EventReporter,
|
reporter,
|
||||||
backend: Box<dyn PusEventMgmtBackendProvider<Provider, Error = BackendError>>,
|
backend,
|
||||||
) -> Self {
|
phantom: PhantomData,
|
||||||
Self { reporter, backend }
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<BackendError, Event: GenericEvent> PusEventDispatcher<BackendError, Event> {
|
pub fn enable_tm_for_event(&mut self, event: &EV) -> Result<bool, E> {
|
||||||
pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, BackendError> {
|
|
||||||
self.backend.enable_event_reporting(event)
|
self.backend.enable_event_reporting(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, BackendError> {
|
pub fn disable_tm_for_event(&mut self, event: &EV) -> Result<bool, E> {
|
||||||
self.backend.disable_event_reporting(event)
|
self.backend.disable_event_reporting(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,7 +180,7 @@ pub mod alloc_mod {
|
|||||||
&mut self,
|
&mut self,
|
||||||
sender: &mut (impl EcssTmSenderCore + ?Sized),
|
sender: &mut (impl EcssTmSenderCore + ?Sized),
|
||||||
time_stamp: &[u8],
|
time_stamp: &[u8],
|
||||||
event: Event,
|
event: EV,
|
||||||
aux_data: Option<&[u8]>,
|
aux_data: Option<&[u8]>,
|
||||||
) -> Result<bool, EventManError> {
|
) -> Result<bool, EventManError> {
|
||||||
if !self.backend.event_enabled(&event) {
|
if !self.backend.event_enabled(&event) {
|
||||||
@ -208,18 +211,30 @@ pub mod alloc_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<BackendError> PusEventDispatcher<BackendError, EventU32> {
|
impl<EV: GenericEvent + Copy + PartialEq + Eq + Hash>
|
||||||
|
PusEventDispatcher<DefaultPusEventMgmtBackend<EV>, EV, ()>
|
||||||
|
{
|
||||||
|
pub fn new_with_default_backend(reporter: EventReporter) -> Self {
|
||||||
|
Self {
|
||||||
|
reporter,
|
||||||
|
backend: DefaultPusEventMgmtBackend::default(),
|
||||||
|
phantom: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: PusEventMgmtBackendProvider<EventU32, Error = E>, E> PusEventDispatcher<B, EventU32, E> {
|
||||||
pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>(
|
pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>(
|
||||||
&mut self,
|
&mut self,
|
||||||
event: &EventU32TypedSev<Severity>,
|
event: &EventU32TypedSev<Severity>,
|
||||||
) -> Result<bool, BackendError> {
|
) -> Result<bool, E> {
|
||||||
self.backend.enable_event_reporting(event.as_ref())
|
self.backend.enable_event_reporting(event.as_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>(
|
pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>(
|
||||||
&mut self,
|
&mut self,
|
||||||
event: &EventU32TypedSev<Severity>,
|
event: &EventU32TypedSev<Severity>,
|
||||||
) -> Result<bool, BackendError> {
|
) -> Result<bool, E> {
|
||||||
self.backend.disable_event_reporting(event.as_ref())
|
self.backend.disable_event_reporting(event.as_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,6 +248,11 @@ pub mod alloc_mod {
|
|||||||
self.generate_pus_event_tm_generic(sender, time_stamp, event.into(), aux_data)
|
self.generate_pus_event_tm_generic(sender, time_stamp, event.into(), aux_data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type DefaultPusEventU16Dispatcher<E> =
|
||||||
|
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU16>, EventU16, E>;
|
||||||
|
pub type DefaultPusEventU32Dispatcher<E> =
|
||||||
|
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU32>, EventU32, E>;
|
||||||
}
|
}
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
@ -246,15 +266,19 @@ mod tests {
|
|||||||
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
|
const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5);
|
||||||
const EMPTY_STAMP: [u8; 7] = [0; 7];
|
const EMPTY_STAMP: [u8; 7] = [0; 7];
|
||||||
|
|
||||||
fn create_basic_man() -> PusEventDispatcher<(), EventU32> {
|
fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> {
|
||||||
let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed");
|
let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed");
|
||||||
let backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
|
PusEventDispatcher::new_with_default_backend(reporter)
|
||||||
PusEventDispatcher::new(reporter, Box::new(backend))
|
}
|
||||||
|
fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> {
|
||||||
|
let reporter = EventReporter::new(0x02, 128).expect("Creating event repoter failed");
|
||||||
|
let backend = DefaultPusEventMgmtBackend::default();
|
||||||
|
PusEventDispatcher::new(reporter, backend)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_basic() {
|
fn test_basic() {
|
||||||
let mut event_man = create_basic_man();
|
let mut event_man = create_basic_man_1();
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx);
|
let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx);
|
||||||
let event_sent = event_man
|
let event_sent = event_man
|
||||||
@ -268,7 +292,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_disable_event() {
|
fn test_disable_event() {
|
||||||
let mut event_man = create_basic_man();
|
let mut event_man = create_basic_man_2();
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let mut sender = MpscTmAsVecSender::new(0, "test", event_tx);
|
let mut sender = MpscTmAsVecSender::new(0, "test", event_tx);
|
||||||
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
|
let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT);
|
||||||
@ -291,7 +315,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_reenable_event() {
|
fn test_reenable_event() {
|
||||||
let mut event_man = create_basic_man();
|
let mut event_man = create_basic_man_1();
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let mut sender = MpscTmAsVecSender::new(0, "test", event_tx);
|
let mut sender = MpscTmAsVecSender::new(0, "test", event_tx);
|
||||||
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
|
let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT);
|
||||||
|
@ -46,7 +46,7 @@ pub mod alloc_mod {
|
|||||||
/// - Checking the validity of the APID, service ID, subservice ID.
|
/// - Checking the validity of the APID, service ID, subservice ID.
|
||||||
/// - Checking the validity of the user data.
|
/// - Checking the validity of the user data.
|
||||||
///
|
///
|
||||||
/// A [VerificationReporterWithSender] instance is passed to the user to also allow handling
|
/// A [VerificationReportingProvider] is passed to the user to also allow handling
|
||||||
/// of the verification process as part of the PUS standard requirements.
|
/// of the verification process as part of the PUS standard requirements.
|
||||||
pub trait PusHkToRequestConverter {
|
pub trait PusHkToRequestConverter {
|
||||||
type Error;
|
type Error;
|
||||||
@ -78,9 +78,9 @@ pub mod std_mod {
|
|||||||
/// 1. Retrieve the next TC packet from the [PusServiceHelper]. The [EcssTcInMemConverter]
|
/// 1. Retrieve the next TC packet from the [PusServiceHelper]. The [EcssTcInMemConverter]
|
||||||
/// allows to configure the used telecommand memory backend.
|
/// allows to configure the used telecommand memory backend.
|
||||||
/// 2. Convert the TC to a targeted action request using the provided
|
/// 2. Convert the TC to a targeted action request using the provided
|
||||||
/// [PusActionToRequestConverter]. The generic error type is constrained to the
|
/// [PusHkToRequestConverter]. The generic error type is constrained to the
|
||||||
/// [PusPacketHandlerResult] for the concrete implementation which offers a packet handler.
|
/// [PusPacketHandlerResult] for the concrete implementation which offers a packet handler.
|
||||||
/// 3. Route the action request using the provided [PusActionRequestRouter]. The generic error
|
/// 3. Route the action request using the provided [PusHkRequestRouter]. The generic error
|
||||||
/// type is constrained to the [GenericRoutingError] for the concrete implementation.
|
/// type is constrained to the [GenericRoutingError] for the concrete implementation.
|
||||||
/// 4. Handle all routing errors using the provided [PusRoutingErrorHandler]. The generic error
|
/// 4. Handle all routing errors using the provided [PusRoutingErrorHandler]. The generic error
|
||||||
/// type is constrained to the [GenericRoutingError] for the concrete implementation.
|
/// type is constrained to the [GenericRoutingError] for the concrete implementation.
|
||||||
|
@ -815,8 +815,6 @@ pub mod std_mod {
|
|||||||
pub tc_receiver: Box<dyn EcssTcReceiver>,
|
pub tc_receiver: Box<dyn EcssTcReceiver>,
|
||||||
pub tm_sender: Box<dyn EcssTmSender>,
|
pub tm_sender: Box<dyn EcssTmSender>,
|
||||||
pub tm_apid: u16,
|
pub tm_apid: u16,
|
||||||
/// The verification handler is wrapped in a [RefCell] to allow the interior mutability
|
|
||||||
/// pattern. This makes writing methods which are not mutable a lot easier.
|
|
||||||
pub verification_handler: VerificationReporter,
|
pub verification_handler: VerificationReporter,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -886,7 +884,7 @@ pub mod std_mod {
|
|||||||
/// This function can be used to poll the internal [EcssTcReceiver] object for the next
|
/// This function can be used to poll the internal [EcssTcReceiver] object for the next
|
||||||
/// telecommand packet. It will return `Ok(None)` if there are not packets available.
|
/// telecommand packet. It will return `Ok(None)` if there are not packets available.
|
||||||
/// In any other case, it will perform the acceptance of the ECSS TC packet using the
|
/// In any other case, it will perform the acceptance of the ECSS TC packet using the
|
||||||
/// internal [VerificationReporterWithSender] object. It will then return the telecommand
|
/// internal [VerificationReportingProvider] object. It will then return the telecommand
|
||||||
/// and the according accepted token.
|
/// and the according accepted token.
|
||||||
pub fn retrieve_and_accept_next_packet(
|
pub fn retrieve_and_accept_next_packet(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use satrs::event_man::{
|
use satrs::event_man::{
|
||||||
EventManagerWithMpscQueue, MpscEventU32Receiver, MpscEventU32SendProvider, SendEventProvider,
|
EventManagerWithMpsc, EventSendProvider, EventU32SenderMpsc, MpscEventU32Receiver,
|
||||||
};
|
};
|
||||||
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
|
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
|
||||||
use satrs::params::U32Pair;
|
use satrs::params::U32Pair;
|
||||||
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
|
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
|
||||||
use satrs::pus::event_man::{DefaultPusMgmtBackendProvider, EventReporter, PusEventDispatcher};
|
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher};
|
||||||
use satrs::pus::MpscTmAsVecSender;
|
use satrs::pus::MpscTmAsVecSender;
|
||||||
use spacepackets::ecss::tm::PusTmReader;
|
use spacepackets::ecss::tm::PusTmReader;
|
||||||
use spacepackets::ecss::{PusError, PusPacket};
|
use spacepackets::ecss::{PusError, PusPacket};
|
||||||
@ -26,16 +26,16 @@ pub enum CustomTmSenderError {
|
|||||||
fn test_threaded_usage() {
|
fn test_threaded_usage() {
|
||||||
let (event_sender, event_man_receiver) = channel();
|
let (event_sender, event_man_receiver) = channel();
|
||||||
let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
|
let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
|
||||||
let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_receiver));
|
let mut event_man = EventManagerWithMpsc::new(event_receiver);
|
||||||
|
|
||||||
let (pus_event_man_tx, pus_event_man_rx) = channel();
|
let (pus_event_man_tx, pus_event_man_rx) = channel();
|
||||||
let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx);
|
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
|
||||||
event_man.subscribe_all(pus_event_man_send_provider.id());
|
event_man.subscribe_all(pus_event_man_send_provider.id());
|
||||||
event_man.add_sender(pus_event_man_send_provider);
|
event_man.add_sender(pus_event_man_send_provider);
|
||||||
let (event_tx, event_rx) = channel();
|
let (event_tx, event_rx) = channel();
|
||||||
let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed");
|
let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed");
|
||||||
let backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
|
let mut pus_event_man =
|
||||||
let mut pus_event_man = PusEventDispatcher::new(reporter, Box::new(backend));
|
PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
|
||||||
// PUS + Generic event manager thread
|
// PUS + Generic event manager thread
|
||||||
let jh0 = thread::spawn(move || {
|
let jh0 = thread::spawn(move || {
|
||||||
let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx);
|
let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx);
|
||||||
@ -71,6 +71,7 @@ fn test_threaded_usage() {
|
|||||||
Params::Vec(vec) => gen_event(Some(vec.as_slice())),
|
Params::Vec(vec) => gen_event(Some(vec.as_slice())),
|
||||||
Params::String(str) => gen_event(Some(str.as_bytes())),
|
Params::String(str) => gen_event(Some(str.as_bytes())),
|
||||||
Params::Store(_) => gen_event(None),
|
Params::Store(_) => gen_event(None),
|
||||||
|
_ => panic!("unsupported parameter type"),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
gen_event(None)
|
gen_event(None)
|
||||||
@ -120,10 +121,7 @@ fn test_threaded_usage() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
event_sender
|
event_sender
|
||||||
.send((
|
.send((LOW_SEV_EVENT, Some(Params::Heapless((2_u32, 3_u32).into()))))
|
||||||
LOW_SEV_EVENT.into(),
|
|
||||||
Some(Params::Heapless((2_u32, 3_u32).into())),
|
|
||||||
))
|
|
||||||
.expect("Sending low severity event failed");
|
.expect("Sending low severity event failed");
|
||||||
loop {
|
loop {
|
||||||
match event_rx.try_recv() {
|
match event_rx.try_recv() {
|
||||||
|
Loading…
Reference in New Issue
Block a user