From c57a4efd88cf7cd3b0681b8b557cbe32a02b2271 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 27 Oct 2022 23:56:47 +0200 Subject: [PATCH] needs some research --- fsrc-core/src/event_man.rs | 92 +++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/fsrc-core/src/event_man.rs b/fsrc-core/src/event_man.rs index 33884b6..fec1b6c 100644 --- a/fsrc-core/src/event_man.rs +++ b/fsrc-core/src/event_man.rs @@ -15,7 +15,10 @@ pub trait EventListener { type Error; fn id(&self) -> u32; - fn send_to(&mut self, event: Provider) -> Result<(), Self::Error>; + fn send_to_no_data(&mut self, event: Provider) -> Result<(), Self::Error> { + self.send_to(event, None) + } + fn send_to(&mut self, event: Provider, aux_data: Option<&[u8]>) -> Result<(), Self::Error>; } struct Listener { @@ -24,7 +27,7 @@ struct Listener { } pub trait ReceivesAllEvent { - fn receive(&mut self) -> Option; + fn receive(&mut self) -> Option<(Provider, Option<&[u8]>)>; } pub struct EventManager { @@ -114,23 +117,28 @@ impl EventManager { pub fn try_event_handling(&mut self) -> Result, E> { let mut err_status = None; let mut num_recipients = 0; - let mut send_handler = |event: Provider, llist: &mut Vec>| { - for listener in llist.iter_mut() { - if let Err(e) = listener.dest.send_to(event) { - err_status = Some(Err(e)); - } else { - num_recipients += 1; + let mut send_handler = + |event: Provider, aux_data: Option<&[u8]>, llist: &mut Vec>| { + for listener in llist.iter_mut() { + if let Err(e) = listener.dest.send_to(event, aux_data) { + err_status = Some(Err(e)); + } else { + num_recipients += 1; + } } - } - }; - if let Some(event) = self.event_receiver.receive() { + }; + if let Some((event, aux_data)) = self.event_receiver.receive() { let single_key = ListenerType::Single(event.raw_as_largest_type()); if self.listeners.contains_key(&single_key) { - send_handler(event, self.listeners.get_mut(&single_key).unwrap()); + send_handler( + event, + aux_data, + self.listeners.get_mut(&single_key).unwrap(), + ); } let group_key = ListenerType::Group(event.group_id_as_largest_type()); if self.listeners.contains_key(&group_key) { - send_handler(event, self.listeners.get_mut(&group_key).unwrap()); + send_handler(event, aux_data, self.listeners.get_mut(&group_key).unwrap()); } if let Some(err) = err_status { return err; @@ -151,36 +159,46 @@ mod tests { use std::thread; use std::time::Duration; - struct EventReceiver { - mpsc_receiver: Receiver, + type EventAndParams<'a> = (EventU32, Option<&'a [u8]>); + + struct EventReceiver<'a> { + mpsc_receiver: Receiver>, } - impl ReceivesAllEvent for EventReceiver { - fn receive(&mut self) -> Option { - self.mpsc_receiver.try_recv().ok() + + impl ReceivesAllEvent for EventReceiver<'_> { + fn receive(&mut self) -> Option<(EventU32, Option<&[u8]>)> { + if let Some((event, _params)) = self.mpsc_receiver.try_recv().ok() { + return Some((event, None)); + } + None } } #[derive(Clone)] - struct MpscEventSenderQueue { + struct MpscEventSenderQueue<'a> { id: u32, - mpsc_sender: Sender, + mpsc_sender: Sender>, } - impl EventListener for MpscEventSenderQueue { - type Error = SendError; + impl<'a> EventListener for MpscEventSenderQueue<'a> { + type Error = SendError>; fn id(&self) -> u32 { self.id } - fn send_to(&mut self, event: EventU32) -> Result<(), Self::Error> { - self.mpsc_sender.send(event) + fn send_to( + &mut self, + event: EventU32, + _aux_data: Option<&[u8]>, + ) -> Result<(), Self::Error> { + self.mpsc_sender.send((event, None)) } } - fn check_next_event(expected: EventU32, receiver: &Receiver) { + fn check_next_event(expected: EventU32, receiver: &Receiver) { for _ in 0..5 { if let Ok(event) = receiver.try_recv() { - assert_eq!(event, expected); + assert_eq!(event.0, expected); break; } thread::sleep(Duration::from_millis(1)); @@ -205,7 +223,7 @@ mod tests { let event_man_receiver = EventReceiver { mpsc_receiver: manager_queue, }; - let mut event_man: EventManager, EventU32> = + let mut event_man: EventManager, EventU32> = EventManager::new(Box::new(event_man_receiver)); let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); @@ -224,7 +242,7 @@ mod tests { // Test event with one listener event_sender - .send(event_grp_0) + .send((event_grp_0, None)) .expect("Sending single error failed"); let res = event_man.try_event_handling(); assert!(res.is_ok()); @@ -233,7 +251,7 @@ mod tests { // Test event which is sent to all group listeners event_sender - .send(event_grp_1_0) + .send((event_grp_1_0, None)) .expect("Sending group error failed"); let res = event_man.try_event_handling(); assert!(res.is_ok()); @@ -248,7 +266,7 @@ mod tests { let event_man_receiver = EventReceiver { mpsc_receiver: manager_queue, }; - let mut event_man: EventManager, EventU32> = + let mut event_man: EventManager, EventU32> = EventManager::new(Box::new(event_man_receiver)); let res = event_man.try_event_handling(); assert!(res.is_ok()); @@ -266,10 +284,10 @@ mod tests { event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener); event_sender - .send(event_grp_0) + .send((event_grp_0, None)) .expect("Sending Event Group 0 failed"); event_sender - .send(event_grp_1_0) + .send((event_grp_1_0, None)) .expect("Sendign Event Group 1 failed"); let res = event_man.try_event_handling(); assert!(res.is_ok()); @@ -290,7 +308,7 @@ mod tests { let event_man_receiver = EventReceiver { mpsc_receiver: manager_queue, }; - let mut event_man: EventManager, EventU32> = + let mut event_man: EventManager, EventU32> = EventManager::new(Box::new(event_man_receiver)); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); @@ -307,7 +325,7 @@ mod tests { event_man.subscribe_single(event_0, event_listener_0.clone()); event_man.subscribe_single(event_0, event_listener_1); event_sender - .send(event_0) + .send((event_0, None)) .expect("Triggering Event 0 failed"); let res = event_man.try_event_handling(); assert!(res.is_ok()); @@ -316,10 +334,10 @@ mod tests { check_next_event(event_0, &event_0_rx_1); event_man.subscribe_group(event_1.group_id(), event_listener_0.clone()); event_sender - .send(event_0) + .send((event_0, None)) .expect("Triggering Event 0 failed"); event_sender - .send(event_1) + .send((event_1, None)) .expect("Triggering Event 1 failed"); // 3 Events messages will be sent now @@ -336,7 +354,7 @@ mod tests { // Double insertion should be detected, result should remain the same event_man.subscribe_group(event_1.group_id(), event_listener_0); event_sender - .send(event_1) + .send((event_1, None)) .expect("Triggering Event 1 failed"); let res = event_man.try_event_handling(); assert!(res.is_ok());