improve closure for error handler
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good
This commit is contained in:
parent
4e32497b6b
commit
f9ee3998fd
@ -154,13 +154,15 @@ impl EventManagerWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_event_routing(&mut self) {
|
pub fn try_event_routing(&mut self) {
|
||||||
let error_handler = |error| self.routing_error_handler(error);
|
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
||||||
|
self.routing_error_handler(event_msg, error)
|
||||||
|
};
|
||||||
// Perform the event routing.
|
// Perform the event routing.
|
||||||
self.event_manager.try_event_handling(error_handler);
|
self.event_manager.try_event_handling(error_handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn routing_error_handler(&self, error: EventRoutingError) {
|
pub fn routing_error_handler(&self, event_msg: &EventMessageU32, error: EventRoutingError) {
|
||||||
log::warn!("event routing error: {error:?}");
|
log::warn!("event routing error for event {event_msg:?}: {error:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,10 +66,6 @@ pub enum ListenerKey {
|
|||||||
All,
|
All,
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub type EventWithAuxData<Event> = (Event, Option<Params>);
|
|
||||||
// pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
|
|
||||||
// pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct EventMessage<Event: GenericEvent, ParamProvider: Debug = Params> {
|
pub struct EventMessage<Event: GenericEvent, ParamProvider: Debug = Params> {
|
||||||
sender_id: ComponentId,
|
sender_id: ComponentId,
|
||||||
@ -77,16 +73,16 @@ pub struct EventMessage<Event: GenericEvent, ParamProvider: Debug = Params> {
|
|||||||
params: Option<ParamProvider>,
|
params: Option<ParamProvider>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Event: GenericEvent, ParamProvider: Debug> EventMessage<Event, ParamProvider> {
|
impl<Event: GenericEvent, ParamProvider: Debug + Clone> EventMessage<Event, ParamProvider> {
|
||||||
pub fn new_generic(
|
pub fn new_generic(
|
||||||
sender_id: ComponentId,
|
sender_id: ComponentId,
|
||||||
event: Event,
|
event: Event,
|
||||||
params: Option<ParamProvider>,
|
params: Option<&ParamProvider>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
sender_id,
|
sender_id,
|
||||||
event,
|
event,
|
||||||
params,
|
params: params.cloned(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,7 +102,7 @@ impl<Event: GenericEvent, ParamProvider: Debug> EventMessage<Event, ParamProvide
|
|||||||
Self::new_generic(sender_id, event, None)
|
Self::new_generic(sender_id, event, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_params(sender_id: ComponentId, event: Event, params: ParamProvider) -> Self {
|
pub fn new_with_params(sender_id: ComponentId, event: Event, params: &ParamProvider) -> Self {
|
||||||
Self::new_generic(sender_id, event, Some(params))
|
Self::new_generic(sender_id, event, Some(params))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -280,42 +276,44 @@ impl<
|
|||||||
/// If this works without any issues, the [EventRoutingResult] will contain context information
|
/// If this works without any issues, the [EventRoutingResult] will contain context information
|
||||||
/// about the routed event.
|
/// about the routed event.
|
||||||
///
|
///
|
||||||
/// This function will track up to 3 errors returned as part of the
|
/// If an error occurs during the routing, the error handler will be called. The error handler
|
||||||
/// [EventRoutingErrorsWithResult] error struct.
|
/// should take a reference to the event message as the first argument, and the routing error
|
||||||
pub fn try_event_handling<E: FnMut(EventRoutingError)>(
|
/// as the second argument.
|
||||||
|
pub fn try_event_handling<E: FnMut(&EventMessage<Event, ParamProvider>, EventRoutingError)>(
|
||||||
&self,
|
&self,
|
||||||
mut error_handler: E,
|
mut error_handler: E,
|
||||||
) -> EventRoutingResult<Event, ParamProvider> {
|
) -> EventRoutingResult<Event, ParamProvider> {
|
||||||
// let mut err_idx = 0;
|
|
||||||
// let mut err_slice = [None, None, None];
|
|
||||||
let mut num_recipients = 0;
|
let mut num_recipients = 0;
|
||||||
let mut send_handler = |key: &ListenerKey, event: Event, params: &Option<ParamProvider>| {
|
let mut send_handler =
|
||||||
if self.listener_map.contains_listener(key) {
|
|key: &ListenerKey, event_msg: &EventMessage<Event, ParamProvider>| {
|
||||||
if let Some(ids) = self.listener_map.get_listener_ids(key) {
|
if self.listener_map.contains_listener(key) {
|
||||||
for id in ids {
|
if let Some(ids) = self.listener_map.get_listener_ids(key) {
|
||||||
if let Some(sender) = self.sender_map.get_send_event_provider(id) {
|
for id in ids {
|
||||||
if let Err(e) =
|
if let Some(sender) = self.sender_map.get_send_event_provider(id) {
|
||||||
sender.send(EventMessage::new_generic(*id, event, params.clone()))
|
if let Err(e) = sender.send(EventMessage::new_generic(
|
||||||
{
|
*id,
|
||||||
error_handler(EventRoutingError::Send(e));
|
event_msg.event,
|
||||||
|
event_msg.params.as_ref(),
|
||||||
|
)) {
|
||||||
|
error_handler(event_msg, EventRoutingError::Send(e));
|
||||||
|
} else {
|
||||||
|
num_recipients += 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
num_recipients += 1;
|
error_handler(event_msg, EventRoutingError::NoSenderForId(*id));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
error_handler(EventRoutingError::NoSenderForId(*id));
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
error_handler(event_msg, EventRoutingError::NoSendersForKey(*key));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
error_handler(EventRoutingError::NoSendersForKey(*key));
|
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
};
|
|
||||||
if let Ok(Some(event_msg)) = self.event_receiver.try_recv_event() {
|
if let Ok(Some(event_msg)) = self.event_receiver.try_recv_event() {
|
||||||
let single_key = ListenerKey::Single(event_msg.event.raw_as_largest_type());
|
let single_key = ListenerKey::Single(event_msg.event.raw_as_largest_type());
|
||||||
send_handler(&single_key, event_msg.event, &event_msg.params);
|
send_handler(&single_key, &event_msg);
|
||||||
let group_key = ListenerKey::Group(event_msg.event.group_id_as_largest_type());
|
let group_key = ListenerKey::Group(event_msg.event.group_id_as_largest_type());
|
||||||
send_handler(&group_key, event_msg.event, &event_msg.params);
|
send_handler(&group_key, &event_msg);
|
||||||
send_handler(&ListenerKey::All, event_msg.event, &event_msg.params);
|
send_handler(&ListenerKey::All, &event_msg);
|
||||||
return EventRoutingResult::Handled {
|
return EventRoutingResult::Handled {
|
||||||
num_recipients,
|
num_recipients,
|
||||||
event_msg,
|
event_msg,
|
||||||
@ -646,8 +644,8 @@ mod tests {
|
|||||||
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.target_id());
|
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.target_id());
|
||||||
event_man.add_sender(group_event_listener);
|
event_man.add_sender(group_event_listener);
|
||||||
|
|
||||||
let error_handler = |e: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
||||||
panic!("routing error occurred: {:?}", e);
|
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
||||||
};
|
};
|
||||||
// Test event with one listener
|
// Test event with one listener
|
||||||
event_sender
|
event_sender
|
||||||
@ -669,8 +667,8 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_with_basic_params() {
|
fn test_with_basic_params() {
|
||||||
let error_handler = |e: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
||||||
panic!("routing error occurred: {:?}", e);
|
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
||||||
};
|
};
|
||||||
let (event_sender, mut event_man) = generic_event_man();
|
let (event_sender, mut event_man) = generic_event_man();
|
||||||
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
||||||
@ -682,7 +680,7 @@ mod tests {
|
|||||||
.send(EventMessage::new_with_params(
|
.send(EventMessage::new_with_params(
|
||||||
TEST_COMPONENT_ID_0.id(),
|
TEST_COMPONENT_ID_0.id(),
|
||||||
event_grp_0,
|
event_grp_0,
|
||||||
Params::Heapless((2_u32, 3_u32).into()),
|
&Params::Heapless((2_u32, 3_u32).into()),
|
||||||
))
|
))
|
||||||
.expect("Sending group error failed");
|
.expect("Sending group error failed");
|
||||||
let res = event_man.try_event_handling(&error_handler);
|
let res = event_man.try_event_handling(&error_handler);
|
||||||
@ -701,11 +699,11 @@ mod tests {
|
|||||||
/// Test listening for multiple groups
|
/// Test listening for multiple groups
|
||||||
#[test]
|
#[test]
|
||||||
fn test_multi_group() {
|
fn test_multi_group() {
|
||||||
let error_handler = |e: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
||||||
panic!("routing error occurred: {:?}", e);
|
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
||||||
};
|
};
|
||||||
let (event_sender, mut event_man) = generic_event_man();
|
let (event_sender, mut event_man) = generic_event_man();
|
||||||
let res = event_man.try_event_handling(&error_handler);
|
let res = event_man.try_event_handling(error_handler);
|
||||||
assert!(matches!(res, EventRoutingResult::Empty));
|
assert!(matches!(res, EventRoutingResult::Empty));
|
||||||
|
|
||||||
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
|
||||||
@ -741,8 +739,8 @@ mod tests {
|
|||||||
/// to both group and single events from one listener
|
/// to both group and single events from one listener
|
||||||
#[test]
|
#[test]
|
||||||
fn test_listening_to_same_event_and_multi_type() {
|
fn test_listening_to_same_event_and_multi_type() {
|
||||||
let error_handler = |e: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
||||||
panic!("routing error occurred: {:?}", e);
|
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
||||||
};
|
};
|
||||||
let (event_sender, mut event_man) = generic_event_man();
|
let (event_sender, mut event_man) = generic_event_man();
|
||||||
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
|
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
|
||||||
@ -793,8 +791,8 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_all_events_listener() {
|
fn test_all_events_listener() {
|
||||||
let error_handler = |e: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
||||||
panic!("routing error occurred: {:?}", e);
|
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
||||||
};
|
};
|
||||||
let (event_sender, manager_queue) = mpsc::channel();
|
let (event_sender, manager_queue) = mpsc::channel();
|
||||||
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use satrs::event_man::{
|
use satrs::event_man::{
|
||||||
EventManagerWithMpsc, EventMessage, EventRoutingError, EventSendProvider, EventU32SenderMpsc,
|
EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider,
|
||||||
MpscEventU32Receiver,
|
EventU32SenderMpsc, MpscEventU32Receiver,
|
||||||
};
|
};
|
||||||
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
|
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
|
||||||
use satrs::params::U32Pair;
|
use satrs::params::U32Pair;
|
||||||
@ -41,8 +41,8 @@ fn test_threaded_usage() {
|
|||||||
let reporter =
|
let reporter =
|
||||||
EventReporter::new(TEST_ID.raw(), 0x02, 128).expect("Creating event reporter failed");
|
EventReporter::new(TEST_ID.raw(), 0x02, 128).expect("Creating event reporter failed");
|
||||||
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
|
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
|
||||||
let error_handler = |error: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
||||||
panic!("received routing error {error:?}");
|
panic!("received routing error for event {event_msg:?}: {error:?}");
|
||||||
};
|
};
|
||||||
// PUS + Generic event manager thread
|
// PUS + Generic event manager thread
|
||||||
let jh0 = thread::spawn(move || {
|
let jh0 = thread::spawn(move || {
|
||||||
@ -133,7 +133,7 @@ fn test_threaded_usage() {
|
|||||||
.send(EventMessage::new_with_params(
|
.send(EventMessage::new_with_params(
|
||||||
TEST_COMPONENT_ID_0.id(),
|
TEST_COMPONENT_ID_0.id(),
|
||||||
LOW_SEV_EVENT,
|
LOW_SEV_EVENT,
|
||||||
Params::Heapless((2_u32, 3_u32).into()),
|
&Params::Heapless((2_u32, 3_u32).into()),
|
||||||
))
|
))
|
||||||
.expect("Sending low severity event failed");
|
.expect("Sending low severity event failed");
|
||||||
loop {
|
loop {
|
||||||
|
Loading…
Reference in New Issue
Block a user