simplified event management #172

Merged
muellerr merged 1 commits from simplify-event-management into main 2024-04-24 15:58:01 +02:00
11 changed files with 257 additions and 232 deletions
Showing only changes of commit 5ec5124ea3 - Show all commits

View File

@ -1,14 +1,17 @@
# Events # Events
Events can be an extremely important mechanism used for remote systems to monitor unexpected Events are an important mechanism used for remote systems to monitor unexpected
or expected anomalies and events occuring on these systems. They are oftentimes tied to or expected anomalies and events occuring on these systems.
One common use case for events on remote systems is to offer a light-weight publish-subscribe
mechanism and IPC mechanism for software and hardware events which are also packaged as telemetry
(TM) or can trigger a system response. They can also be tied to
Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously. Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously.
Events can also be used as a convenient Inter-Process Communication (IPC) mechansism, which is The PUS Service 5 standardizes how the ground interface for events might look like, but does not
also observable for the Ground segment. The PUS Service 5 standardizes how the ground interface specify how other software components might react to those events. There is the PUS Service 19,
for events might look like, but does not specify how other software components might react which might be used for that purpose, but the event components recommended by this framework do not
to those events. There is the PUS Service 19, which might be used for that purpose, but the rely on the present of this service.
event components recommended by this framework do not really need this service.
The following images shows how the flow of events could look like in a system where components The following images shows how the flow of events could look like in a system where components
can generate events, and where other system components might be interested in those events: can generate events, and where other system components might be interested in those events:

View File

@ -10,6 +10,7 @@ class Apid(enum.IntEnum):
GENERIC_PUS = 2 GENERIC_PUS = 2
ACS = 3 ACS = 3
CFDP = 4 CFDP = 4
TMTC = 5
class EventSeverity(enum.IntEnum): class EventSeverity(enum.IntEnum):

View File

@ -144,7 +144,9 @@ class PusHandler(GenericApidHandlerBase):
) )
src_data = tm_packet.source_data src_data = tm_packet.source_data
event_u32 = EventU32.unpack(src_data) event_u32 = EventU32.unpack(src_data)
_LOGGER.info(f"Received event packet. Event: {event_u32}") _LOGGER.info(
f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}"
)
if event_u32.group_id == 0 and event_u32.unique_id == 0: if event_u32.group_id == 0 and event_u32.unique_id == 0:
_LOGGER.info("Received test event") _LOGGER.info("Received test event")
elif service == 17: elif service == 17:

View File

@ -8,13 +8,10 @@ use satrs::pus::verification::VerificationReporter;
use satrs::pus::EcssTmSender; use satrs::pus::EcssTmSender;
use satrs::request::UniqueApidTargetId; use satrs::request::UniqueApidTargetId;
use satrs::{ use satrs::{
event_man::{ event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded},
EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded,
MpscEventReceiver,
},
pus::{ pus::{
event_man::{ event_man::{
DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken,
}, },
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken}, verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
}, },
@ -40,13 +37,12 @@ impl EventTmHookProvider for EventApidSetter {
/// packets. It also handles the verification completion of PUS event service requests. /// packets. It also handles the verification completion of PUS event service requests.
pub struct PusEventHandler<TmSender: EcssTmSender> { pub struct PusEventHandler<TmSender: EcssTmSender> {
event_request_rx: mpsc::Receiver<EventRequestWithToken>, event_request_rx: mpsc::Receiver<EventRequestWithToken>,
pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, pus_event_tm_creator: DefaultPusEventU32TmCreator<EventApidSetter>,
pus_event_man_rx: mpsc::Receiver<EventMessageU32>, pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
tm_sender: TmSender, tm_sender: TmSender,
time_provider: CdsTime, time_provider: CdsTime,
timestamp: [u8; 7], timestamp: [u8; 7],
verif_handler: VerificationReporter, verif_handler: VerificationReporter,
event_apid_setter: EventApidSetter,
} }
impl<TmSender: EcssTmSender> PusEventHandler<TmSender> { impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
@ -61,9 +57,16 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
// All events sent to the manager are routed to the PUS event manager, which generates PUS event // All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event. // telemetry for each event.
let event_reporter = EventReporter::new(PUS_EVENT_MANAGEMENT.raw(), 0, 0, 128).unwrap(); let event_reporter = EventReporter::new_with_hook(
PUS_EVENT_MANAGEMENT.raw(),
0,
0,
128,
EventApidSetter::default(),
)
.unwrap();
let pus_event_dispatcher = let pus_event_dispatcher =
DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter); DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter);
let pus_event_man_send_provider = EventU32SenderMpscBounded::new( let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
PUS_EVENT_MANAGEMENT.raw(), PUS_EVENT_MANAGEMENT.raw(),
pus_event_man_tx, pus_event_man_tx,
@ -75,13 +78,12 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
Self { Self {
event_request_rx, event_request_rx,
pus_event_dispatcher, pus_event_tm_creator: pus_event_dispatcher,
pus_event_man_rx, pus_event_man_rx,
time_provider: CdsTime::new_with_u16_days(0, 0), time_provider: CdsTime::new_with_u16_days(0, 0),
timestamp: [0; 7], timestamp: [0; 7],
verif_handler, verif_handler,
tm_sender, tm_sender,
event_apid_setter: EventApidSetter::default(),
} }
} }
@ -95,75 +97,105 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
.completion_success(&self.tm_sender, started_token, timestamp) .completion_success(&self.tm_sender, started_token, timestamp)
.expect("Sending completion success failed"); .expect("Sending completion success failed");
}; };
// handle event requests loop {
if let Ok(event_req) = self.event_request_rx.try_recv() { // handle event requests
match event_req.request { match self.event_request_rx.try_recv() {
EventRequest::Enable(event) => { Ok(event_req) => match event_req.request {
self.pus_event_dispatcher EventRequest::Enable(event) => {
.enable_tm_for_event(&event) self.pus_event_tm_creator
.expect("Enabling TM failed"); .enable_tm_for_event(&event)
update_time(&mut self.time_provider, &mut self.timestamp); .expect("Enabling TM failed");
report_completion(event_req, &self.timestamp); update_time(&mut self.time_provider, &mut self.timestamp);
} report_completion(event_req, &self.timestamp);
EventRequest::Disable(event) => { }
self.pus_event_dispatcher EventRequest::Disable(event) => {
.disable_tm_for_event(&event) self.pus_event_tm_creator
.expect("Disabling TM failed"); .disable_tm_for_event(&event)
update_time(&mut self.time_provider, &mut self.timestamp); .expect("Disabling TM failed");
report_completion(event_req, &self.timestamp); update_time(&mut self.time_provider, &mut self.timestamp);
} report_completion(event_req, &self.timestamp);
}
},
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("all event request senders have disconnected");
break;
}
},
} }
} }
} }
pub fn generate_pus_event_tm(&mut self) { pub fn generate_pus_event_tm(&mut self) {
// Perform the generation of PUS event packets loop {
if let Ok(event_msg) = self.pus_event_man_rx.try_recv() { // Perform the generation of PUS event packets
update_time(&mut self.time_provider, &mut self.timestamp); match self.pus_event_man_rx.try_recv() {
let param_vec = event_msg.params().map_or(Vec::new(), |param| { Ok(event_msg) => {
param.to_vec().expect("failed to convert params to vec") update_time(&mut self.time_provider, &mut self.timestamp);
}); let param_vec = event_msg.params().map_or(Vec::new(), |param| {
self.event_apid_setter.next_apid = UniqueApidTargetId::from(event_msg.sender_id()).apid; param.to_vec().expect("failed to convert params to vec")
self.pus_event_dispatcher });
.generate_pus_event_tm_generic( // We use the TM modification hook to set the sender APID for each event.
&self.tm_sender, self.pus_event_tm_creator.reporter.tm_hook.next_apid =
&self.timestamp, UniqueApidTargetId::from(event_msg.sender_id()).apid;
event_msg.event(), self.pus_event_tm_creator
Some(&param_vec), .generate_pus_event_tm_generic(
) &self.tm_sender,
.expect("Sending TM as event failed"); &self.timestamp,
event_msg.event(),
Some(&param_vec),
)
.expect("Sending TM as event failed");
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("All event senders have disconnected");
break;
}
},
}
} }
} }
} }
/// This is a thin wrapper around the event manager which also caches the sender component pub struct EventHandler<TmSender: EcssTmSender> {
/// used to send events to the event manager. pub pus_event_handler: PusEventHandler<TmSender>,
pub struct EventManagerWrapper {
event_manager: EventManagerWithBoundedMpsc, event_manager: EventManagerWithBoundedMpsc,
event_sender: mpsc::Sender<EventMessageU32>,
} }
impl EventManagerWrapper { impl<TmSender: EcssTmSender> EventHandler<TmSender> {
pub fn new() -> Self { pub fn new(
// The sender handle is the primary sender handle for all components which want to create events. tm_sender: TmSender,
// The event manager will receive the RX handle to receive all the events. event_rx: mpsc::Receiver<EventMessageU32>,
let (event_sender, event_man_rx) = mpsc::channel(); event_request_rx: mpsc::Receiver<EventRequestWithToken>,
let event_recv = MpscEventReceiver::new(event_man_rx); ) -> Self {
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
let pus_event_handler = PusEventHandler::new(
tm_sender,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
&mut event_manager,
event_request_rx,
);
Self { Self {
event_manager: EventManagerWithBoundedMpsc::new(event_recv), pus_event_handler,
event_sender, event_manager,
} }
} }
// Returns a cached event sender to send events to the event manager for routing. #[allow(dead_code)]
pub fn clone_event_sender(&self) -> mpsc::Sender<EventMessageU32> {
self.event_sender.clone()
}
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
&mut self.event_manager &mut self.event_manager
} }
pub fn periodic_operation(&mut self) {
self.pus_event_handler.handle_event_requests();
self.try_event_routing();
self.pus_event_handler.generate_pus_event_tm();
}
pub fn try_event_routing(&mut self) { pub fn try_event_routing(&mut self) {
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| { let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
self.routing_error_handler(event_msg, error) self.routing_error_handler(event_msg, error)
@ -177,41 +209,5 @@ impl EventManagerWrapper {
} }
} }
pub struct EventHandler<TmSender: EcssTmSender> { #[cfg(test)]
pub event_man_wrapper: EventManagerWrapper, mod tests {}
pub pus_event_handler: PusEventHandler<TmSender>,
}
impl<TmSender: EcssTmSender> EventHandler<TmSender> {
pub fn new(
tm_sender: TmSender,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self {
let mut event_man_wrapper = EventManagerWrapper::new();
let pus_event_handler = PusEventHandler::new(
tm_sender,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
event_man_wrapper.event_manager(),
event_request_rx,
);
Self {
event_man_wrapper,
pus_event_handler,
}
}
pub fn clone_event_sender(&self) -> mpsc::Sender<EventMessageU32> {
self.event_man_wrapper.clone_event_sender()
}
#[allow(dead_code)]
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
self.event_man_wrapper.event_manager()
}
pub fn periodic_operation(&mut self) {
self.pus_event_handler.handle_event_requests();
self.event_man_wrapper.try_event_routing();
self.pus_event_handler.generate_pus_event_tm();
}
}

View File

@ -11,7 +11,7 @@ use crate::events::EventHandler;
use crate::interface::udp::DynamicUdpTmHandler; use crate::interface::udp::DynamicUdpTmHandler;
use crate::pus::stack::PusStack; use crate::pus::stack::PusStack;
use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic}; use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic};
use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic}; use crate::tmtc::tm_sink::{TmSinkDynamic, TmSinkStatic};
use log::info; use log::info;
use pus::test::create_test_service_dynamic; use pus::test::create_test_service_dynamic;
use satrs::hal::std::tcp_server::ServerConfig; use satrs::hal::std::tcp_server::ServerConfig;
@ -54,11 +54,11 @@ fn static_tmtc_pool_main() {
let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool);
let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool);
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50); let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50);
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
let tm_funnel_tx_sender = let tm_sink_tx_sender =
PacketSenderWithSharedPool::new(tm_funnel_tx.clone(), shared_tm_pool_wrapper.clone()); PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone());
let (mgm_handler_composite_tx, mgm_handler_composite_rx) = let (mgm_handler_composite_tx, mgm_handler_composite_rx) =
mpsc::channel::<GenericMessage<CompositeRequest>>(); mpsc::channel::<GenericMessage<CompositeRequest>>();
@ -80,11 +80,12 @@ fn static_tmtc_pool_main() {
// Create event handling components // Create event handling components
// These sender handles are used to send event requests, for example to enable or disable // These sender handles are used to send event requests, for example to enable or disable
// certain events. // certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>(); let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified // The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation. // in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx); let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel();
let (pus_event_tx, pus_event_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel();
@ -106,39 +107,39 @@ fn static_tmtc_pool_main() {
mode_tc_sender: pus_mode_tx, mode_tc_sender: pus_mode_tx,
}; };
let pus_test_service = create_test_service_static( let pus_test_service = create_test_service_static(
tm_funnel_tx_sender.clone(), tm_sink_tx_sender.clone(),
shared_tc_pool.clone(), shared_tc_pool.clone(),
event_handler.clone_event_sender(), event_tx.clone(),
pus_test_rx, pus_test_rx,
); );
let pus_scheduler_service = create_scheduler_service_static( let pus_scheduler_service = create_scheduler_service_static(
tm_funnel_tx_sender.clone(), tm_sink_tx_sender.clone(),
tc_source.clone(), tc_source.clone(),
pus_sched_rx, pus_sched_rx,
create_sched_tc_pool(), create_sched_tc_pool(),
); );
let pus_event_service = create_event_service_static( let pus_event_service = create_event_service_static(
tm_funnel_tx_sender.clone(), tm_sink_tx_sender.clone(),
shared_tc_pool.clone(), shared_tc_pool.clone(),
pus_event_rx, pus_event_rx,
event_request_tx, event_request_tx,
); );
let pus_action_service = create_action_service_static( let pus_action_service = create_action_service_static(
tm_funnel_tx_sender.clone(), tm_sink_tx_sender.clone(),
shared_tc_pool.clone(), shared_tc_pool.clone(),
pus_action_rx, pus_action_rx,
request_map.clone(), request_map.clone(),
pus_action_reply_rx, pus_action_reply_rx,
); );
let pus_hk_service = create_hk_service_static( let pus_hk_service = create_hk_service_static(
tm_funnel_tx_sender.clone(), tm_sink_tx_sender.clone(),
shared_tc_pool.clone(), shared_tc_pool.clone(),
pus_hk_rx, pus_hk_rx,
request_map.clone(), request_map.clone(),
pus_hk_reply_rx, pus_hk_reply_rx,
); );
let pus_mode_service = create_mode_service_static( let pus_mode_service = create_mode_service_static(
tm_funnel_tx_sender.clone(), tm_sink_tx_sender.clone(),
shared_tc_pool.clone(), shared_tc_pool.clone(),
pus_mode_rx, pus_mode_rx,
request_map, request_map,
@ -156,7 +157,7 @@ fn static_tmtc_pool_main() {
let mut tmtc_task = TcSourceTaskStatic::new( let mut tmtc_task = TcSourceTaskStatic::new(
shared_tc_pool_wrapper.clone(), shared_tc_pool_wrapper.clone(),
tc_source_rx, tc_source_rx,
PusTcDistributor::new(tm_funnel_tx_sender, pus_router), PusTcDistributor::new(tm_sink_tx_sender, pus_router),
); );
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
@ -186,10 +187,10 @@ fn static_tmtc_pool_main() {
) )
.expect("tcp server creation failed"); .expect("tcp server creation failed");
let mut tm_funnel = TmFunnelStatic::new( let mut tm_sink = TmSinkStatic::new(
shared_tm_pool_wrapper, shared_tm_pool_wrapper,
sync_tm_tcp_source, sync_tm_tcp_source,
tm_funnel_rx, tm_sink_rx,
tm_server_tx, tm_server_tx,
); );
@ -209,7 +210,7 @@ fn static_tmtc_pool_main() {
mode_leaf_interface, mode_leaf_interface,
mgm_handler_composite_rx, mgm_handler_composite_rx,
pus_hk_reply_tx, pus_hk_reply_tx,
tm_funnel_tx, tm_sink_tx,
dummy_spi_interface, dummy_spi_interface,
shared_mgm_set, shared_mgm_set,
); );
@ -240,9 +241,9 @@ fn static_tmtc_pool_main() {
info!("Starting TM funnel task"); info!("Starting TM funnel task");
let jh_tm_funnel = thread::Builder::new() let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string()) .name("tm sink".to_string())
.spawn(move || loop { .spawn(move || loop {
tm_funnel.operation(); tm_sink.operation();
}) })
.unwrap(); .unwrap();
@ -314,10 +315,11 @@ fn dyn_tmtc_pool_main() {
// Create event handling components // Create event handling components
// These sender handles are used to send event requests, for example to enable or disable // These sender handles are used to send event requests, for example to enable or disable
// certain events. // certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>(); let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified // The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation. // in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx); let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel();
let (pus_event_tx, pus_event_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel();
@ -339,11 +341,8 @@ fn dyn_tmtc_pool_main() {
mode_tc_sender: pus_mode_tx, mode_tc_sender: pus_mode_tx,
}; };
let pus_test_service = create_test_service_dynamic( let pus_test_service =
tm_funnel_tx.clone(), create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
event_handler.clone_event_sender(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service_dynamic( let pus_scheduler_service = create_scheduler_service_dynamic(
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
tc_source_tx.clone(), tc_source_tx.clone(),
@ -411,7 +410,7 @@ fn dyn_tmtc_pool_main() {
) )
.expect("tcp server creation failed"); .expect("tcp server creation failed");
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) =
mpsc::channel(); mpsc::channel();

View File

@ -23,7 +23,7 @@ use super::HandlingStatus;
pub fn create_test_service_static( pub fn create_test_service_static(
tm_sender: PacketSenderWithSharedPool, tm_sender: PacketSenderWithSharedPool,
tc_pool: SharedStaticMemoryPool, tc_pool: SharedStaticMemoryPool,
event_sender: mpsc::Sender<EventMessageU32>, event_sender: mpsc::SyncSender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper<PacketSenderWithSharedPool, EcssTcInSharedStoreConverter> { ) -> TestCustomServiceWrapper<PacketSenderWithSharedPool, EcssTcInSharedStoreConverter> {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
@ -41,7 +41,7 @@ pub fn create_test_service_static(
pub fn create_test_service_dynamic( pub fn create_test_service_dynamic(
tm_funnel_tx: mpsc::Sender<PacketAsVec>, tm_funnel_tx: mpsc::Sender<PacketAsVec>,
event_sender: mpsc::Sender<EventMessageU32>, event_sender: mpsc::SyncSender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> { ) -> TestCustomServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
@ -61,7 +61,7 @@ pub struct TestCustomServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: Ec
{ {
pub handler: pub handler:
PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>, PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
pub test_srv_event_sender: mpsc::Sender<EventMessageU32>, pub test_srv_event_sender: mpsc::SyncSender<EventMessageU32>,
} }
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>

View File

@ -70,18 +70,23 @@ impl TmFunnelCommon {
} }
fn packet_printout(tm: &PusTmZeroCopyWriter) { fn packet_printout(tm: &PusTmZeroCopyWriter) {
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); info!(
"Sending PUS TM[{},{}] with APID {}",
tm.service(),
tm.subservice(),
tm.apid()
);
} }
} }
pub struct TmFunnelStatic { pub struct TmSinkStatic {
common: TmFunnelCommon, common: TmFunnelCommon,
shared_tm_store: SharedPacketPool, shared_tm_store: SharedPacketPool,
tm_funnel_rx: mpsc::Receiver<PacketInPool>, tm_funnel_rx: mpsc::Receiver<PacketInPool>,
tm_server_tx: mpsc::SyncSender<PacketInPool>, tm_server_tx: mpsc::SyncSender<PacketInPool>,
} }
impl TmFunnelStatic { impl TmSinkStatic {
pub fn new( pub fn new(
shared_tm_store: SharedPacketPool, shared_tm_store: SharedPacketPool,
sync_tm_tcp_source: SyncTcpTmSource, sync_tm_tcp_source: SyncTcpTmSource,
@ -121,13 +126,13 @@ impl TmFunnelStatic {
} }
} }
pub struct TmFunnelDynamic { pub struct TmSinkDynamic {
common: TmFunnelCommon, common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>, tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::Sender<PacketAsVec>, tm_server_tx: mpsc::Sender<PacketAsVec>,
} }
impl TmFunnelDynamic { impl TmSinkDynamic {
pub fn new( pub fn new(
sync_tm_tcp_source: SyncTcpTmSource, sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>, tm_funnel_rx: mpsc::Receiver<PacketAsVec>,

View File

@ -8,6 +8,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased] # [unreleased]
# [v0.2.0-rc.5] 2024-04-23
## Changed
- Removed `MpscEventReceiver`, the `EventReceiveProvider` trait is implemented directly
on `mpsc::Receiver<EventMessage<Event>>`
- Renamed `PusEventDispatcher` to `PusEventTmCreatorWithMap`.
- Renamed `DefaultPusEventU32Dispatcher` to `DefaultPusEventU32EventCreator`.
- Renamed `PusEventMgmtBackendProvider` renamed to `PusEventReportingMap`.
# [v0.2.0-rc.4] 2024-04-23 # [v0.2.0-rc.4] 2024-04-23
## Changed ## Changed

View File

@ -1,14 +1,12 @@
//! Event management and forwarding //! Event management and forwarding
//! //!
//! This module provides components to perform event routing. The most important component for this
//! task is the [EventManager]. It receives all events and then routes them to event subscribers
//! where appropriate. One common use case for satellite systems is to offer a light-weight
//! publish-subscribe mechanism and IPC mechanism for software and hardware events which are also
//! packaged as telemetry (TM) or can trigger a system response.
//!
//! It is recommended to read the //! It is recommended to read the
//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html) //! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html)
//! about events first: //! about events first.
//!
//! This module provides components to perform event routing. The most important component for this
//! task is the [EventManager]. It receives all events and then routes them to event subscribers
//! where appropriate.
//! //!
//! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps //! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps
//! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId]. //! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId].
@ -21,8 +19,8 @@
//! //!
//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different //! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different
//! message queue backends. A straightforward implementation where dynamic memory allocation is //! message queue backends. A straightforward implementation where dynamic memory allocation is
//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in //! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is
//! form of the [MpscEventReceiver]. //! already implemented for this type.
//! 2. To set up event creators, create channel pairs using some message queue implementation. //! 2. To set up event creators, create channel pairs using some message queue implementation.
//! Each event creator gets a (cloned) sender component which allows it to send events to the //! Each event creator gets a (cloned) sender component which allows it to send events to the
//! manager. //! manager.
@ -44,6 +42,12 @@
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs) //! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs)
//! for a concrete example using multi-threading where events are routed to //! for a concrete example using multi-threading where events are routed to
//! different threads. //! different threads.
//!
//! The [satrs-example](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example)
//! also contains a full event manager instance and exposes a test event via the PUS test service.
//! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs)
//! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs)
//! show how the event management modules can be integrated into a more complex software.
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw}; use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};
use crate::params::Params; use crate::params::Params;
use crate::queue::GenericSendError; use crate::queue::GenericSendError;
@ -157,9 +161,10 @@ pub trait SenderMapProvider<
/// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers. /// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers.
/// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs. /// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs.
/// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events. /// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events.
/// * `Ev`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] /// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
/// and [EventU16] are supported. /// and [EventU16] are supported.
/// * `Data`: Auxiliary data which is sent with the event to provide optional context information /// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context
/// information
pub struct EventManager< pub struct EventManager<
EventReceiver: EventReceiveProvider<Event, ParamProvider>, EventReceiver: EventReceiveProvider<Event, ParamProvider>,
SenderMap: SenderMapProvider<EventSender, Event, ParamProvider>, SenderMap: SenderMapProvider<EventSender, Event, ParamProvider>,
@ -331,11 +336,11 @@ pub mod alloc_mod {
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend. /// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
pub type EventManagerWithMpsc<EV = EventU32, AUX = Params> = EventManager< pub type EventManagerWithMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
MpscEventReceiver, EventU32ReceiverMpsc<ParamProvider>,
DefaultSenderMap<EventSenderMpsc<EV>, EV, AUX>, DefaultSenderMap<EventSenderMpsc<Event>, Event, ParamProvider>,
DefaultListenerMap, DefaultListenerMap,
EventSenderMpsc<EV>, EventSenderMpsc<Event>,
>; >;
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
@ -343,7 +348,7 @@ pub mod alloc_mod {
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
/// message queue backend. /// message queue backend.
pub type EventManagerWithBoundedMpsc<Event = EventU32, ParamProvider = Params> = EventManager< pub type EventManagerWithBoundedMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
MpscEventReceiver, EventU32ReceiverMpsc<ParamProvider>,
DefaultSenderMap<EventSenderMpscBounded<Event>, Event, ParamProvider>, DefaultSenderMap<EventSenderMpscBounded<Event>, Event, ParamProvider>,
DefaultListenerMap, DefaultListenerMap,
EventSenderMpscBounded<Event>, EventSenderMpscBounded<Event>,
@ -479,20 +484,16 @@ pub mod std_mod {
use super::*; use super::*;
use std::sync::mpsc; use std::sync::mpsc;
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> { impl<Event: GenericEvent + Send, ParamProvider: Debug>
receiver: mpsc::Receiver<EventMessage<Event>>, EventReceiveProvider<Event, ParamProvider>
} for mpsc::Receiver<EventMessage<Event, ParamProvider>>
{
impl<Event: GenericEvent + Send> MpscEventReceiver<Event> {
pub fn new(receiver: mpsc::Receiver<EventMessage<Event>>) -> Self {
Self { receiver }
}
}
impl<Event: GenericEvent + Send> EventReceiveProvider<Event> for MpscEventReceiver<Event> {
type Error = GenericReceiveError; type Error = GenericReceiveError;
fn try_recv_event(&self) -> Result<Option<EventMessage<Event>>, Self::Error> { fn try_recv_event(
match self.receiver.try_recv() { &self,
) -> Result<Option<EventMessage<Event, ParamProvider>>, Self::Error> {
match self.try_recv() {
Ok(msg) => Ok(Some(msg)), Ok(msg) => Ok(Some(msg)),
Err(e) => match e { Err(e) => match e {
mpsc::TryRecvError::Empty => Ok(None), mpsc::TryRecvError::Empty => Ok(None),
@ -504,8 +505,10 @@ pub mod std_mod {
} }
} }
pub type MpscEventU32Receiver = MpscEventReceiver<EventU32>; pub type EventU32ReceiverMpsc<ParamProvider = Params> =
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>; mpsc::Receiver<EventMessage<EventU32, ParamProvider>>;
pub type EventU16ReceiverMpsc<ParamProvider = Params> =
mpsc::Receiver<EventMessage<EventU16, ParamProvider>>;
/// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to /// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to
/// send events. /// send events.
@ -624,9 +627,8 @@ mod tests {
} }
fn generic_event_man() -> (mpsc::Sender<EventMessageU32>, EventManagerWithMpsc) { fn generic_event_man() -> (mpsc::Sender<EventMessageU32>, EventManagerWithMpsc) {
let (event_sender, manager_queue) = mpsc::channel(); let (event_sender, event_receiver) = mpsc::channel();
let event_man_receiver = MpscEventReceiver::new(manager_queue); (event_sender, EventManager::new(event_receiver))
(event_sender, EventManager::new(event_man_receiver))
} }
#[test] #[test]
@ -793,9 +795,8 @@ mod tests {
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e); panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
}; };
let (event_sender, manager_queue) = mpsc::channel(); let (event_sender, event_receiver) = mpsc::channel();
let event_man_receiver = MpscEventReceiver::new(manager_queue); let mut event_man = EventManagerWithMpsc::new(event_receiver);
let mut event_man = EventManagerWithMpsc::new(event_man_receiver);
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
let (event_0_tx_0, all_events_rx) = mpsc::channel(); let (event_0_tx_0, all_events_rx) = mpsc::channel();

View File

@ -28,7 +28,7 @@ pub use heapless_mod::*;
/// structure to track disabled events. A more primitive and embedded friendly /// structure to track disabled events. A more primitive and embedded friendly
/// solution could track this information in a static or pre-allocated list which contains /// solution could track this information in a static or pre-allocated list which contains
/// the disabled events. /// the disabled events.
pub trait PusEventMgmtBackendProvider<Event: GenericEvent> { pub trait PusEventReportingMapProvider<Event: GenericEvent> {
type Error; type Error;
fn event_enabled(&self, event: &Event) -> bool; fn event_enabled(&self, event: &Event) -> bool;
@ -56,7 +56,7 @@ pub mod heapless_mod {
{ {
} }
impl<const N: usize, Provider: GenericEvent> PusEventMgmtBackendProvider<Provider> impl<const N: usize, Provider: GenericEvent> PusEventReportingMapProvider<Provider>
for HeaplessPusMgmtBackendProvider<N, Provider> for HeaplessPusMgmtBackendProvider<N, Provider>
{ {
type Error = (); type Error = ();
@ -105,20 +105,23 @@ impl From<EcssTmtcError> for EventManError {
pub mod alloc_mod { pub mod alloc_mod {
use core::marker::PhantomData; use core::marker::PhantomData;
use crate::events::EventU16; use crate::{
events::EventU16,
pus::event::{DummyEventHook, EventTmHookProvider},
};
use super::*; use super::*;
/// Default backend provider which uses a hash set as the event reporting status container /// Default backend provider which uses a hash set as the event reporting status container
/// like mentioned in the example of the [PusEventMgmtBackendProvider] documentation. /// like mentioned in the example of the [PusEventReportingMapProvider] documentation.
/// ///
/// This provider is a good option for host systems or larger embedded systems where /// This provider is a good option for host systems or larger embedded systems where
/// the expected occasional memory allocation performed by the [HashSet] is not an issue. /// the expected occasional memory allocation performed by the [HashSet] is not an issue.
pub struct DefaultPusEventMgmtBackend<Event: GenericEvent = EventU32> { pub struct DefaultPusEventReportingMap<Event: GenericEvent = EventU32> {
disabled: HashSet<Event>, disabled: HashSet<Event>,
} }
impl<Event: GenericEvent> Default for DefaultPusEventMgmtBackend<Event> { impl<Event: GenericEvent> Default for DefaultPusEventReportingMap<Event> {
fn default() -> Self { fn default() -> Self {
Self { Self {
disabled: HashSet::default(), disabled: HashSet::default(),
@ -126,51 +129,54 @@ pub mod alloc_mod {
} }
} }
impl<EV: GenericEvent + PartialEq + Eq + Hash + Copy + Clone> PusEventMgmtBackendProvider<EV> impl<Event: GenericEvent + PartialEq + Eq + Hash + Copy + Clone>
for DefaultPusEventMgmtBackend<EV> PusEventReportingMapProvider<Event> for DefaultPusEventReportingMap<Event>
{ {
type Error = (); type Error = ();
fn event_enabled(&self, event: &EV) -> bool { fn event_enabled(&self, event: &Event) -> bool {
!self.disabled.contains(event) !self.disabled.contains(event)
} }
fn enable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> { fn enable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error> {
Ok(self.disabled.remove(event)) Ok(self.disabled.remove(event))
} }
fn disable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> { fn disable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error> {
Ok(self.disabled.insert(*event)) Ok(self.disabled.insert(*event))
} }
} }
pub struct PusEventDispatcher< pub struct PusEventTmCreatorWithMap<
B: PusEventMgmtBackendProvider<EV, Error = E>, ReportingMap: PusEventReportingMapProvider<Event>,
EV: GenericEvent, Event: GenericEvent,
E, EventTmHook: EventTmHookProvider = DummyEventHook,
> { > {
reporter: EventReporter, pub reporter: EventReporter<EventTmHook>,
backend: B, reporting_map: ReportingMap,
phantom: PhantomData<(E, EV)>, phantom: PhantomData<Event>,
} }
impl<B: PusEventMgmtBackendProvider<Event, Error = E>, Event: GenericEvent, E> impl<
PusEventDispatcher<B, Event, E> ReportingMap: PusEventReportingMapProvider<Event>,
Event: GenericEvent,
EventTmHook: EventTmHookProvider,
> PusEventTmCreatorWithMap<ReportingMap, Event, EventTmHook>
{ {
pub fn new(reporter: EventReporter, backend: B) -> Self { pub fn new(reporter: EventReporter<EventTmHook>, backend: ReportingMap) -> Self {
Self { Self {
reporter, reporter,
backend, reporting_map: backend,
phantom: PhantomData, phantom: PhantomData,
} }
} }
pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, E> { pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, ReportingMap::Error> {
self.backend.enable_event_reporting(event) self.reporting_map.enable_event_reporting(event)
} }
pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, E> { pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, ReportingMap::Error> {
self.backend.disable_event_reporting(event) self.reporting_map.disable_event_reporting(event)
} }
pub fn generate_pus_event_tm_generic( pub fn generate_pus_event_tm_generic(
@ -180,7 +186,7 @@ pub mod alloc_mod {
event: Event, event: Event,
params: Option<&[u8]>, params: Option<&[u8]>,
) -> Result<bool, EventManError> { ) -> Result<bool, EventManError> {
if !self.backend.event_enabled(&event) { if !self.reporting_map.event_enabled(&event) {
return Ok(false); return Ok(false);
} }
match event.severity() { match event.severity() {
@ -208,31 +214,33 @@ pub mod alloc_mod {
} }
} }
impl<EV: GenericEvent + Copy + PartialEq + Eq + Hash> impl<Event: GenericEvent + Copy + PartialEq + Eq + Hash, EventTmHook: EventTmHookProvider>
PusEventDispatcher<DefaultPusEventMgmtBackend<EV>, EV, ()> PusEventTmCreatorWithMap<DefaultPusEventReportingMap<Event>, Event, EventTmHook>
{ {
pub fn new_with_default_backend(reporter: EventReporter) -> Self { pub fn new_with_default_backend(reporter: EventReporter<EventTmHook>) -> Self {
Self { Self {
reporter, reporter,
backend: DefaultPusEventMgmtBackend::default(), reporting_map: DefaultPusEventReportingMap::default(),
phantom: PhantomData, phantom: PhantomData,
} }
} }
} }
impl<B: PusEventMgmtBackendProvider<EventU32, Error = E>, E> PusEventDispatcher<B, EventU32, E> { impl<ReportingMap: PusEventReportingMapProvider<EventU32>>
PusEventTmCreatorWithMap<ReportingMap, EventU32>
{
pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>( pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>(
&mut self, &mut self,
event: &EventU32TypedSev<Severity>, event: &EventU32TypedSev<Severity>,
) -> Result<bool, E> { ) -> Result<bool, ReportingMap::Error> {
self.backend.enable_event_reporting(event.as_ref()) self.reporting_map.enable_event_reporting(event.as_ref())
} }
pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>( pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>(
&mut self, &mut self,
event: &EventU32TypedSev<Severity>, event: &EventU32TypedSev<Severity>,
) -> Result<bool, E> { ) -> Result<bool, ReportingMap::Error> {
self.backend.disable_event_reporting(event.as_ref()) self.reporting_map.disable_event_reporting(event.as_ref())
} }
pub fn generate_pus_event_tm<Severity: HasSeverity>( pub fn generate_pus_event_tm<Severity: HasSeverity>(
@ -246,10 +254,10 @@ pub mod alloc_mod {
} }
} }
pub type DefaultPusEventU16Dispatcher<E> = pub type DefaultPusEventU16TmCreator<EventTmHook = DummyEventHook> =
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU16>, EventU16, E>; PusEventTmCreatorWithMap<DefaultPusEventReportingMap<EventU16>, EventU16, EventTmHook>;
pub type DefaultPusEventU32Dispatcher<E> = pub type DefaultPusEventU32TmCreator<EventTmHook = DummyEventHook> =
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU32>, EventU32, E>; PusEventTmCreatorWithMap<DefaultPusEventReportingMap<EventU32>, EventU32, EventTmHook>;
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -265,16 +273,16 @@ mod tests {
const TEST_APID: u16 = 0x02; const TEST_APID: u16 = 0x02;
const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05); const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05);
fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> { fn create_basic_man_1() -> DefaultPusEventU32TmCreator {
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128) let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
.expect("Creating event repoter failed"); .expect("Creating event repoter failed");
PusEventDispatcher::new_with_default_backend(reporter) PusEventTmCreatorWithMap::new_with_default_backend(reporter)
} }
fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> { fn create_basic_man_2() -> DefaultPusEventU32TmCreator {
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128) let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
.expect("Creating event repoter failed"); .expect("Creating event repoter failed");
let backend = DefaultPusEventMgmtBackend::default(); let backend = DefaultPusEventReportingMap::default();
PusEventDispatcher::new(reporter, backend) PusEventTmCreatorWithMap::new(reporter, backend)
} }
#[test] #[test]

View File

@ -1,11 +1,11 @@
use satrs::event_man::{ use satrs::event_man::{
EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider, EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider,
EventU32SenderMpsc, MpscEventU32Receiver, EventU32SenderMpsc,
}; };
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
use satrs::params::U32Pair; use satrs::params::U32Pair;
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; use satrs::pus::event_man::{DefaultPusEventReportingMap, EventReporter, PusEventTmCreatorWithMap};
use satrs::pus::test_util::TEST_COMPONENT_ID_0; use satrs::pus::test_util::TEST_COMPONENT_ID_0;
use satrs::request::UniqueApidTargetId; use satrs::request::UniqueApidTargetId;
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
@ -29,18 +29,18 @@ pub enum CustomTmSenderError {
#[test] #[test]
fn test_threaded_usage() { fn test_threaded_usage() {
let (event_sender, event_man_receiver) = mpsc::channel(); let (event_tx, event_rx) = mpsc::sync_channel(100);
let event_receiver = MpscEventU32Receiver::new(event_man_receiver); let mut event_man = EventManagerWithMpsc::new(event_rx);
let mut event_man = EventManagerWithMpsc::new(event_receiver);
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.target_id()); event_man.subscribe_all(pus_event_man_send_provider.target_id());
event_man.add_sender(pus_event_man_send_provider); event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = mpsc::channel::<PacketAsVec>(); let (event_packet_tx, event_packet_rx) = mpsc::channel::<PacketAsVec>();
let reporter = let reporter =
EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed"); EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed");
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); let pus_event_man =
PusEventTmCreatorWithMap::new(reporter, DefaultPusEventReportingMap::default());
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| { let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
panic!("received routing error for event {event_msg:?}: {error:?}"); panic!("received routing error for event {event_msg:?}: {error:?}");
}; };
@ -54,7 +54,7 @@ fn test_threaded_usage() {
Ok(event_msg) => { Ok(event_msg) => {
let gen_event = |aux_data| { let gen_event = |aux_data| {
pus_event_man.generate_pus_event_tm_generic( pus_event_man.generate_pus_event_tm_generic(
&event_tx, &event_packet_tx,
&EMPTY_STAMP, &EMPTY_STAMP,
event_msg.event(), event_msg.event(),
aux_data, aux_data,
@ -100,14 +100,14 @@ fn test_threaded_usage() {
// Event sender and TM checker thread // Event sender and TM checker thread
let jh1 = thread::spawn(move || { let jh1 = thread::spawn(move || {
event_sender event_tx
.send(EventMessage::new( .send(EventMessage::new(
TEST_COMPONENT_ID_0.id(), TEST_COMPONENT_ID_0.id(),
INFO_EVENT.into(), INFO_EVENT.into(),
)) ))
.expect("Sending info event failed"); .expect("Sending info event failed");
loop { loop {
match event_rx.try_recv() { match event_packet_rx.try_recv() {
// Event TM received successfully // Event TM received successfully
Ok(event_tm) => { Ok(event_tm) => {
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7) let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
@ -129,7 +129,7 @@ fn test_threaded_usage() {
} }
} }
} }
event_sender event_tx
.send(EventMessage::new_with_params( .send(EventMessage::new_with_params(
TEST_COMPONENT_ID_0.id(), TEST_COMPONENT_ID_0.id(),
LOW_SEV_EVENT, LOW_SEV_EVENT,
@ -137,7 +137,7 @@ fn test_threaded_usage() {
)) ))
.expect("Sending low severity event failed"); .expect("Sending low severity event failed");
loop { loop {
match event_rx.try_recv() { match event_packet_rx.try_recv() {
// Event TM received successfully // Event TM received successfully
Ok(event_tm) => { Ok(event_tm) => {
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7) let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)