Merge pull request 'simplified event management' (#172) from simplify-event-management into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #172
This commit is contained in:
commit
4736d40997
@ -1,14 +1,17 @@
|
|||||||
# Events
|
# Events
|
||||||
|
|
||||||
Events can be an extremely important mechanism used for remote systems to monitor unexpected
|
Events are an important mechanism used for remote systems to monitor unexpected
|
||||||
or expected anomalies and events occuring on these systems. They are oftentimes tied to
|
or expected anomalies and events occuring on these systems.
|
||||||
|
|
||||||
|
One common use case for events on remote systems is to offer a light-weight publish-subscribe
|
||||||
|
mechanism and IPC mechanism for software and hardware events which are also packaged as telemetry
|
||||||
|
(TM) or can trigger a system response. They can also be tied to
|
||||||
Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously.
|
Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously.
|
||||||
|
|
||||||
Events can also be used as a convenient Inter-Process Communication (IPC) mechansism, which is
|
The PUS Service 5 standardizes how the ground interface for events might look like, but does not
|
||||||
also observable for the Ground segment. The PUS Service 5 standardizes how the ground interface
|
specify how other software components might react to those events. There is the PUS Service 19,
|
||||||
for events might look like, but does not specify how other software components might react
|
which might be used for that purpose, but the event components recommended by this framework do not
|
||||||
to those events. There is the PUS Service 19, which might be used for that purpose, but the
|
rely on the present of this service.
|
||||||
event components recommended by this framework do not really need this service.
|
|
||||||
|
|
||||||
The following images shows how the flow of events could look like in a system where components
|
The following images shows how the flow of events could look like in a system where components
|
||||||
can generate events, and where other system components might be interested in those events:
|
can generate events, and where other system components might be interested in those events:
|
||||||
|
@ -10,6 +10,7 @@ class Apid(enum.IntEnum):
|
|||||||
GENERIC_PUS = 2
|
GENERIC_PUS = 2
|
||||||
ACS = 3
|
ACS = 3
|
||||||
CFDP = 4
|
CFDP = 4
|
||||||
|
TMTC = 5
|
||||||
|
|
||||||
|
|
||||||
class EventSeverity(enum.IntEnum):
|
class EventSeverity(enum.IntEnum):
|
||||||
|
@ -144,7 +144,9 @@ class PusHandler(GenericApidHandlerBase):
|
|||||||
)
|
)
|
||||||
src_data = tm_packet.source_data
|
src_data = tm_packet.source_data
|
||||||
event_u32 = EventU32.unpack(src_data)
|
event_u32 = EventU32.unpack(src_data)
|
||||||
_LOGGER.info(f"Received event packet. Event: {event_u32}")
|
_LOGGER.info(
|
||||||
|
f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}"
|
||||||
|
)
|
||||||
if event_u32.group_id == 0 and event_u32.unique_id == 0:
|
if event_u32.group_id == 0 and event_u32.unique_id == 0:
|
||||||
_LOGGER.info("Received test event")
|
_LOGGER.info("Received test event")
|
||||||
elif service == 17:
|
elif service == 17:
|
||||||
|
@ -8,13 +8,10 @@ use satrs::pus::verification::VerificationReporter;
|
|||||||
use satrs::pus::EcssTmSender;
|
use satrs::pus::EcssTmSender;
|
||||||
use satrs::request::UniqueApidTargetId;
|
use satrs::request::UniqueApidTargetId;
|
||||||
use satrs::{
|
use satrs::{
|
||||||
event_man::{
|
event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded},
|
||||||
EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded,
|
|
||||||
MpscEventReceiver,
|
|
||||||
},
|
|
||||||
pus::{
|
pus::{
|
||||||
event_man::{
|
event_man::{
|
||||||
DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken,
|
DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken,
|
||||||
},
|
},
|
||||||
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
|
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
|
||||||
},
|
},
|
||||||
@ -40,13 +37,12 @@ impl EventTmHookProvider for EventApidSetter {
|
|||||||
/// packets. It also handles the verification completion of PUS event service requests.
|
/// packets. It also handles the verification completion of PUS event service requests.
|
||||||
pub struct PusEventHandler<TmSender: EcssTmSender> {
|
pub struct PusEventHandler<TmSender: EcssTmSender> {
|
||||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>,
|
pus_event_tm_creator: DefaultPusEventU32TmCreator<EventApidSetter>,
|
||||||
pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
|
pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
|
||||||
tm_sender: TmSender,
|
tm_sender: TmSender,
|
||||||
time_provider: CdsTime,
|
time_provider: CdsTime,
|
||||||
timestamp: [u8; 7],
|
timestamp: [u8; 7],
|
||||||
verif_handler: VerificationReporter,
|
verif_handler: VerificationReporter,
|
||||||
event_apid_setter: EventApidSetter,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
|
impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
|
||||||
@ -61,9 +57,16 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
|
|||||||
|
|
||||||
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
|
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
|
||||||
// telemetry for each event.
|
// telemetry for each event.
|
||||||
let event_reporter = EventReporter::new(PUS_EVENT_MANAGEMENT.raw(), 0, 0, 128).unwrap();
|
let event_reporter = EventReporter::new_with_hook(
|
||||||
|
PUS_EVENT_MANAGEMENT.raw(),
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
128,
|
||||||
|
EventApidSetter::default(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
let pus_event_dispatcher =
|
let pus_event_dispatcher =
|
||||||
DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter);
|
DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter);
|
||||||
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
|
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
|
||||||
PUS_EVENT_MANAGEMENT.raw(),
|
PUS_EVENT_MANAGEMENT.raw(),
|
||||||
pus_event_man_tx,
|
pus_event_man_tx,
|
||||||
@ -75,13 +78,12 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
|
|||||||
|
|
||||||
Self {
|
Self {
|
||||||
event_request_rx,
|
event_request_rx,
|
||||||
pus_event_dispatcher,
|
pus_event_tm_creator: pus_event_dispatcher,
|
||||||
pus_event_man_rx,
|
pus_event_man_rx,
|
||||||
time_provider: CdsTime::new_with_u16_days(0, 0),
|
time_provider: CdsTime::new_with_u16_days(0, 0),
|
||||||
timestamp: [0; 7],
|
timestamp: [0; 7],
|
||||||
verif_handler,
|
verif_handler,
|
||||||
tm_sender,
|
tm_sender,
|
||||||
event_apid_setter: EventApidSetter::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,36 +97,49 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
|
|||||||
.completion_success(&self.tm_sender, started_token, timestamp)
|
.completion_success(&self.tm_sender, started_token, timestamp)
|
||||||
.expect("Sending completion success failed");
|
.expect("Sending completion success failed");
|
||||||
};
|
};
|
||||||
|
loop {
|
||||||
// handle event requests
|
// handle event requests
|
||||||
if let Ok(event_req) = self.event_request_rx.try_recv() {
|
match self.event_request_rx.try_recv() {
|
||||||
match event_req.request {
|
Ok(event_req) => match event_req.request {
|
||||||
EventRequest::Enable(event) => {
|
EventRequest::Enable(event) => {
|
||||||
self.pus_event_dispatcher
|
self.pus_event_tm_creator
|
||||||
.enable_tm_for_event(&event)
|
.enable_tm_for_event(&event)
|
||||||
.expect("Enabling TM failed");
|
.expect("Enabling TM failed");
|
||||||
update_time(&mut self.time_provider, &mut self.timestamp);
|
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||||
report_completion(event_req, &self.timestamp);
|
report_completion(event_req, &self.timestamp);
|
||||||
}
|
}
|
||||||
EventRequest::Disable(event) => {
|
EventRequest::Disable(event) => {
|
||||||
self.pus_event_dispatcher
|
self.pus_event_tm_creator
|
||||||
.disable_tm_for_event(&event)
|
.disable_tm_for_event(&event)
|
||||||
.expect("Disabling TM failed");
|
.expect("Disabling TM failed");
|
||||||
update_time(&mut self.time_provider, &mut self.timestamp);
|
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||||
report_completion(event_req, &self.timestamp);
|
report_completion(event_req, &self.timestamp);
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
Err(e) => match e {
|
||||||
|
mpsc::TryRecvError::Empty => break,
|
||||||
|
mpsc::TryRecvError::Disconnected => {
|
||||||
|
log::warn!("all event request senders have disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generate_pus_event_tm(&mut self) {
|
pub fn generate_pus_event_tm(&mut self) {
|
||||||
|
loop {
|
||||||
// Perform the generation of PUS event packets
|
// Perform the generation of PUS event packets
|
||||||
if let Ok(event_msg) = self.pus_event_man_rx.try_recv() {
|
match self.pus_event_man_rx.try_recv() {
|
||||||
|
Ok(event_msg) => {
|
||||||
update_time(&mut self.time_provider, &mut self.timestamp);
|
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||||
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
|
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
|
||||||
param.to_vec().expect("failed to convert params to vec")
|
param.to_vec().expect("failed to convert params to vec")
|
||||||
});
|
});
|
||||||
self.event_apid_setter.next_apid = UniqueApidTargetId::from(event_msg.sender_id()).apid;
|
// We use the TM modification hook to set the sender APID for each event.
|
||||||
self.pus_event_dispatcher
|
self.pus_event_tm_creator.reporter.tm_hook.next_apid =
|
||||||
|
UniqueApidTargetId::from(event_msg.sender_id()).apid;
|
||||||
|
self.pus_event_tm_creator
|
||||||
.generate_pus_event_tm_generic(
|
.generate_pus_event_tm_generic(
|
||||||
&self.tm_sender,
|
&self.tm_sender,
|
||||||
&self.timestamp,
|
&self.timestamp,
|
||||||
@ -133,37 +148,54 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
|
|||||||
)
|
)
|
||||||
.expect("Sending TM as event failed");
|
.expect("Sending TM as event failed");
|
||||||
}
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
mpsc::TryRecvError::Empty => break,
|
||||||
|
mpsc::TryRecvError::Disconnected => {
|
||||||
|
log::warn!("All event senders have disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is a thin wrapper around the event manager which also caches the sender component
|
pub struct EventHandler<TmSender: EcssTmSender> {
|
||||||
/// used to send events to the event manager.
|
pub pus_event_handler: PusEventHandler<TmSender>,
|
||||||
pub struct EventManagerWrapper {
|
|
||||||
event_manager: EventManagerWithBoundedMpsc,
|
event_manager: EventManagerWithBoundedMpsc,
|
||||||
event_sender: mpsc::Sender<EventMessageU32>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventManagerWrapper {
|
impl<TmSender: EcssTmSender> EventHandler<TmSender> {
|
||||||
pub fn new() -> Self {
|
pub fn new(
|
||||||
// The sender handle is the primary sender handle for all components which want to create events.
|
tm_sender: TmSender,
|
||||||
// The event manager will receive the RX handle to receive all the events.
|
event_rx: mpsc::Receiver<EventMessageU32>,
|
||||||
let (event_sender, event_man_rx) = mpsc::channel();
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
let event_recv = MpscEventReceiver::new(event_man_rx);
|
) -> Self {
|
||||||
|
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
|
||||||
|
let pus_event_handler = PusEventHandler::new(
|
||||||
|
tm_sender,
|
||||||
|
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
|
||||||
|
&mut event_manager,
|
||||||
|
event_request_rx,
|
||||||
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
event_manager: EventManagerWithBoundedMpsc::new(event_recv),
|
pus_event_handler,
|
||||||
event_sender,
|
event_manager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a cached event sender to send events to the event manager for routing.
|
#[allow(dead_code)]
|
||||||
pub fn clone_event_sender(&self) -> mpsc::Sender<EventMessageU32> {
|
|
||||||
self.event_sender.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
||||||
&mut self.event_manager
|
&mut self.event_manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn periodic_operation(&mut self) {
|
||||||
|
self.pus_event_handler.handle_event_requests();
|
||||||
|
self.try_event_routing();
|
||||||
|
self.pus_event_handler.generate_pus_event_tm();
|
||||||
|
}
|
||||||
|
|
||||||
pub fn try_event_routing(&mut self) {
|
pub fn try_event_routing(&mut self) {
|
||||||
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
||||||
self.routing_error_handler(event_msg, error)
|
self.routing_error_handler(event_msg, error)
|
||||||
@ -177,41 +209,5 @@ impl EventManagerWrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EventHandler<TmSender: EcssTmSender> {
|
#[cfg(test)]
|
||||||
pub event_man_wrapper: EventManagerWrapper,
|
mod tests {}
|
||||||
pub pus_event_handler: PusEventHandler<TmSender>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<TmSender: EcssTmSender> EventHandler<TmSender> {
|
|
||||||
pub fn new(
|
|
||||||
tm_sender: TmSender,
|
|
||||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
|
||||||
) -> Self {
|
|
||||||
let mut event_man_wrapper = EventManagerWrapper::new();
|
|
||||||
let pus_event_handler = PusEventHandler::new(
|
|
||||||
tm_sender,
|
|
||||||
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
|
|
||||||
event_man_wrapper.event_manager(),
|
|
||||||
event_request_rx,
|
|
||||||
);
|
|
||||||
Self {
|
|
||||||
event_man_wrapper,
|
|
||||||
pus_event_handler,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clone_event_sender(&self) -> mpsc::Sender<EventMessageU32> {
|
|
||||||
self.event_man_wrapper.clone_event_sender()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
|
||||||
self.event_man_wrapper.event_manager()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn periodic_operation(&mut self) {
|
|
||||||
self.pus_event_handler.handle_event_requests();
|
|
||||||
self.event_man_wrapper.try_event_routing();
|
|
||||||
self.pus_event_handler.generate_pus_event_tm();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -11,7 +11,7 @@ use crate::events::EventHandler;
|
|||||||
use crate::interface::udp::DynamicUdpTmHandler;
|
use crate::interface::udp::DynamicUdpTmHandler;
|
||||||
use crate::pus::stack::PusStack;
|
use crate::pus::stack::PusStack;
|
||||||
use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic};
|
use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic};
|
||||||
use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic};
|
use crate::tmtc::tm_sink::{TmSinkDynamic, TmSinkStatic};
|
||||||
use log::info;
|
use log::info;
|
||||||
use pus::test::create_test_service_dynamic;
|
use pus::test::create_test_service_dynamic;
|
||||||
use satrs::hal::std::tcp_server::ServerConfig;
|
use satrs::hal::std::tcp_server::ServerConfig;
|
||||||
@ -54,11 +54,11 @@ fn static_tmtc_pool_main() {
|
|||||||
let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool);
|
let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool);
|
||||||
let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool);
|
let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool);
|
||||||
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
|
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
|
||||||
let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50);
|
let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50);
|
||||||
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
|
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
|
||||||
|
|
||||||
let tm_funnel_tx_sender =
|
let tm_sink_tx_sender =
|
||||||
PacketSenderWithSharedPool::new(tm_funnel_tx.clone(), shared_tm_pool_wrapper.clone());
|
PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone());
|
||||||
|
|
||||||
let (mgm_handler_composite_tx, mgm_handler_composite_rx) =
|
let (mgm_handler_composite_tx, mgm_handler_composite_rx) =
|
||||||
mpsc::channel::<GenericMessage<CompositeRequest>>();
|
mpsc::channel::<GenericMessage<CompositeRequest>>();
|
||||||
@ -80,11 +80,12 @@ fn static_tmtc_pool_main() {
|
|||||||
// Create event handling components
|
// Create event handling components
|
||||||
// These sender handles are used to send event requests, for example to enable or disable
|
// These sender handles are used to send event requests, for example to enable or disable
|
||||||
// certain events.
|
// certain events.
|
||||||
|
let (event_tx, event_rx) = mpsc::sync_channel(100);
|
||||||
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
|
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
|
||||||
|
|
||||||
// The event task is the core handler to perform the event routing and TM handling as specified
|
// The event task is the core handler to perform the event routing and TM handling as specified
|
||||||
// in the sat-rs documentation.
|
// in the sat-rs documentation.
|
||||||
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx);
|
let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
|
||||||
|
|
||||||
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
||||||
let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
||||||
@ -106,39 +107,39 @@ fn static_tmtc_pool_main() {
|
|||||||
mode_tc_sender: pus_mode_tx,
|
mode_tc_sender: pus_mode_tx,
|
||||||
};
|
};
|
||||||
let pus_test_service = create_test_service_static(
|
let pus_test_service = create_test_service_static(
|
||||||
tm_funnel_tx_sender.clone(),
|
tm_sink_tx_sender.clone(),
|
||||||
shared_tc_pool.clone(),
|
shared_tc_pool.clone(),
|
||||||
event_handler.clone_event_sender(),
|
event_tx.clone(),
|
||||||
pus_test_rx,
|
pus_test_rx,
|
||||||
);
|
);
|
||||||
let pus_scheduler_service = create_scheduler_service_static(
|
let pus_scheduler_service = create_scheduler_service_static(
|
||||||
tm_funnel_tx_sender.clone(),
|
tm_sink_tx_sender.clone(),
|
||||||
tc_source.clone(),
|
tc_source.clone(),
|
||||||
pus_sched_rx,
|
pus_sched_rx,
|
||||||
create_sched_tc_pool(),
|
create_sched_tc_pool(),
|
||||||
);
|
);
|
||||||
let pus_event_service = create_event_service_static(
|
let pus_event_service = create_event_service_static(
|
||||||
tm_funnel_tx_sender.clone(),
|
tm_sink_tx_sender.clone(),
|
||||||
shared_tc_pool.clone(),
|
shared_tc_pool.clone(),
|
||||||
pus_event_rx,
|
pus_event_rx,
|
||||||
event_request_tx,
|
event_request_tx,
|
||||||
);
|
);
|
||||||
let pus_action_service = create_action_service_static(
|
let pus_action_service = create_action_service_static(
|
||||||
tm_funnel_tx_sender.clone(),
|
tm_sink_tx_sender.clone(),
|
||||||
shared_tc_pool.clone(),
|
shared_tc_pool.clone(),
|
||||||
pus_action_rx,
|
pus_action_rx,
|
||||||
request_map.clone(),
|
request_map.clone(),
|
||||||
pus_action_reply_rx,
|
pus_action_reply_rx,
|
||||||
);
|
);
|
||||||
let pus_hk_service = create_hk_service_static(
|
let pus_hk_service = create_hk_service_static(
|
||||||
tm_funnel_tx_sender.clone(),
|
tm_sink_tx_sender.clone(),
|
||||||
shared_tc_pool.clone(),
|
shared_tc_pool.clone(),
|
||||||
pus_hk_rx,
|
pus_hk_rx,
|
||||||
request_map.clone(),
|
request_map.clone(),
|
||||||
pus_hk_reply_rx,
|
pus_hk_reply_rx,
|
||||||
);
|
);
|
||||||
let pus_mode_service = create_mode_service_static(
|
let pus_mode_service = create_mode_service_static(
|
||||||
tm_funnel_tx_sender.clone(),
|
tm_sink_tx_sender.clone(),
|
||||||
shared_tc_pool.clone(),
|
shared_tc_pool.clone(),
|
||||||
pus_mode_rx,
|
pus_mode_rx,
|
||||||
request_map,
|
request_map,
|
||||||
@ -156,7 +157,7 @@ fn static_tmtc_pool_main() {
|
|||||||
let mut tmtc_task = TcSourceTaskStatic::new(
|
let mut tmtc_task = TcSourceTaskStatic::new(
|
||||||
shared_tc_pool_wrapper.clone(),
|
shared_tc_pool_wrapper.clone(),
|
||||||
tc_source_rx,
|
tc_source_rx,
|
||||||
PusTcDistributor::new(tm_funnel_tx_sender, pus_router),
|
PusTcDistributor::new(tm_sink_tx_sender, pus_router),
|
||||||
);
|
);
|
||||||
|
|
||||||
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
|
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
|
||||||
@ -186,10 +187,10 @@ fn static_tmtc_pool_main() {
|
|||||||
)
|
)
|
||||||
.expect("tcp server creation failed");
|
.expect("tcp server creation failed");
|
||||||
|
|
||||||
let mut tm_funnel = TmFunnelStatic::new(
|
let mut tm_sink = TmSinkStatic::new(
|
||||||
shared_tm_pool_wrapper,
|
shared_tm_pool_wrapper,
|
||||||
sync_tm_tcp_source,
|
sync_tm_tcp_source,
|
||||||
tm_funnel_rx,
|
tm_sink_rx,
|
||||||
tm_server_tx,
|
tm_server_tx,
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -209,7 +210,7 @@ fn static_tmtc_pool_main() {
|
|||||||
mode_leaf_interface,
|
mode_leaf_interface,
|
||||||
mgm_handler_composite_rx,
|
mgm_handler_composite_rx,
|
||||||
pus_hk_reply_tx,
|
pus_hk_reply_tx,
|
||||||
tm_funnel_tx,
|
tm_sink_tx,
|
||||||
dummy_spi_interface,
|
dummy_spi_interface,
|
||||||
shared_mgm_set,
|
shared_mgm_set,
|
||||||
);
|
);
|
||||||
@ -240,9 +241,9 @@ fn static_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting TM funnel task");
|
info!("Starting TM funnel task");
|
||||||
let jh_tm_funnel = thread::Builder::new()
|
let jh_tm_funnel = thread::Builder::new()
|
||||||
.name("TM Funnel".to_string())
|
.name("tm sink".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
tm_funnel.operation();
|
tm_sink.operation();
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -314,10 +315,11 @@ fn dyn_tmtc_pool_main() {
|
|||||||
// Create event handling components
|
// Create event handling components
|
||||||
// These sender handles are used to send event requests, for example to enable or disable
|
// These sender handles are used to send event requests, for example to enable or disable
|
||||||
// certain events.
|
// certain events.
|
||||||
|
let (event_tx, event_rx) = mpsc::sync_channel(100);
|
||||||
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
|
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
|
||||||
// The event task is the core handler to perform the event routing and TM handling as specified
|
// The event task is the core handler to perform the event routing and TM handling as specified
|
||||||
// in the sat-rs documentation.
|
// in the sat-rs documentation.
|
||||||
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx);
|
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
|
||||||
|
|
||||||
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
||||||
let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
||||||
@ -339,11 +341,8 @@ fn dyn_tmtc_pool_main() {
|
|||||||
mode_tc_sender: pus_mode_tx,
|
mode_tc_sender: pus_mode_tx,
|
||||||
};
|
};
|
||||||
|
|
||||||
let pus_test_service = create_test_service_dynamic(
|
let pus_test_service =
|
||||||
tm_funnel_tx.clone(),
|
create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
|
||||||
event_handler.clone_event_sender(),
|
|
||||||
pus_test_rx,
|
|
||||||
);
|
|
||||||
let pus_scheduler_service = create_scheduler_service_dynamic(
|
let pus_scheduler_service = create_scheduler_service_dynamic(
|
||||||
tm_funnel_tx.clone(),
|
tm_funnel_tx.clone(),
|
||||||
tc_source_tx.clone(),
|
tc_source_tx.clone(),
|
||||||
@ -411,7 +410,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
)
|
)
|
||||||
.expect("tcp server creation failed");
|
.expect("tcp server creation failed");
|
||||||
|
|
||||||
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
|
let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
|
||||||
|
|
||||||
let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) =
|
let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) =
|
||||||
mpsc::channel();
|
mpsc::channel();
|
||||||
|
@ -23,7 +23,7 @@ use super::HandlingStatus;
|
|||||||
pub fn create_test_service_static(
|
pub fn create_test_service_static(
|
||||||
tm_sender: PacketSenderWithSharedPool,
|
tm_sender: PacketSenderWithSharedPool,
|
||||||
tc_pool: SharedStaticMemoryPool,
|
tc_pool: SharedStaticMemoryPool,
|
||||||
event_sender: mpsc::Sender<EventMessageU32>,
|
event_sender: mpsc::SyncSender<EventMessageU32>,
|
||||||
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||||
) -> TestCustomServiceWrapper<PacketSenderWithSharedPool, EcssTcInSharedStoreConverter> {
|
) -> TestCustomServiceWrapper<PacketSenderWithSharedPool, EcssTcInSharedStoreConverter> {
|
||||||
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
||||||
@ -41,7 +41,7 @@ pub fn create_test_service_static(
|
|||||||
|
|
||||||
pub fn create_test_service_dynamic(
|
pub fn create_test_service_dynamic(
|
||||||
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||||
event_sender: mpsc::Sender<EventMessageU32>,
|
event_sender: mpsc::SyncSender<EventMessageU32>,
|
||||||
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||||
) -> TestCustomServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> {
|
) -> TestCustomServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> {
|
||||||
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
||||||
@ -61,7 +61,7 @@ pub struct TestCustomServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: Ec
|
|||||||
{
|
{
|
||||||
pub handler:
|
pub handler:
|
||||||
PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
|
PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
|
||||||
pub test_srv_event_sender: mpsc::Sender<EventMessageU32>,
|
pub test_srv_event_sender: mpsc::SyncSender<EventMessageU32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
|
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
|
||||||
|
@ -70,18 +70,23 @@ impl TmFunnelCommon {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn packet_printout(tm: &PusTmZeroCopyWriter) {
|
fn packet_printout(tm: &PusTmZeroCopyWriter) {
|
||||||
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
|
info!(
|
||||||
|
"Sending PUS TM[{},{}] with APID {}",
|
||||||
|
tm.service(),
|
||||||
|
tm.subservice(),
|
||||||
|
tm.apid()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TmFunnelStatic {
|
pub struct TmSinkStatic {
|
||||||
common: TmFunnelCommon,
|
common: TmFunnelCommon,
|
||||||
shared_tm_store: SharedPacketPool,
|
shared_tm_store: SharedPacketPool,
|
||||||
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
|
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
|
||||||
tm_server_tx: mpsc::SyncSender<PacketInPool>,
|
tm_server_tx: mpsc::SyncSender<PacketInPool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TmFunnelStatic {
|
impl TmSinkStatic {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
shared_tm_store: SharedPacketPool,
|
shared_tm_store: SharedPacketPool,
|
||||||
sync_tm_tcp_source: SyncTcpTmSource,
|
sync_tm_tcp_source: SyncTcpTmSource,
|
||||||
@ -121,13 +126,13 @@ impl TmFunnelStatic {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TmFunnelDynamic {
|
pub struct TmSinkDynamic {
|
||||||
common: TmFunnelCommon,
|
common: TmFunnelCommon,
|
||||||
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
|
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
|
||||||
tm_server_tx: mpsc::Sender<PacketAsVec>,
|
tm_server_tx: mpsc::Sender<PacketAsVec>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TmFunnelDynamic {
|
impl TmSinkDynamic {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
sync_tm_tcp_source: SyncTcpTmSource,
|
sync_tm_tcp_source: SyncTcpTmSource,
|
||||||
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
|
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
|
||||||
|
@ -8,6 +8,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
|
|||||||
|
|
||||||
# [unreleased]
|
# [unreleased]
|
||||||
|
|
||||||
|
# [v0.2.0-rc.5] 2024-04-23
|
||||||
|
|
||||||
|
## Changed
|
||||||
|
|
||||||
|
- Removed `MpscEventReceiver`, the `EventReceiveProvider` trait is implemented directly
|
||||||
|
on `mpsc::Receiver<EventMessage<Event>>`
|
||||||
|
- Renamed `PusEventDispatcher` to `PusEventTmCreatorWithMap`.
|
||||||
|
- Renamed `DefaultPusEventU32Dispatcher` to `DefaultPusEventU32EventCreator`.
|
||||||
|
- Renamed `PusEventMgmtBackendProvider` renamed to `PusEventReportingMap`.
|
||||||
|
|
||||||
# [v0.2.0-rc.4] 2024-04-23
|
# [v0.2.0-rc.4] 2024-04-23
|
||||||
|
|
||||||
## Changed
|
## Changed
|
||||||
|
@ -1,14 +1,12 @@
|
|||||||
//! Event management and forwarding
|
//! Event management and forwarding
|
||||||
//!
|
//!
|
||||||
//! This module provides components to perform event routing. The most important component for this
|
|
||||||
//! task is the [EventManager]. It receives all events and then routes them to event subscribers
|
|
||||||
//! where appropriate. One common use case for satellite systems is to offer a light-weight
|
|
||||||
//! publish-subscribe mechanism and IPC mechanism for software and hardware events which are also
|
|
||||||
//! packaged as telemetry (TM) or can trigger a system response.
|
|
||||||
//!
|
|
||||||
//! It is recommended to read the
|
//! It is recommended to read the
|
||||||
//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html)
|
//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html)
|
||||||
//! about events first:
|
//! about events first.
|
||||||
|
//!
|
||||||
|
//! This module provides components to perform event routing. The most important component for this
|
||||||
|
//! task is the [EventManager]. It receives all events and then routes them to event subscribers
|
||||||
|
//! where appropriate.
|
||||||
//!
|
//!
|
||||||
//! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps
|
//! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps
|
||||||
//! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId].
|
//! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId].
|
||||||
@ -21,8 +19,8 @@
|
|||||||
//!
|
//!
|
||||||
//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different
|
//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different
|
||||||
//! message queue backends. A straightforward implementation where dynamic memory allocation is
|
//! message queue backends. A straightforward implementation where dynamic memory allocation is
|
||||||
//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in
|
//! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is
|
||||||
//! form of the [MpscEventReceiver].
|
//! already implemented for this type.
|
||||||
//! 2. To set up event creators, create channel pairs using some message queue implementation.
|
//! 2. To set up event creators, create channel pairs using some message queue implementation.
|
||||||
//! Each event creator gets a (cloned) sender component which allows it to send events to the
|
//! Each event creator gets a (cloned) sender component which allows it to send events to the
|
||||||
//! manager.
|
//! manager.
|
||||||
@ -44,6 +42,12 @@
|
|||||||
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs)
|
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs)
|
||||||
//! for a concrete example using multi-threading where events are routed to
|
//! for a concrete example using multi-threading where events are routed to
|
||||||
//! different threads.
|
//! different threads.
|
||||||
|
//!
|
||||||
|
//! The [satrs-example](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example)
|
||||||
|
//! also contains a full event manager instance and exposes a test event via the PUS test service.
|
||||||
|
//! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs)
|
||||||
|
//! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs)
|
||||||
|
//! show how the event management modules can be integrated into a more complex software.
|
||||||
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};
|
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};
|
||||||
use crate::params::Params;
|
use crate::params::Params;
|
||||||
use crate::queue::GenericSendError;
|
use crate::queue::GenericSendError;
|
||||||
@ -157,9 +161,10 @@ pub trait SenderMapProvider<
|
|||||||
/// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers.
|
/// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers.
|
||||||
/// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs.
|
/// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs.
|
||||||
/// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events.
|
/// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events.
|
||||||
/// * `Ev`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
|
/// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
|
||||||
/// and [EventU16] are supported.
|
/// and [EventU16] are supported.
|
||||||
/// * `Data`: Auxiliary data which is sent with the event to provide optional context information
|
/// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context
|
||||||
|
/// information
|
||||||
pub struct EventManager<
|
pub struct EventManager<
|
||||||
EventReceiver: EventReceiveProvider<Event, ParamProvider>,
|
EventReceiver: EventReceiveProvider<Event, ParamProvider>,
|
||||||
SenderMap: SenderMapProvider<EventSender, Event, ParamProvider>,
|
SenderMap: SenderMapProvider<EventSender, Event, ParamProvider>,
|
||||||
@ -331,11 +336,11 @@ pub mod alloc_mod {
|
|||||||
|
|
||||||
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
|
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
|
||||||
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
|
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
|
||||||
pub type EventManagerWithMpsc<EV = EventU32, AUX = Params> = EventManager<
|
pub type EventManagerWithMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
|
||||||
MpscEventReceiver,
|
EventU32ReceiverMpsc<ParamProvider>,
|
||||||
DefaultSenderMap<EventSenderMpsc<EV>, EV, AUX>,
|
DefaultSenderMap<EventSenderMpsc<Event>, Event, ParamProvider>,
|
||||||
DefaultListenerMap,
|
DefaultListenerMap,
|
||||||
EventSenderMpsc<EV>,
|
EventSenderMpsc<Event>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
|
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
|
||||||
@ -343,7 +348,7 @@ pub mod alloc_mod {
|
|||||||
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
|
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
|
||||||
/// message queue backend.
|
/// message queue backend.
|
||||||
pub type EventManagerWithBoundedMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
|
pub type EventManagerWithBoundedMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
|
||||||
MpscEventReceiver,
|
EventU32ReceiverMpsc<ParamProvider>,
|
||||||
DefaultSenderMap<EventSenderMpscBounded<Event>, Event, ParamProvider>,
|
DefaultSenderMap<EventSenderMpscBounded<Event>, Event, ParamProvider>,
|
||||||
DefaultListenerMap,
|
DefaultListenerMap,
|
||||||
EventSenderMpscBounded<Event>,
|
EventSenderMpscBounded<Event>,
|
||||||
@ -479,20 +484,16 @@ pub mod std_mod {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
|
||||||
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> {
|
impl<Event: GenericEvent + Send, ParamProvider: Debug>
|
||||||
receiver: mpsc::Receiver<EventMessage<Event>>,
|
EventReceiveProvider<Event, ParamProvider>
|
||||||
}
|
for mpsc::Receiver<EventMessage<Event, ParamProvider>>
|
||||||
|
{
|
||||||
impl<Event: GenericEvent + Send> MpscEventReceiver<Event> {
|
|
||||||
pub fn new(receiver: mpsc::Receiver<EventMessage<Event>>) -> Self {
|
|
||||||
Self { receiver }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<Event: GenericEvent + Send> EventReceiveProvider<Event> for MpscEventReceiver<Event> {
|
|
||||||
type Error = GenericReceiveError;
|
type Error = GenericReceiveError;
|
||||||
|
|
||||||
fn try_recv_event(&self) -> Result<Option<EventMessage<Event>>, Self::Error> {
|
fn try_recv_event(
|
||||||
match self.receiver.try_recv() {
|
&self,
|
||||||
|
) -> Result<Option<EventMessage<Event, ParamProvider>>, Self::Error> {
|
||||||
|
match self.try_recv() {
|
||||||
Ok(msg) => Ok(Some(msg)),
|
Ok(msg) => Ok(Some(msg)),
|
||||||
Err(e) => match e {
|
Err(e) => match e {
|
||||||
mpsc::TryRecvError::Empty => Ok(None),
|
mpsc::TryRecvError::Empty => Ok(None),
|
||||||
@ -504,8 +505,10 @@ pub mod std_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type MpscEventU32Receiver = MpscEventReceiver<EventU32>;
|
pub type EventU32ReceiverMpsc<ParamProvider = Params> =
|
||||||
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>;
|
mpsc::Receiver<EventMessage<EventU32, ParamProvider>>;
|
||||||
|
pub type EventU16ReceiverMpsc<ParamProvider = Params> =
|
||||||
|
mpsc::Receiver<EventMessage<EventU16, ParamProvider>>;
|
||||||
|
|
||||||
/// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to
|
/// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to
|
||||||
/// send events.
|
/// send events.
|
||||||
@ -624,9 +627,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn generic_event_man() -> (mpsc::Sender<EventMessageU32>, EventManagerWithMpsc) {
|
fn generic_event_man() -> (mpsc::Sender<EventMessageU32>, EventManagerWithMpsc) {
|
||||||
let (event_sender, manager_queue) = mpsc::channel();
|
let (event_sender, event_receiver) = mpsc::channel();
|
||||||
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
(event_sender, EventManager::new(event_receiver))
|
||||||
(event_sender, EventManager::new(event_man_receiver))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -793,9 +795,8 @@ mod tests {
|
|||||||
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
|
||||||
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
|
||||||
};
|
};
|
||||||
let (event_sender, manager_queue) = mpsc::channel();
|
let (event_sender, event_receiver) = mpsc::channel();
|
||||||
let event_man_receiver = MpscEventReceiver::new(manager_queue);
|
let mut event_man = EventManagerWithMpsc::new(event_receiver);
|
||||||
let mut event_man = EventManagerWithMpsc::new(event_man_receiver);
|
|
||||||
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
|
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
|
||||||
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
|
||||||
let (event_0_tx_0, all_events_rx) = mpsc::channel();
|
let (event_0_tx_0, all_events_rx) = mpsc::channel();
|
||||||
|
@ -28,7 +28,7 @@ pub use heapless_mod::*;
|
|||||||
/// structure to track disabled events. A more primitive and embedded friendly
|
/// structure to track disabled events. A more primitive and embedded friendly
|
||||||
/// solution could track this information in a static or pre-allocated list which contains
|
/// solution could track this information in a static or pre-allocated list which contains
|
||||||
/// the disabled events.
|
/// the disabled events.
|
||||||
pub trait PusEventMgmtBackendProvider<Event: GenericEvent> {
|
pub trait PusEventReportingMapProvider<Event: GenericEvent> {
|
||||||
type Error;
|
type Error;
|
||||||
|
|
||||||
fn event_enabled(&self, event: &Event) -> bool;
|
fn event_enabled(&self, event: &Event) -> bool;
|
||||||
@ -56,7 +56,7 @@ pub mod heapless_mod {
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<const N: usize, Provider: GenericEvent> PusEventMgmtBackendProvider<Provider>
|
impl<const N: usize, Provider: GenericEvent> PusEventReportingMapProvider<Provider>
|
||||||
for HeaplessPusMgmtBackendProvider<N, Provider>
|
for HeaplessPusMgmtBackendProvider<N, Provider>
|
||||||
{
|
{
|
||||||
type Error = ();
|
type Error = ();
|
||||||
@ -105,20 +105,23 @@ impl From<EcssTmtcError> for EventManError {
|
|||||||
pub mod alloc_mod {
|
pub mod alloc_mod {
|
||||||
use core::marker::PhantomData;
|
use core::marker::PhantomData;
|
||||||
|
|
||||||
use crate::events::EventU16;
|
use crate::{
|
||||||
|
events::EventU16,
|
||||||
|
pus::event::{DummyEventHook, EventTmHookProvider},
|
||||||
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// Default backend provider which uses a hash set as the event reporting status container
|
/// Default backend provider which uses a hash set as the event reporting status container
|
||||||
/// like mentioned in the example of the [PusEventMgmtBackendProvider] documentation.
|
/// like mentioned in the example of the [PusEventReportingMapProvider] documentation.
|
||||||
///
|
///
|
||||||
/// This provider is a good option for host systems or larger embedded systems where
|
/// This provider is a good option for host systems or larger embedded systems where
|
||||||
/// the expected occasional memory allocation performed by the [HashSet] is not an issue.
|
/// the expected occasional memory allocation performed by the [HashSet] is not an issue.
|
||||||
pub struct DefaultPusEventMgmtBackend<Event: GenericEvent = EventU32> {
|
pub struct DefaultPusEventReportingMap<Event: GenericEvent = EventU32> {
|
||||||
disabled: HashSet<Event>,
|
disabled: HashSet<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Event: GenericEvent> Default for DefaultPusEventMgmtBackend<Event> {
|
impl<Event: GenericEvent> Default for DefaultPusEventReportingMap<Event> {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
disabled: HashSet::default(),
|
disabled: HashSet::default(),
|
||||||
@ -126,51 +129,54 @@ pub mod alloc_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<EV: GenericEvent + PartialEq + Eq + Hash + Copy + Clone> PusEventMgmtBackendProvider<EV>
|
impl<Event: GenericEvent + PartialEq + Eq + Hash + Copy + Clone>
|
||||||
for DefaultPusEventMgmtBackend<EV>
|
PusEventReportingMapProvider<Event> for DefaultPusEventReportingMap<Event>
|
||||||
{
|
{
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
fn event_enabled(&self, event: &EV) -> bool {
|
fn event_enabled(&self, event: &Event) -> bool {
|
||||||
!self.disabled.contains(event)
|
!self.disabled.contains(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn enable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> {
|
fn enable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error> {
|
||||||
Ok(self.disabled.remove(event))
|
Ok(self.disabled.remove(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn disable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> {
|
fn disable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error> {
|
||||||
Ok(self.disabled.insert(*event))
|
Ok(self.disabled.insert(*event))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PusEventDispatcher<
|
pub struct PusEventTmCreatorWithMap<
|
||||||
B: PusEventMgmtBackendProvider<EV, Error = E>,
|
ReportingMap: PusEventReportingMapProvider<Event>,
|
||||||
EV: GenericEvent,
|
Event: GenericEvent,
|
||||||
E,
|
EventTmHook: EventTmHookProvider = DummyEventHook,
|
||||||
> {
|
> {
|
||||||
reporter: EventReporter,
|
pub reporter: EventReporter<EventTmHook>,
|
||||||
backend: B,
|
reporting_map: ReportingMap,
|
||||||
phantom: PhantomData<(E, EV)>,
|
phantom: PhantomData<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: PusEventMgmtBackendProvider<Event, Error = E>, Event: GenericEvent, E>
|
impl<
|
||||||
PusEventDispatcher<B, Event, E>
|
ReportingMap: PusEventReportingMapProvider<Event>,
|
||||||
|
Event: GenericEvent,
|
||||||
|
EventTmHook: EventTmHookProvider,
|
||||||
|
> PusEventTmCreatorWithMap<ReportingMap, Event, EventTmHook>
|
||||||
{
|
{
|
||||||
pub fn new(reporter: EventReporter, backend: B) -> Self {
|
pub fn new(reporter: EventReporter<EventTmHook>, backend: ReportingMap) -> Self {
|
||||||
Self {
|
Self {
|
||||||
reporter,
|
reporter,
|
||||||
backend,
|
reporting_map: backend,
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, E> {
|
pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, ReportingMap::Error> {
|
||||||
self.backend.enable_event_reporting(event)
|
self.reporting_map.enable_event_reporting(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, E> {
|
pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, ReportingMap::Error> {
|
||||||
self.backend.disable_event_reporting(event)
|
self.reporting_map.disable_event_reporting(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generate_pus_event_tm_generic(
|
pub fn generate_pus_event_tm_generic(
|
||||||
@ -180,7 +186,7 @@ pub mod alloc_mod {
|
|||||||
event: Event,
|
event: Event,
|
||||||
params: Option<&[u8]>,
|
params: Option<&[u8]>,
|
||||||
) -> Result<bool, EventManError> {
|
) -> Result<bool, EventManError> {
|
||||||
if !self.backend.event_enabled(&event) {
|
if !self.reporting_map.event_enabled(&event) {
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
match event.severity() {
|
match event.severity() {
|
||||||
@ -208,31 +214,33 @@ pub mod alloc_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<EV: GenericEvent + Copy + PartialEq + Eq + Hash>
|
impl<Event: GenericEvent + Copy + PartialEq + Eq + Hash, EventTmHook: EventTmHookProvider>
|
||||||
PusEventDispatcher<DefaultPusEventMgmtBackend<EV>, EV, ()>
|
PusEventTmCreatorWithMap<DefaultPusEventReportingMap<Event>, Event, EventTmHook>
|
||||||
{
|
{
|
||||||
pub fn new_with_default_backend(reporter: EventReporter) -> Self {
|
pub fn new_with_default_backend(reporter: EventReporter<EventTmHook>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
reporter,
|
reporter,
|
||||||
backend: DefaultPusEventMgmtBackend::default(),
|
reporting_map: DefaultPusEventReportingMap::default(),
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: PusEventMgmtBackendProvider<EventU32, Error = E>, E> PusEventDispatcher<B, EventU32, E> {
|
impl<ReportingMap: PusEventReportingMapProvider<EventU32>>
|
||||||
|
PusEventTmCreatorWithMap<ReportingMap, EventU32>
|
||||||
|
{
|
||||||
pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>(
|
pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>(
|
||||||
&mut self,
|
&mut self,
|
||||||
event: &EventU32TypedSev<Severity>,
|
event: &EventU32TypedSev<Severity>,
|
||||||
) -> Result<bool, E> {
|
) -> Result<bool, ReportingMap::Error> {
|
||||||
self.backend.enable_event_reporting(event.as_ref())
|
self.reporting_map.enable_event_reporting(event.as_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>(
|
pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>(
|
||||||
&mut self,
|
&mut self,
|
||||||
event: &EventU32TypedSev<Severity>,
|
event: &EventU32TypedSev<Severity>,
|
||||||
) -> Result<bool, E> {
|
) -> Result<bool, ReportingMap::Error> {
|
||||||
self.backend.disable_event_reporting(event.as_ref())
|
self.reporting_map.disable_event_reporting(event.as_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn generate_pus_event_tm<Severity: HasSeverity>(
|
pub fn generate_pus_event_tm<Severity: HasSeverity>(
|
||||||
@ -246,10 +254,10 @@ pub mod alloc_mod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type DefaultPusEventU16Dispatcher<E> =
|
pub type DefaultPusEventU16TmCreator<EventTmHook = DummyEventHook> =
|
||||||
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU16>, EventU16, E>;
|
PusEventTmCreatorWithMap<DefaultPusEventReportingMap<EventU16>, EventU16, EventTmHook>;
|
||||||
pub type DefaultPusEventU32Dispatcher<E> =
|
pub type DefaultPusEventU32TmCreator<EventTmHook = DummyEventHook> =
|
||||||
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU32>, EventU32, E>;
|
PusEventTmCreatorWithMap<DefaultPusEventReportingMap<EventU32>, EventU32, EventTmHook>;
|
||||||
}
|
}
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
@ -265,16 +273,16 @@ mod tests {
|
|||||||
const TEST_APID: u16 = 0x02;
|
const TEST_APID: u16 = 0x02;
|
||||||
const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05);
|
const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05);
|
||||||
|
|
||||||
fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> {
|
fn create_basic_man_1() -> DefaultPusEventU32TmCreator {
|
||||||
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
|
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
|
||||||
.expect("Creating event repoter failed");
|
.expect("Creating event repoter failed");
|
||||||
PusEventDispatcher::new_with_default_backend(reporter)
|
PusEventTmCreatorWithMap::new_with_default_backend(reporter)
|
||||||
}
|
}
|
||||||
fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> {
|
fn create_basic_man_2() -> DefaultPusEventU32TmCreator {
|
||||||
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
|
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
|
||||||
.expect("Creating event repoter failed");
|
.expect("Creating event repoter failed");
|
||||||
let backend = DefaultPusEventMgmtBackend::default();
|
let backend = DefaultPusEventReportingMap::default();
|
||||||
PusEventDispatcher::new(reporter, backend)
|
PusEventTmCreatorWithMap::new(reporter, backend)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
use satrs::event_man::{
|
use satrs::event_man::{
|
||||||
EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider,
|
EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider,
|
||||||
EventU32SenderMpsc, MpscEventU32Receiver,
|
EventU32SenderMpsc,
|
||||||
};
|
};
|
||||||
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
|
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
|
||||||
use satrs::params::U32Pair;
|
use satrs::params::U32Pair;
|
||||||
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
|
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
|
||||||
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher};
|
use satrs::pus::event_man::{DefaultPusEventReportingMap, EventReporter, PusEventTmCreatorWithMap};
|
||||||
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
|
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
|
||||||
use satrs::request::UniqueApidTargetId;
|
use satrs::request::UniqueApidTargetId;
|
||||||
use satrs::tmtc::PacketAsVec;
|
use satrs::tmtc::PacketAsVec;
|
||||||
@ -29,18 +29,18 @@ pub enum CustomTmSenderError {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_threaded_usage() {
|
fn test_threaded_usage() {
|
||||||
let (event_sender, event_man_receiver) = mpsc::channel();
|
let (event_tx, event_rx) = mpsc::sync_channel(100);
|
||||||
let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
|
let mut event_man = EventManagerWithMpsc::new(event_rx);
|
||||||
let mut event_man = EventManagerWithMpsc::new(event_receiver);
|
|
||||||
|
|
||||||
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
|
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
|
||||||
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
|
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
|
||||||
event_man.subscribe_all(pus_event_man_send_provider.target_id());
|
event_man.subscribe_all(pus_event_man_send_provider.target_id());
|
||||||
event_man.add_sender(pus_event_man_send_provider);
|
event_man.add_sender(pus_event_man_send_provider);
|
||||||
let (event_tx, event_rx) = mpsc::channel::<PacketAsVec>();
|
let (event_packet_tx, event_packet_rx) = mpsc::channel::<PacketAsVec>();
|
||||||
let reporter =
|
let reporter =
|
||||||
EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed");
|
EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed");
|
||||||
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
|
let pus_event_man =
|
||||||
|
PusEventTmCreatorWithMap::new(reporter, DefaultPusEventReportingMap::default());
|
||||||
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
||||||
panic!("received routing error for event {event_msg:?}: {error:?}");
|
panic!("received routing error for event {event_msg:?}: {error:?}");
|
||||||
};
|
};
|
||||||
@ -54,7 +54,7 @@ fn test_threaded_usage() {
|
|||||||
Ok(event_msg) => {
|
Ok(event_msg) => {
|
||||||
let gen_event = |aux_data| {
|
let gen_event = |aux_data| {
|
||||||
pus_event_man.generate_pus_event_tm_generic(
|
pus_event_man.generate_pus_event_tm_generic(
|
||||||
&event_tx,
|
&event_packet_tx,
|
||||||
&EMPTY_STAMP,
|
&EMPTY_STAMP,
|
||||||
event_msg.event(),
|
event_msg.event(),
|
||||||
aux_data,
|
aux_data,
|
||||||
@ -100,14 +100,14 @@ fn test_threaded_usage() {
|
|||||||
|
|
||||||
// Event sender and TM checker thread
|
// Event sender and TM checker thread
|
||||||
let jh1 = thread::spawn(move || {
|
let jh1 = thread::spawn(move || {
|
||||||
event_sender
|
event_tx
|
||||||
.send(EventMessage::new(
|
.send(EventMessage::new(
|
||||||
TEST_COMPONENT_ID_0.id(),
|
TEST_COMPONENT_ID_0.id(),
|
||||||
INFO_EVENT.into(),
|
INFO_EVENT.into(),
|
||||||
))
|
))
|
||||||
.expect("Sending info event failed");
|
.expect("Sending info event failed");
|
||||||
loop {
|
loop {
|
||||||
match event_rx.try_recv() {
|
match event_packet_rx.try_recv() {
|
||||||
// Event TM received successfully
|
// Event TM received successfully
|
||||||
Ok(event_tm) => {
|
Ok(event_tm) => {
|
||||||
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
|
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
|
||||||
@ -129,7 +129,7 @@ fn test_threaded_usage() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
event_sender
|
event_tx
|
||||||
.send(EventMessage::new_with_params(
|
.send(EventMessage::new_with_params(
|
||||||
TEST_COMPONENT_ID_0.id(),
|
TEST_COMPONENT_ID_0.id(),
|
||||||
LOW_SEV_EVENT,
|
LOW_SEV_EVENT,
|
||||||
@ -137,7 +137,7 @@ fn test_threaded_usage() {
|
|||||||
))
|
))
|
||||||
.expect("Sending low severity event failed");
|
.expect("Sending low severity event failed");
|
||||||
loop {
|
loop {
|
||||||
match event_rx.try_recv() {
|
match event_packet_rx.try_recv() {
|
||||||
// Event TM received successfully
|
// Event TM received successfully
|
||||||
Ok(event_tm) => {
|
Ok(event_tm) => {
|
||||||
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
|
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
|
||||||
|
Loading…
Reference in New Issue
Block a user