simplified event management
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

This commit is contained in:
2024-04-24 13:06:12 +02:00
parent bfaddd0ebb
commit 4fe95edecd
5 changed files with 71 additions and 110 deletions

View File

@ -21,8 +21,8 @@
//!
//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different
//! message queue backends. A straightforward implementation where dynamic memory allocation is
//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in
//! form of the [MpscEventReceiver].
//! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is
//! already implemented for this type.
//! 2. To set up event creators, create channel pairs using some message queue implementation.
//! Each event creator gets a (cloned) sender component which allows it to send events to the
//! manager.
@ -157,9 +157,10 @@ pub trait SenderMapProvider<
/// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers.
/// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs.
/// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events.
/// * `Ev`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
/// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
/// and [EventU16] are supported.
/// * `Data`: Auxiliary data which is sent with the event to provide optional context information
/// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context
/// information
pub struct EventManager<
EventReceiver: EventReceiveProvider<Event, ParamProvider>,
SenderMap: SenderMapProvider<EventSender, Event, ParamProvider>,
@ -331,11 +332,11 @@ pub mod alloc_mod {
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
pub type EventManagerWithMpsc<EV = EventU32, AUX = Params> = EventManager<
MpscEventReceiver,
DefaultSenderMap<EventSenderMpsc<EV>, EV, AUX>,
pub type EventManagerWithMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
EventU32ReceiverMpsc<ParamProvider>,
DefaultSenderMap<EventSenderMpsc<Event>, Event, ParamProvider>,
DefaultListenerMap,
EventSenderMpsc<EV>,
EventSenderMpsc<Event>,
>;
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
@ -343,7 +344,7 @@ pub mod alloc_mod {
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
/// message queue backend.
pub type EventManagerWithBoundedMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
MpscEventReceiver,
EventU32ReceiverMpsc<ParamProvider>,
DefaultSenderMap<EventSenderMpscBounded<Event>, Event, ParamProvider>,
DefaultListenerMap,
EventSenderMpscBounded<Event>,
@ -479,20 +480,16 @@ pub mod std_mod {
use super::*;
use std::sync::mpsc;
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> {
receiver: mpsc::Receiver<EventMessage<Event>>,
}
impl<Event: GenericEvent + Send> MpscEventReceiver<Event> {
pub fn new(receiver: mpsc::Receiver<EventMessage<Event>>) -> Self {
Self { receiver }
}
}
impl<Event: GenericEvent + Send> EventReceiveProvider<Event> for MpscEventReceiver<Event> {
impl<Event: GenericEvent + Send, ParamProvider: Debug>
EventReceiveProvider<Event, ParamProvider>
for mpsc::Receiver<EventMessage<Event, ParamProvider>>
{
type Error = GenericReceiveError;
fn try_recv_event(&self) -> Result<Option<EventMessage<Event>>, Self::Error> {
match self.receiver.try_recv() {
fn try_recv_event(
&self,
) -> Result<Option<EventMessage<Event, ParamProvider>>, Self::Error> {
match self.try_recv() {
Ok(msg) => Ok(Some(msg)),
Err(e) => match e {
mpsc::TryRecvError::Empty => Ok(None),
@ -504,8 +501,10 @@ pub mod std_mod {
}
}
pub type MpscEventU32Receiver = MpscEventReceiver<EventU32>;
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>;
pub type EventU32ReceiverMpsc<ParamProvider = Params> =
mpsc::Receiver<EventMessage<EventU32, ParamProvider>>;
pub type EventU16ReceiverMpsc<ParamProvider = Params> =
mpsc::Receiver<EventMessage<EventU16, ParamProvider>>;
/// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to
/// send events.
@ -624,9 +623,8 @@ mod tests {
}
fn generic_event_man() -> (mpsc::Sender<EventMessageU32>, EventManagerWithMpsc) {
let (event_sender, manager_queue) = mpsc::channel();
let event_man_receiver = MpscEventReceiver::new(manager_queue);
(event_sender, EventManager::new(event_man_receiver))
let (event_sender, event_receiver) = mpsc::channel();
(event_sender, EventManager::new(event_receiver))
}
#[test]
@ -793,9 +791,8 @@ mod tests {
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
};
let (event_sender, manager_queue) = mpsc::channel();
let event_man_receiver = MpscEventReceiver::new(manager_queue);
let mut event_man = EventManagerWithMpsc::new(event_man_receiver);
let (event_sender, event_receiver) = mpsc::channel();
let mut event_man = EventManagerWithMpsc::new(event_receiver);
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
let (event_0_tx_0, all_events_rx) = mpsc::channel();

View File

@ -1,6 +1,6 @@
use satrs::event_man::{
EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider,
EventU32SenderMpsc, MpscEventU32Receiver,
EventU32SenderMpsc,
};
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
use satrs::params::U32Pair;
@ -29,15 +29,14 @@ pub enum CustomTmSenderError {
#[test]
fn test_threaded_usage() {
let (event_sender, event_man_receiver) = mpsc::channel();
let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
let mut event_man = EventManagerWithMpsc::new(event_receiver);
let (event_tx, event_rx) = mpsc::sync_channel(100);
let mut event_man = EventManagerWithMpsc::new(event_rx);
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.target_id());
event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = mpsc::channel::<PacketAsVec>();
let (event_packet_tx, event_packet_rx) = mpsc::channel::<PacketAsVec>();
let reporter =
EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed");
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
@ -54,7 +53,7 @@ fn test_threaded_usage() {
Ok(event_msg) => {
let gen_event = |aux_data| {
pus_event_man.generate_pus_event_tm_generic(
&event_tx,
&event_packet_tx,
&EMPTY_STAMP,
event_msg.event(),
aux_data,
@ -100,14 +99,14 @@ fn test_threaded_usage() {
// Event sender and TM checker thread
let jh1 = thread::spawn(move || {
event_sender
event_tx
.send(EventMessage::new(
TEST_COMPONENT_ID_0.id(),
INFO_EVENT.into(),
))
.expect("Sending info event failed");
loop {
match event_rx.try_recv() {
match event_packet_rx.try_recv() {
// Event TM received successfully
Ok(event_tm) => {
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
@ -129,7 +128,7 @@ fn test_threaded_usage() {
}
}
}
event_sender
event_tx
.send(EventMessage::new_with_params(
TEST_COMPONENT_ID_0.id(),
LOW_SEV_EVENT,
@ -137,7 +136,7 @@ fn test_threaded_usage() {
))
.expect("Sending low severity event failed");
loop {
match event_rx.try_recv() {
match event_packet_rx.try_recv() {
// Event TM received successfully
Ok(event_tm) => {
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)