added some more tests

This commit is contained in:
Robin Müller 2024-02-23 11:21:40 +01:00
parent fe3908cff4
commit e6947b602c
Signed by: muellerr
GPG Key ID: A649FB78196E3849
3 changed files with 68 additions and 27 deletions

View File

@ -42,16 +42,18 @@ impl PusEventHandler {
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
tm_sender: impl EcssTmSender,
) -> Self {
let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(30);
let event_queue_cap = 30;
let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(event_queue_cap);
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event.
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
let pus_event_dispatcher =
DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter);
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(1, pus_event_man_tx);
let pus_event_man_send_provider =
EventU32SenderMpscBounded::new(1, pus_event_man_tx, event_queue_cap);
event_manager.subscribe_all(pus_event_man_send_provider.id());
event_manager.subscribe_all(pus_event_man_send_provider.channel_id());
event_manager.add_sender(pus_event_man_send_provider);
Self {

View File

@ -74,12 +74,12 @@ pub type EventU32WithAuxData = EventWithAuxData<EventU32>;
pub type EventU16WithAuxData = EventWithAuxData<EventU16>;
pub trait EventSendProvider<EV: GenericEvent, AuxDataProvider = Params> {
// type Error;
fn channel_id(&self) -> ChannelId;
fn id(&self) -> ChannelId;
fn send_no_data(&self, event: EV) -> Result<(), GenericSendError> {
self.send(event, None)
}
fn send(&self, event: EV, aux_data: Option<AuxDataProvider>) -> Result<(), GenericSendError>;
}
@ -239,7 +239,7 @@ impl<
pub fn add_sender(&mut self, send_provider: SP) {
if !self
.sender_map
.contains_send_event_provider(&send_provider.id())
.contains_send_event_provider(&send_provider.channel_id())
{
self.sender_map.add_send_event_provider(send_provider);
}
@ -407,11 +407,11 @@ pub mod alloc_mod {
}
fn get_send_event_provider(&self, id: &ChannelId) -> Option<&SP> {
self.senders.get(id).filter(|sender| sender.id() == *id)
self.senders.get(id).filter(|sender| sender.channel_id() == *id)
}
fn add_send_event_provider(&mut self, send_provider: SP) -> bool {
let id = send_provider.id();
let id = send_provider.channel_id();
if self.senders.contains_key(&id) {
return false;
}
@ -451,6 +451,8 @@ pub mod std_mod {
pub type MpscEventU32Receiver = MpscEventReceiver<EventU32>;
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>;
/// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to
/// send events.
#[derive(Clone)]
pub struct EventSenderMpsc<Event: GenericEvent + Send> {
id: u32,
@ -464,7 +466,7 @@ pub mod std_mod {
}
impl<Event: GenericEvent + Send> EventSendProvider<Event> for EventSenderMpsc<Event> {
fn id(&self) -> u32 {
fn channel_id(&self) -> u32 {
self.id
}
fn send(&self, event: Event, aux_data: Option<Params>) -> Result<(), GenericSendError> {
@ -474,26 +476,39 @@ pub mod std_mod {
}
}
/// Generic event sender which uses the [mpsc::SyncSender] as the messaging backend to send
/// events. This has the advantage that the channel is bounded and thus more deterministic.
#[derive(Clone)]
pub struct EventSenderMpscBounded<Event: GenericEvent + Send> {
id: u32,
channel_id: u32,
sender: mpsc::SyncSender<(Event, Option<Params>)>,
capacity: usize,
}
impl<Event: GenericEvent + Send> EventSenderMpscBounded<Event> {
pub fn new(id: u32, sender: mpsc::SyncSender<(Event, Option<Params>)>) -> Self {
Self { id, sender }
pub fn new(
channel_id: u32,
sender: mpsc::SyncSender<(Event, Option<Params>)>,
capacity: usize,
) -> Self {
Self {
channel_id,
sender,
capacity,
}
}
}
impl<Event: GenericEvent + Send> EventSendProvider<Event> for EventSenderMpscBounded<Event> {
fn id(&self) -> u32 {
self.id
fn channel_id(&self) -> u32 {
self.channel_id
}
fn send(&self, event: Event, aux_data: Option<Params>) -> Result<(), GenericSendError> {
if let Err(e) = self.sender.try_send((event, aux_data)) {
return match e {
mpsc::TrySendError::Full(_) => Err(GenericSendError::QueueFull(None)),
mpsc::TrySendError::Full(_) => {
Err(GenericSendError::QueueFull(Some(self.capacity as u32)))
}
mpsc::TrySendError::Disconnected(_) => Err(GenericSendError::RxDisconnected),
};
}
@ -514,7 +529,9 @@ mod tests {
use crate::events::{EventU32, GenericEvent, Severity};
use crate::params::ParamsRaw;
use std::format;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::{self, channel, Receiver, Sender};
const TEST_EVENT: EventU32 = EventU32::const_new(Severity::INFO, 0, 5);
fn check_next_event(
expected: EventU32,
@ -557,11 +574,11 @@ mod tests {
let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
let (single_event_sender, single_event_receiver) = channel();
let single_event_listener = EventSenderMpsc::new(0, single_event_sender);
event_man.subscribe_single(&event_grp_0, single_event_listener.id());
event_man.subscribe_single(&event_grp_0, single_event_listener.channel_id());
event_man.add_sender(single_event_listener);
let (group_event_sender_0, group_event_receiver_0) = channel();
let group_event_listener = EventU32SenderMpsc::new(1, group_event_sender_0);
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.id());
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener.channel_id());
event_man.add_sender(group_event_listener);
// Test event with one listener
@ -589,7 +606,7 @@ mod tests {
let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap();
let (single_event_sender, single_event_receiver) = channel();
let single_event_listener = EventSenderMpsc::new(0, single_event_sender);
event_man.subscribe_single(&event_grp_0, single_event_listener.id());
event_man.subscribe_single(&event_grp_0, single_event_listener.channel_id());
event_man.add_sender(single_event_listener);
event_sender
.send((event_grp_0, Some(Params::Heapless((2_u32, 3_u32).into()))))
@ -621,8 +638,8 @@ mod tests {
let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
let (event_grp_0_sender, event_grp_0_receiver) = channel();
let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender);
event_man.subscribe_group(event_grp_0.group_id(), event_grp_0_and_1_listener.id());
event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener.id());
event_man.subscribe_group(event_grp_0.group_id(), event_grp_0_and_1_listener.channel_id());
event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener.channel_id());
event_man.add_sender(event_grp_0_and_1_listener);
event_sender
@ -653,10 +670,10 @@ mod tests {
let (event_0_tx_1, event_0_rx_1) = channel();
let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0);
let event_listener_1 = EventU32SenderMpsc::new(1, event_0_tx_1);
let event_listener_0_sender_id = event_listener_0.id();
let event_listener_0_sender_id = event_listener_0.channel_id();
event_man.subscribe_single(&event_0, event_listener_0_sender_id);
event_man.add_sender(event_listener_0);
let event_listener_1_sender_id = event_listener_1.id();
let event_listener_1_sender_id = event_listener_1.channel_id();
event_man.subscribe_single(&event_0, event_listener_1_sender_id);
event_man.add_sender(event_listener_1);
event_sender
@ -706,7 +723,7 @@ mod tests {
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
let (event_0_tx_0, all_events_rx) = channel();
let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0);
event_man.subscribe_all(all_events_listener.id());
event_man.subscribe_all(all_events_listener.channel_id());
event_man.add_sender(all_events_listener);
event_sender
.send((event_0, None))
@ -725,7 +742,29 @@ mod tests {
}
#[test]
fn test_bounded_event_sender() {
// TODO: Test with bounded sender
fn test_bounded_event_sender_queue_full() {
let (event_sender, _event_receiver) = mpsc::sync_channel(3);
let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3);
event_sender.send_no_data(TEST_EVENT).expect("sending test event failed");
event_sender.send_no_data(TEST_EVENT).expect("sending test event failed");
event_sender.send_no_data(TEST_EVENT).expect("sending test event failed");
let error = event_sender.send_no_data(TEST_EVENT);
if let Err(e) = error {
assert!(matches!(e, GenericSendError::QueueFull(Some(3))));
} else {
panic!("unexpected error {error:?}");
}
}
#[test]
fn test_bounded_event_sender_rx_dropped() {
let (event_sender, event_receiver) = mpsc::sync_channel(3);
let event_sender = EventU32SenderMpscBounded::new(1, event_sender, 3);
drop(event_receiver);
if let Err(e) = event_sender.send_no_data(TEST_EVENT) {
assert!(matches!(e, GenericSendError::RxDisconnected));
} else {
panic!("Expected error");
}
}
}

View File

@ -30,7 +30,7 @@ fn test_threaded_usage() {
let (pus_event_man_tx, pus_event_man_rx) = channel();
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.id());
event_man.subscribe_all(pus_event_man_send_provider.channel_id());
event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = channel();
let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed");