new event module

This commit is contained in:
Robin Mueller
2025-10-16 15:22:44 +02:00
parent fe7e4fac59
commit 4b9de3976d
5 changed files with 198 additions and 165 deletions

View File

@@ -25,11 +25,12 @@ thiserror = { version = "2", default-features = false }
hashbrown = { version = ">=0.14, <=0.15", optional = true } hashbrown = { version = ">=0.14, <=0.15", optional = true }
static_cell = { version = "2" } static_cell = { version = "2" }
heapless = { version = "0.9" } heapless = { version = "0.9", optional = true }
dyn-clone = { version = "1", optional = true } dyn-clone = { version = "1", optional = true }
downcast-rs = { version = "2", default-features = false, optional = true } downcast-rs = { version = "2", default-features = false, optional = true }
bus = { version = "2.2", optional = true } bus = { version = "2.2", optional = true }
crossbeam-channel = { version = "0.5", default-features = false, optional = true } crossbeam-channel = { version = "0.5", default-features = false, optional = true }
postcard = { version = "1", features = ["alloc"] }
serde = { version = "1", default-features = false, optional = true } serde = { version = "1", default-features = false, optional = true }
socket2 = { version = "0.6", features = ["all"], optional = true } socket2 = { version = "0.6", features = ["all"], optional = true }
arbitrary-int = "2" arbitrary-int = "2"
@@ -48,7 +49,7 @@ tempfile = "3"
version = "1" version = "1"
[features] [features]
default = ["std"] default = ["std", "heapless"]
std = [ std = [
"downcast-rs/std", "downcast-rs/std",
"alloc", "alloc",

View File

@@ -48,13 +48,13 @@
//! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs) //! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs)
//! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs) //! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs)
//! show how the event management modules can be integrated into a more complex software. //! show how the event management modules can be integrated into a more complex software.
use core::{marker::PhantomData, option::Iter};
use crate::{ use crate::{
ComponentId, ComponentId,
events2::{Event, EventDynParam, EventId, GroupId}, events2::{Event, EventId, GroupId},
queue::GenericSendError, queue::GenericSendError,
}; };
use core::marker::PhantomData;
use core::slice::Iter;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use alloc_mod::*; pub use alloc_mod::*;
@@ -70,17 +70,16 @@ pub enum ListenerKey {
All, All,
} }
#[derive(Debug)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))] #[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct EventMessage<EventInstance: Event> { pub struct EventMessage<EventInstance> {
sender_id: ComponentId, sender_id: ComponentId,
event: EventInstance, event: EventInstance,
} }
impl<EventInstance: Event> EventMessage<EventInstance> { impl<EventInstance: Event> EventMessage<EventInstance> {
pub fn new(sender_id: ComponentId, event: &EventInstance) -> Self { pub fn new(sender_id: ComponentId, event: EventInstance) -> Self {
EventMessage { sender_id, event: event.clone() } EventMessage { sender_id, event }
} }
pub fn sender_id(&self) -> ComponentId { pub fn sender_id(&self) -> ComponentId {
@@ -283,10 +282,9 @@ impl<
if let Some(ids) = self.listener_map.get_listener_ids(key) { if let Some(ids) = self.listener_map.get_listener_ids(key) {
for id in ids { for id in ids {
if let Some(sender) = self.sender_map.get_send_event_provider(id) { if let Some(sender) = self.sender_map.get_send_event_provider(id) {
if let Err(e) = sender.send(EventMessage::new( if let Err(e) = sender
event_msg.sender_id, .send(EventMessage::new(event_msg.sender_id, event_msg.event.clone()))
event_msg.event.clone(), {
)) {
error_handler(event_msg, EventRoutingError::Send(e)); error_handler(event_msg, EventRoutingError::Send(e));
} else { } else {
num_recipients += 1; num_recipients += 1;
@@ -320,11 +318,13 @@ pub mod alloc_mod {
use alloc::vec::Vec; use alloc::vec::Vec;
use hashbrown::HashMap; use hashbrown::HashMap;
use crate::events2::EventErasedAlloc;
use super::*; use super::*;
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend. /// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
pub type EventManagerWithMpsc<EventInstance = EventDynParam> = EventManager< pub type EventManagerWithMpsc<EventInstance = EventErasedAlloc> = EventManager<
EventDynParamReceiverMpsc, EventDynParamReceiverMpsc,
DefaultSenderMap<EventSenderMpsc<EventInstance>, EventInstance>, DefaultSenderMap<EventSenderMpsc<EventInstance>, EventInstance>,
DefaultListenerMap, DefaultListenerMap,
@@ -336,7 +336,7 @@ pub mod alloc_mod {
/// and the [DefaultListenerMap]. It uses /// and the [DefaultListenerMap]. It uses
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
/// message queue backend. /// message queue backend.
pub type EventManagerWithBoundedMpsc<EventInstance = EventDynParam> = EventManager< pub type EventManagerWithBoundedMpsc<EventInstance = EventErasedAlloc> = EventManager<
EventDynParamReceiverMpsc, EventDynParamReceiverMpsc,
DefaultSenderMap<EventSenderMpscBounded<EventInstance>, EventInstance>, DefaultSenderMap<EventSenderMpscBounded<EventInstance>, EventInstance>,
DefaultListenerMap, DefaultListenerMap,
@@ -417,7 +417,7 @@ pub mod alloc_mod {
/// Simple implementation which uses a [HashMap] internally. /// Simple implementation which uses a [HashMap] internally.
pub struct DefaultSenderMap< pub struct DefaultSenderMap<
EventSenderInstance: EventSender<EventInstance>, EventSenderInstance: EventSender<EventInstance>,
EventInstance: Event = EventDynParam, EventInstance: Event,
> { > {
senders: HashMap<ComponentId, EventSenderInstance>, senders: HashMap<ComponentId, EventSenderInstance>,
phantom: PhantomData<EventInstance>, phantom: PhantomData<EventInstance>,
@@ -460,7 +460,10 @@ pub mod alloc_mod {
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod std_mod { pub mod std_mod {
use crate::{events2::EventHeapless, queue::GenericReceiveError}; use crate::{
events2::{EventErasedAlloc, EventHeapless},
queue::GenericReceiveError,
};
use super::*; use super::*;
use std::sync::mpsc; use std::sync::mpsc;
@@ -483,7 +486,7 @@ pub mod std_mod {
} }
} }
pub type EventDynParamReceiverMpsc = mpsc::Receiver<EventMessage<EventDynParam>>; pub type EventDynParamReceiverMpsc = mpsc::Receiver<EventMessage<EventErasedAlloc>>;
pub type EventHeaplessReceiverMpsc<const N: usize> = pub type EventHeaplessReceiverMpsc<const N: usize> =
mpsc::Receiver<EventMessage<EventHeapless<N>>>; mpsc::Receiver<EventMessage<EventHeapless<N>>>;
@@ -563,9 +566,9 @@ pub mod std_mod {
} }
} }
pub type EventDynParamSenderMpsc = EventSenderMpsc<EventDynParam>; pub type EventDynParamSenderMpsc = EventSenderMpsc<EventErasedAlloc>;
pub type EventHeaplessSenderMpsc<const N: usize> = EventSenderMpsc<EventHeapless<N>>; pub type EventHeaplessSenderMpsc<const N: usize> = EventSenderMpsc<EventHeapless<N>>;
pub type EventDynParamSenderMpscBounded = EventSenderMpscBounded<EventDynParam>; pub type EventDynParamSenderMpscBounded = EventSenderMpscBounded<EventErasedAlloc>;
pub type EventHeaplessSenderMpscBounded<const N: usize> = pub type EventHeaplessSenderMpscBounded<const N: usize> =
EventSenderMpscBounded<EventHeapless<N>>; EventSenderMpscBounded<EventHeapless<N>>;
} }
@@ -575,52 +578,46 @@ mod tests {
use arbitrary_int::u14; use arbitrary_int::u14;
use super::*; use super::*;
use crate::events2::Severity; use crate::events2::{EventErasedAlloc, Severity};
use crate::params::{ParamsHeapless, ParamsRaw};
use crate::pus::test_util::{TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1}; use crate::pus::test_util::{TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1};
use std::format;
use std::sync::mpsc; use std::sync::mpsc;
use std::vec::Vec;
const TEST_GROUP_ID_0: u14 = u14::new(0); const TEST_GROUP_ID_0: u14 = u14::new(0);
const TEST_GROUP_ID_1: u14 = u14::new(1);
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, serde::Serialize, serde::Deserialize)]
pub enum TestEventIds { pub enum TestEvent {
TestInfo, Info,
TestError, Error,
InfoWithParams(u16),
OtherGroup,
} }
impl Event for TestEventIds { impl Event for TestEvent {
fn id(&self) -> EventId { fn id(&self) -> EventId {
match self { match self {
TestEventIds::TestInfo => EventId::new( TestEvent::Info => EventId::new(Severity::Info, TEST_GROUP_ID_0, 0),
Severity::Info, TestEvent::Error => EventId::new(Severity::High, TEST_GROUP_ID_0, 1),
TEST_GROUP_ID_0, TestEvent::InfoWithParams(_) => EventId::new(Severity::Info, TEST_GROUP_ID_0, 2),
TestEventIds::TestInfo as u16, TestEvent::OtherGroup => EventId::new(Severity::Info, TEST_GROUP_ID_1, 0),
),
TestEventIds::TestError => EventId::new(
Severity::High,
TEST_GROUP_ID_0,
TestEventIds::TestError as u16,
),
} }
} }
} }
fn check_next_event( fn check_next_event(
expected: EventDynParam, expected: TestEvent,
receiver: &mpsc::Receiver<EventMessage<EventDynParam>>, receiver: &mpsc::Receiver<EventMessage<EventErasedAlloc>>,
) -> Option<Vec<u8>> { ) {
if let Ok(event_msg) = receiver.try_recv() { if let Ok(event_msg) = receiver.try_recv() {
assert_eq!(event_msg.event, expected); assert_eq!(event_msg.event.id(), expected.id());
return event_msg.event.parameters().map(|p| p.to_vec()); let event: TestEvent = postcard::from_bytes(event_msg.event().raw()).unwrap();
assert_eq!(event, expected);
} }
None
} }
fn check_handled_event( fn check_handled_event(
res: EventRoutingResult<EventDynParam>, res: EventRoutingResult<EventErasedAlloc>,
expected: &EventDynParam, expected: &EventErasedAlloc,
expected_num_sent: u32, expected_num_sent: u32,
expected_sender_id: ComponentId, expected_sender_id: ComponentId,
) { ) {
@@ -637,7 +634,7 @@ mod tests {
} }
fn generic_event_man() -> ( fn generic_event_man() -> (
mpsc::Sender<EventMessage<EventDynParam>>, mpsc::Sender<EventMessage<EventErasedAlloc>>,
EventManagerWithMpsc, EventManagerWithMpsc,
) { ) {
let (event_sender, event_receiver) = mpsc::channel(); let (event_sender, event_receiver) = mpsc::channel();
@@ -647,197 +644,197 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
let (event_sender, mut event_man) = generic_event_man(); let (event_sender, mut event_man) = generic_event_man();
//let event_grp_0 = EventU::new(Severity::Info, 0, 0);
//let event_grp_1_0 = EventU32::new(Severity::High, 1, 0);
let (single_event_sender, single_event_receiver) = mpsc::channel(); let (single_event_sender, single_event_receiver) = mpsc::channel();
let single_event_listener = EventSenderMpsc::new(0, single_event_sender); let single_event_listener = EventSenderMpsc::new(0, single_event_sender);
event_man.subscribe_single( event_man.subscribe_single(TestEvent::Info.id(), single_event_listener.target_id());
TestEventIds::TestInfo.id(),
single_event_listener.target_id(),
);
event_man.add_sender(single_event_listener); event_man.add_sender(single_event_listener);
let (group_event_sender_0, group_event_receiver_0) = mpsc::channel(); let (group_event_sender_0, group_event_receiver_0) = mpsc::channel();
let group_event_listener = EventDynParamSenderMpsc::new(1, group_event_sender_0); let group_event_listener = EventDynParamSenderMpsc::new(1, group_event_sender_0);
event_man.subscribe_group( event_man.subscribe_group(
TestEventIds::TestError.id().group_id(), TestEvent::OtherGroup.id().group_id(),
group_event_listener.target_id(), group_event_listener.target_id(),
); );
event_man.add_sender(group_event_listener); event_man.add_sender(group_event_listener);
let error_handler = |event_msg: &EventMessage<EventDynParam>, e: EventRoutingError| { let error_handler = |event_msg: &EventMessage<EventErasedAlloc>, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e); panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
}; };
let event_grp_0 = let event_grp_0 = EventErasedAlloc::new(&TestEvent::Info);
EventDynParam::new_no_params(TestEventIds::TestInfo.id()); let event_grp_1 = EventErasedAlloc::new(&TestEvent::OtherGroup);
let event_grp_1 =
EventDynParam::new_no_params(TestEventIds::TestError.id());
// Test event with one listener // Test event with one listener
event_sender event_sender
.send(EventMessage::new( .send(EventMessage::new(
TEST_COMPONENT_ID_0.id(), TEST_COMPONENT_ID_0.id(),
&event_grp_0 event_grp_0.clone(),
)) ))
.expect("Sending single error failed"); .expect("Sending single error failed");
let res = event_man.try_event_handling(&error_handler); let res = event_man.try_event_handling(&error_handler);
check_handled_event(res, &event_grp_0, 1, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_grp_0, 1, TEST_COMPONENT_ID_0.id());
check_next_event(event_grp_0, &single_event_receiver); check_next_event(TestEvent::Info, &single_event_receiver);
// Test event which is sent to all group listeners // Test event which is sent to all group listeners
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_1.id(), &event_grp_1)) .send(EventMessage::new(
TEST_COMPONENT_ID_1.id(),
event_grp_1.clone(),
))
.expect("Sending group error failed"); .expect("Sending group error failed");
let res = event_man.try_event_handling(&error_handler); let res = event_man.try_event_handling(&error_handler);
check_handled_event(res, &event_grp_1, 1, TEST_COMPONENT_ID_1.id()); check_handled_event(res, &event_grp_1, 1, TEST_COMPONENT_ID_1.id());
check_next_event(event_grp_1, &group_event_receiver_0); check_next_event(TestEvent::OtherGroup, &group_event_receiver_0);
} }
#[test] #[test]
fn test_with_basic_params() { fn test_with_basic_params() {
let error_handler = |event_msg: &EventMessage<EventDynParam>, e: EventRoutingError| { let error_handler = |event_msg: &EventMessage<EventErasedAlloc>, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e); panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
}; };
let (event_sender, mut event_man) = generic_event_man(); let (event_sender, mut event_man) = generic_event_man();
let event_grp_0 = EventU32::new(Severity::Info, 0, 0); let event_0 = TestEvent::InfoWithParams(5);
let (single_event_sender, single_event_receiver) = mpsc::channel(); let (single_event_sender, single_event_receiver) = mpsc::channel();
let single_event_listener = EventSenderMpsc::new(0, single_event_sender); let single_event_listener = EventSenderMpsc::new(0, single_event_sender);
event_man.subscribe_single(&event_grp_0, single_event_listener.target_id()); event_man.subscribe_single(event_0.id(), single_event_listener.target_id());
event_man.add_sender(single_event_listener); event_man.add_sender(single_event_listener);
let event_0_erased = EventErasedAlloc::new(&event_0);
event_sender event_sender
.send(EventMessage::new_with_params( .send(EventMessage::new(
TEST_COMPONENT_ID_0.id(), TEST_COMPONENT_ID_0.id(),
event_grp_0, event_0_erased.clone(),
&Params::Heapless((2_u32, 3_u32).into()),
)) ))
.expect("Sending group error failed"); .expect("Sending group error failed");
let res = event_man.try_event_handling(&error_handler); let res = event_man.try_event_handling(&error_handler);
check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_0_erased, 1, TEST_COMPONENT_ID_0.id());
let aux = check_next_event(event_grp_0, &single_event_receiver); check_next_event(event_0, &single_event_receiver);
assert!(aux.is_some());
let aux = aux.unwrap();
if let Params::Heapless(ParamsHeapless::Raw(ParamsRaw::U32Pair(pair))) = aux {
assert_eq!(pair.0, 2);
assert_eq!(pair.1, 3);
} else {
panic!("{}", format!("Unexpected auxiliary value type {:?}", aux));
}
} }
/// Test listening for multiple groups /// Test listening for multiple groups
#[test] #[test]
fn test_multi_group() { fn test_multi_group() {
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { let error_handler = |event_msg: &EventMessage<EventErasedAlloc>, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e); panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
}; };
let (event_sender, mut event_man) = generic_event_man(); let (event_sender, mut event_man) = generic_event_man();
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
assert!(matches!(res, EventRoutingResult::Empty)); assert!(matches!(res, EventRoutingResult::Empty));
let event_grp_0 = EventU32::new(Severity::Info, 0, 0); let event_grp_0 = TestEvent::Info;
let event_grp_1_0 = EventU32::new(Severity::High, 1, 0); let event_grp_1 = TestEvent::OtherGroup;
let (event_grp_0_sender, event_grp_0_receiver) = mpsc::channel(); let (event_grp_0_sender, event_grp_0_receiver) = mpsc::channel();
let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender); let event_grp_0_and_1_listener = EventSenderMpsc::new(0, event_grp_0_sender);
event_man.subscribe_group( event_man.subscribe_group(
event_grp_0.group_id(), event_grp_0.id().group_id(),
event_grp_0_and_1_listener.target_id(), event_grp_0_and_1_listener.target_id(),
); );
event_man.subscribe_group( event_man.subscribe_group(
event_grp_1_0.group_id(), event_grp_1.id().group_id(),
event_grp_0_and_1_listener.target_id(), event_grp_0_and_1_listener.target_id(),
); );
event_man.add_sender(event_grp_0_and_1_listener); event_man.add_sender(event_grp_0_and_1_listener);
let event_grp_0_erased = EventErasedAlloc::new(&event_grp_0);
let event_grp_1_erased = EventErasedAlloc::new(&event_grp_1);
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_grp_0)) .send(EventMessage::new(
TEST_COMPONENT_ID_0.id(),
event_grp_0_erased.clone(),
))
.expect("Sending Event Group 0 failed"); .expect("Sending Event Group 0 failed");
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_grp_1_0)) .send(EventMessage::new(
TEST_COMPONENT_ID_1.id(),
event_grp_1_erased.clone(),
))
.expect("Sendign Event Group 1 failed"); .expect("Sendign Event Group 1 failed");
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_grp_0_erased, 1, TEST_COMPONENT_ID_0.id());
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_grp_1_0, 1, TEST_COMPONENT_ID_1.id()); check_handled_event(res, &event_grp_1_erased, 1, TEST_COMPONENT_ID_1.id());
check_next_event(event_grp_0, &event_grp_0_receiver); check_next_event(event_grp_0, &event_grp_0_receiver);
check_next_event(event_grp_1_0, &event_grp_0_receiver); check_next_event(event_grp_1, &event_grp_0_receiver);
} }
/// Test listening to the same event from multiple listeners. Also test listening /// Test listening to the same event from multiple listeners. Also test listening
/// to both group and single events from one listener /// to both group and single events from one listener
#[test] #[test]
fn test_listening_to_same_event_and_multi_type() { fn test_listening_to_same_event_and_multi_type() {
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { let error_handler = |event_msg: &EventMessage<EventErasedAlloc>, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e); panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
}; };
let (event_sender, mut event_man) = generic_event_man(); let (event_sender, mut event_man) = generic_event_man();
let event_0 = EventU32::new(Severity::Info, 0, 5); let event_0 = TestEvent::Info;
let event_1 = EventU32::new(Severity::High, 1, 0); let event_1 = TestEvent::OtherGroup;
//let event_0 = EventU32::new(Severity::Info, 0, 5);
//let event_1 = EventU32::new(Severity::High, 1, 0);
let (event_0_tx_0, event_0_rx_0) = mpsc::channel(); let (event_0_tx_0, event_0_rx_0) = mpsc::channel();
let (event_0_tx_1, event_0_rx_1) = mpsc::channel(); let (event_0_tx_1, event_0_rx_1) = mpsc::channel();
let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0); let event_listener_0 = EventSenderMpsc::new(0, event_0_tx_0);
let event_listener_1 = EventU32SenderMpsc::new(1, event_0_tx_1); let event_listener_1 = EventSenderMpsc::new(1, event_0_tx_1);
let event_listener_0_sender_id = event_listener_0.target_id(); let event_listener_0_sender_id = event_listener_0.target_id();
event_man.subscribe_single(&event_0, event_listener_0_sender_id); event_man.subscribe_single(event_0.id(), event_listener_0_sender_id);
event_man.add_sender(event_listener_0); event_man.add_sender(event_listener_0);
let event_listener_1_sender_id = event_listener_1.target_id(); let event_listener_1_sender_id = event_listener_1.target_id();
event_man.subscribe_single(&event_0, event_listener_1_sender_id); event_man.subscribe_single(event_0.id(), event_listener_1_sender_id);
event_man.add_sender(event_listener_1); event_man.add_sender(event_listener_1);
let event_0_erased = EventErasedAlloc::new(&event_0);
let event_1_erased = EventErasedAlloc::new(&event_1);
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0_erased.clone()))
.expect("Triggering Event 0 failed"); .expect("Triggering Event 0 failed");
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_0, 2, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_0_erased, 2, TEST_COMPONENT_ID_0.id());
check_next_event(event_0, &event_0_rx_0); check_next_event(event_0, &event_0_rx_0);
check_next_event(event_0, &event_0_rx_1); check_next_event(event_0, &event_0_rx_1);
event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); event_man.subscribe_group(event_1.id().group_id(), event_listener_0_sender_id);
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0_erased.clone()))
.expect("Triggering Event 0 failed"); .expect("Triggering Event 0 failed");
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1_erased.clone()))
.expect("Triggering Event 1 failed"); .expect("Triggering Event 1 failed");
// 3 Events messages will be sent now // 3 Events messages will be sent now
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_0, 2, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_0_erased, 2, TEST_COMPONENT_ID_0.id());
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_1.id()); check_handled_event(res, &event_1_erased, 1, TEST_COMPONENT_ID_1.id());
// Both the single event and the group event should arrive now // Both the single event and the group event should arrive now
check_next_event(event_0, &event_0_rx_0); check_next_event(event_0, &event_0_rx_0);
check_next_event(event_1, &event_0_rx_0); check_next_event(event_1, &event_0_rx_0);
// Do double insertion and then remove duplicates // Do double insertion and then remove duplicates
event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); event_man.subscribe_group(event_1.id().group_id(), event_listener_0_sender_id);
event_man.remove_duplicates(&ListenerKey::Group(event_1.group_id())); event_man.remove_duplicates(&ListenerKey::Group(event_1.id().group_id()));
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_1)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_1_erased.clone()))
.expect("Triggering Event 1 failed"); .expect("Triggering Event 1 failed");
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_1_erased, 1, TEST_COMPONENT_ID_0.id());
} }
#[test] #[test]
fn test_all_events_listener() { fn test_all_events_listener() {
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { let error_handler = |event_msg: &EventMessage<EventErasedAlloc>, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e); panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
}; };
let (event_sender, event_receiver) = mpsc::channel(); let (event_sender, event_receiver) = mpsc::channel();
let mut event_man = EventManagerWithMpsc::new(event_receiver); let mut event_man = EventManagerWithMpsc::new(event_receiver);
let event_0 = EventDynParam::new_no_params(TestEventIds::TestInfo.id()); let event_0 = TestEvent::Info;
let event_1 = EventDynParam::new_no_params(TestEventIds::TestError.id()); let event_1 = TestEvent::Error;
let (event_0_tx_0, all_events_rx) = mpsc::channel(); let (event_0_tx_0, all_events_rx) = mpsc::channel();
let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0); let all_events_listener = EventSenderMpsc::new(0, event_0_tx_0);
event_man.subscribe_all(all_events_listener.target_id()); event_man.subscribe_all(all_events_listener.target_id());
event_man.add_sender(all_events_listener); event_man.add_sender(all_events_listener);
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0.into()))
.expect("Triggering event 0 failed"); .expect("Triggering event 0 failed");
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1.into()))
.expect("Triggering event 1 failed"); .expect("Triggering event 1 failed");
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_0, 1, TEST_COMPONENT_ID_0.id()); check_handled_event(res, &event_0.into(), 1, TEST_COMPONENT_ID_0.id());
let res = event_man.try_event_handling(error_handler); let res = event_man.try_event_handling(error_handler);
check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_1.id()); check_handled_event(res, &event_1.into(), 1, TEST_COMPONENT_ID_1.id());
check_next_event(event_0, &all_events_rx); check_next_event(event_0, &all_events_rx);
check_next_event(event_1, &all_events_rx); check_next_event(event_1, &all_events_rx);
} }
@@ -845,17 +842,18 @@ mod tests {
#[test] #[test]
fn test_bounded_event_sender_queue_full() { fn test_bounded_event_sender_queue_full() {
let (event_sender, _event_receiver) = mpsc::sync_channel(3); let (event_sender, _event_receiver) = mpsc::sync_channel(3);
let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); let test_event = TestEvent::Info;
let event_sender = EventSenderMpscBounded::<EventErasedAlloc>::new(1, event_sender, 3);
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), test_event.into()))
.expect("sending test event failed"); .expect("sending test event failed");
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), test_event.into()))
.expect("sending test event failed"); .expect("sending test event failed");
event_sender event_sender
.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), test_event.into()))
.expect("sending test event failed"); .expect("sending test event failed");
let error = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)); let error = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), test_event.into()));
if let Err(e) = error { if let Err(e) = error {
assert!(matches!(e, GenericSendError::QueueFull(Some(3)))); assert!(matches!(e, GenericSendError::QueueFull(Some(3))));
} else { } else {
@@ -865,9 +863,10 @@ mod tests {
#[test] #[test]
fn test_bounded_event_sender_rx_dropped() { fn test_bounded_event_sender_rx_dropped() {
let (event_sender, event_receiver) = mpsc::sync_channel(3); let (event_sender, event_receiver) = mpsc::sync_channel(3);
let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3); let test_event = TestEvent::Info;
let event_sender = EventSenderMpscBounded::<EventErasedAlloc>::new(1, event_sender, 3);
drop(event_receiver); drop(event_receiver);
if let Err(e) = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), TEST_EVENT)) { if let Err(e) = event_sender.send(EventMessage::new(TEST_COMPONENT_ID_0.id(), test_event.into())) {
assert!(matches!(e, GenericSendError::RxDisconnected)); assert!(matches!(e, GenericSendError::RxDisconnected));
} else { } else {
panic!("Expected error"); panic!("Expected error");

View File

@@ -85,7 +85,7 @@ impl HasSeverity for SeverityHigh {
const SEVERITY: Severity = Severity::High; const SEVERITY: Severity = Severity::High;
} }
pub trait GenericEvent: Copy + Clone { pub trait GenericEvent: EcssEnumeration + Copy + Clone {
type Raw; type Raw;
type GroupId; type GroupId;
type UniqueId; type UniqueId;

View File

@@ -29,13 +29,9 @@
//! ``` //! ```
use core::fmt::Debug; use core::fmt::Debug;
use core::hash::Hash; use core::hash::Hash;
use core::marker::PhantomData;
use core::option::Iter;
use arbitrary_int::{prelude::*, u14}; use arbitrary_int::{prelude::*, u14};
use spacepackets::ByteConversionError;
use crate::ComponentId;
use crate::queue::GenericSendError;
/// Using a type definition allows to change this to u64 in the future more easily /// Using a type definition allows to change this to u64 in the future more easily
pub type LargestEventRaw = u32; pub type LargestEventRaw = u32;
@@ -100,11 +96,8 @@ impl TryFrom<u8> for Severity {
} }
} }
pub trait Event: Clone { pub trait Event: Clone{
fn id(&self) -> EventId; fn id(&self) -> EventId;
fn parameters(&self) -> Option<&[u8]> {
None
}
} }
pub type GroupId = u14; pub type GroupId = u14;
@@ -162,61 +155,101 @@ impl From<u32> for EventId {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub struct EventDynParam { pub struct EventErasedAlloc {
id: EventId, id: EventId,
parameters: Option<alloc::vec::Vec<u8>>, event_raw: alloc::vec::Vec<u8>,
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl EventDynParam { impl EventErasedAlloc {
pub fn new(id: EventId, parameters: Option<alloc::vec::Vec<u8>>) -> Self { pub fn new(event: &(impl serde::Serialize + Event)) -> Self {
Self { id, parameters }
}
pub fn new_no_params(id: EventId) -> Self {
Self { Self {
id, id: event.id(),
parameters: None, event_raw: postcard::to_allocvec(event).unwrap(),
} }
} }
#[inline]
pub fn raw(&self) -> &[u8] {
&self.event_raw
}
} }
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl Event for EventDynParam { impl<T: serde::Serialize + Event> From<T> for EventErasedAlloc {
fn from(event: T) -> Self {
Self::new(&event)
}
}
#[cfg(feature = "alloc")]
impl Event for EventErasedAlloc {
fn id(&self) -> EventId { fn id(&self) -> EventId {
self.id self.id
} }
fn parameters(&self) -> Option<&[u8]> {
self.parameters.as_deref()
}
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHeapless<const N: usize> { pub struct EventHeapless<const N: usize> {
id: EventId, id: EventId,
parameters: Option<heapless::vec::Vec<u8, N>>, event_raw: heapless::vec::Vec<u8, N>,
} }
impl<const N: usize> Event for EventHeapless<N> { impl<const N: usize> Event for EventHeapless<N> {
fn id(&self) -> EventId { fn id(&self) -> EventId {
self.id self.id
} }
fn parameters(&self) -> Option<&[u8]> {
self.parameters.as_deref()
}
} }
impl<const N: usize> EventHeapless<N> { impl<const N: usize> EventHeapless<N> {
pub fn new(id: EventId, parameters: Option<heapless::Vec<u8, N>>) -> Self { pub fn new(event: &(impl serde::Serialize + Event)) -> Result<Self, ByteConversionError> {
Self { id, parameters } let ser_size = postcard::experimental::serialized_size(event).unwrap();
} if ser_size < N {
return Err(ByteConversionError::ToSliceTooSmall {
pub fn new_no_params(id: EventId) -> Self { found: N,
Self { expected: ser_size,
id, });
parameters: None,
} }
let mut vec = heapless::Vec::<u8, N>::new();
vec.resize(N, 0);
postcard::to_slice(event, vec.as_mut_slice()).unwrap();
Ok(Self {
id: event.id(),
event_raw: vec,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::mem::size_of;
fn assert_size<T>(_: T, val: usize) {
assert_eq!(size_of::<T>(), val);
}
#[derive(Debug, Copy, Clone)]
pub enum TestEvent {
Info,
Error,
}
impl Event for TestEvent {
fn id(&self) -> EventId {
match self {
TestEvent::Info => EventId::new(Severity::Info, u14::new(0), 0),
TestEvent::Error => EventId::new(Severity::High, u14::new(1), 1),
}
}
}
#[test]
fn test_normal_event_getters() {
assert_eq!(TestEvent::Info.id().severity(), Severity::Info);
assert_eq!(TestEvent::Info.id().unique_id(), 0);
assert_eq!(TestEvent::Info.id().group_id().value(), 0);
let raw_event = TestEvent::Info.id().raw();
assert_eq!(raw_event, 0x00000000);
} }
} }

View File

@@ -16,11 +16,11 @@
#![no_std] #![no_std]
#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_auto_cfg))]
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
extern crate alloc;
#[cfg(feature = "alloc")]
extern crate downcast_rs; extern crate downcast_rs;
#[cfg(any(feature = "std", test))] #[cfg(any(feature = "std", test))]
extern crate std; extern crate std;
#[cfg(any(feature = "alloc", test))]
extern crate alloc;
pub mod action; pub mod action;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]