diff --git a/satrs-book/src/events.md b/satrs-book/src/events.md index 01dbb59..e21aac9 100644 --- a/satrs-book/src/events.md +++ b/satrs-book/src/events.md @@ -1,14 +1,17 @@ # Events -Events can be an extremely important mechanism used for remote systems to monitor unexpected -or expected anomalies and events occuring on these systems. They are oftentimes tied to +Events are an important mechanism used for remote systems to monitor unexpected +or expected anomalies and events occuring on these systems. + +One common use case for events on remote systems is to offer a light-weight publish-subscribe +mechanism and IPC mechanism for software and hardware events which are also packaged as telemetry +(TM) or can trigger a system response. They can also be tied to Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously. -Events can also be used as a convenient Inter-Process Communication (IPC) mechansism, which is -also observable for the Ground segment. The PUS Service 5 standardizes how the ground interface -for events might look like, but does not specify how other software components might react -to those events. There is the PUS Service 19, which might be used for that purpose, but the -event components recommended by this framework do not really need this service. +The PUS Service 5 standardizes how the ground interface for events might look like, but does not +specify how other software components might react to those events. There is the PUS Service 19, +which might be used for that purpose, but the event components recommended by this framework do not +rely on the present of this service. The following images shows how the flow of events could look like in a system where components can generate events, and where other system components might be interested in those events: diff --git a/satrs-example/satrs-tmtc/.gitignore b/satrs-example/pytmtc/.gitignore similarity index 100% rename from satrs-example/satrs-tmtc/.gitignore rename to satrs-example/pytmtc/.gitignore diff --git a/satrs-example/satrs-tmtc/common.py b/satrs-example/pytmtc/common.py similarity index 98% rename from satrs-example/satrs-tmtc/common.py rename to satrs-example/pytmtc/common.py index 6f56604..a37967e 100644 --- a/satrs-example/satrs-tmtc/common.py +++ b/satrs-example/pytmtc/common.py @@ -10,6 +10,7 @@ class Apid(enum.IntEnum): GENERIC_PUS = 2 ACS = 3 CFDP = 4 + TMTC = 5 class EventSeverity(enum.IntEnum): diff --git a/satrs-example/satrs-tmtc/main.py b/satrs-example/pytmtc/main.py similarity index 91% rename from satrs-example/satrs-tmtc/main.py rename to satrs-example/pytmtc/main.py index a3e0caf..a90a011 100755 --- a/satrs-example/satrs-tmtc/main.py +++ b/satrs-example/pytmtc/main.py @@ -103,7 +103,9 @@ class PusHandler(GenericApidHandlerBase): def handle_tm(self, apid: int, packet: bytes, _user_args: Any): try: - pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) + pus_tm = PusTelemetry.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) except ValueError as e: _LOGGER.warning("Could not generate PUS TM object from raw data") _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") @@ -111,7 +113,7 @@ class PusHandler(GenericApidHandlerBase): service = pus_tm.service if service == 1: tm_packet = Service1Tm.unpack( - data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) + data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2) ) res = self.verif_wrapper.add_tm(tm_packet) if res is None: @@ -128,7 +130,9 @@ class PusHandler(GenericApidHandlerBase): elif service == 3: _LOGGER.info("No handling for HK packets implemented") _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") - pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) + pus_tm = PusTelemetry.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) if pus_tm.subservice == 25: if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") @@ -136,16 +140,18 @@ class PusHandler(GenericApidHandlerBase): _LOGGER.info(json_str) elif service == 5: tm_packet = PusTelemetry.unpack( - packet, time_reader=CdsShortTimestamp.empty() + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE ) src_data = tm_packet.source_data event_u32 = EventU32.unpack(src_data) - _LOGGER.info(f"Received event packet. Event: {event_u32}") + _LOGGER.info( + f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}" + ) if event_u32.group_id == 0 and event_u32.unique_id == 0: _LOGGER.info("Received test event") elif service == 17: tm_packet = Service17Tm.unpack( - packet, time_reader=CdsShortTimestamp.empty() + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE ) if tm_packet.subservice == 2: self.file_logger.info("Received Ping Reply TM[17,2]") @@ -162,7 +168,7 @@ class PusHandler(GenericApidHandlerBase): f"The service {service} is not implemented in Telemetry Factory" ) tm_packet = PusTelemetry.unpack( - packet, time_reader=CdsShortTimestamp.empty() + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE ) self.raw_logger.log_tm(pus_tm) @@ -197,15 +203,15 @@ class TcHandler(TcHandlerBase): _LOGGER.info(log_entry.log_str) def queue_finished_cb(self, info: ProcedureWrapper): - if info.proc_type == TcProcedureType.DEFAULT: - def_proc = info.to_def_procedure() + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): q = self.queue_helper q.queue_wrapper = wrapper.queue_wrapper - if info.proc_type == TcProcedureType.DEFAULT: - def_proc = info.to_def_procedure() + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() assert def_proc.cmd_path is not None pus_tc.pack_pus_telecommands(q, def_proc.cmd_path) @@ -256,6 +262,7 @@ def main(): while True: state = tmtc_backend.periodic_op(None) if state.request == BackendRequest.TERMINATION_NO_ERROR: + tmtc_backend.close_com_if() sys.exit(0) elif state.request == BackendRequest.DELAY_IDLE: _LOGGER.info("TMTC Client in IDLE mode") @@ -270,6 +277,7 @@ def main(): elif state.request == BackendRequest.CALL_NEXT: pass except KeyboardInterrupt: + tmtc_backend.close_com_if() sys.exit(0) diff --git a/satrs-example/satrs-tmtc/pus_tc.py b/satrs-example/pytmtc/pus_tc.py similarity index 100% rename from satrs-example/satrs-tmtc/pus_tc.py rename to satrs-example/pytmtc/pus_tc.py diff --git a/satrs-example/satrs-tmtc/pus_tm.py b/satrs-example/pytmtc/pus_tm.py similarity index 100% rename from satrs-example/satrs-tmtc/pus_tm.py rename to satrs-example/pytmtc/pus_tm.py diff --git a/satrs-example/satrs-tmtc/requirements.txt b/satrs-example/pytmtc/requirements.txt similarity index 83% rename from satrs-example/satrs-tmtc/requirements.txt rename to satrs-example/pytmtc/requirements.txt index b3f6f2a..325615c 100644 --- a/satrs-example/satrs-tmtc/requirements.txt +++ b/satrs-example/pytmtc/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 8.0.0rc1 +tmtccmd == 8.0.0rc2 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/satrs-tmtc/tc_definitions.py b/satrs-example/pytmtc/tc_definitions.py similarity index 100% rename from satrs-example/satrs-tmtc/tc_definitions.py rename to satrs-example/pytmtc/tc_definitions.py diff --git a/satrs-example/satrs-tmtc/tmtc_conf.json b/satrs-example/pytmtc/tmtc_conf.json similarity index 100% rename from satrs-example/satrs-tmtc/tmtc_conf.json rename to satrs-example/pytmtc/tmtc_conf.json diff --git a/satrs-example/src/config.rs b/satrs-example/src/config.rs index 5168927..a8c495a 100644 --- a/satrs-example/src/config.rs +++ b/satrs-example/src/config.rs @@ -38,8 +38,7 @@ pub enum GroupId { pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; -pub const TEST_EVENT: EventU32TypedSev = - EventU32TypedSev::::const_new(0, 0); +pub const TEST_EVENT: EventU32TypedSev = EventU32TypedSev::::new(0, 0); lazy_static! { pub static ref PACKET_ID_VALIDATOR: HashSet = { diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index 5d1bdaf..cb0caf8 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -8,13 +8,10 @@ use satrs::pus::verification::VerificationReporter; use satrs::pus::EcssTmSender; use satrs::request::UniqueApidTargetId; use satrs::{ - event_man::{ - EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded, - MpscEventReceiver, - }, + event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded}, pus::{ event_man::{ - DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, + DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken, }, verification::{TcStateStarted, VerificationReportingProvider, VerificationToken}, }, @@ -40,13 +37,12 @@ impl EventTmHookProvider for EventApidSetter { /// packets. It also handles the verification completion of PUS event service requests. pub struct PusEventHandler { event_request_rx: mpsc::Receiver, - pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, + pus_event_tm_creator: DefaultPusEventU32TmCreator, pus_event_man_rx: mpsc::Receiver, tm_sender: TmSender, time_provider: CdsTime, timestamp: [u8; 7], verif_handler: VerificationReporter, - event_apid_setter: EventApidSetter, } impl PusEventHandler { @@ -61,9 +57,16 @@ impl PusEventHandler { // All events sent to the manager are routed to the PUS event manager, which generates PUS event // telemetry for each event. - let event_reporter = EventReporter::new(PUS_EVENT_MANAGEMENT.raw(), 0, 0, 128).unwrap(); + let event_reporter = EventReporter::new_with_hook( + PUS_EVENT_MANAGEMENT.raw(), + 0, + 0, + 128, + EventApidSetter::default(), + ) + .unwrap(); let pus_event_dispatcher = - DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter); + DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter); let pus_event_man_send_provider = EventU32SenderMpscBounded::new( PUS_EVENT_MANAGEMENT.raw(), pus_event_man_tx, @@ -75,13 +78,12 @@ impl PusEventHandler { Self { event_request_rx, - pus_event_dispatcher, + pus_event_tm_creator: pus_event_dispatcher, pus_event_man_rx, time_provider: CdsTime::new_with_u16_days(0, 0), timestamp: [0; 7], verif_handler, tm_sender, - event_apid_setter: EventApidSetter::default(), } } @@ -95,75 +97,105 @@ impl PusEventHandler { .completion_success(&self.tm_sender, started_token, timestamp) .expect("Sending completion success failed"); }; - // handle event requests - if let Ok(event_req) = self.event_request_rx.try_recv() { - match event_req.request { - EventRequest::Enable(event) => { - self.pus_event_dispatcher - .enable_tm_for_event(&event) - .expect("Enabling TM failed"); - update_time(&mut self.time_provider, &mut self.timestamp); - report_completion(event_req, &self.timestamp); - } - EventRequest::Disable(event) => { - self.pus_event_dispatcher - .disable_tm_for_event(&event) - .expect("Disabling TM failed"); - update_time(&mut self.time_provider, &mut self.timestamp); - report_completion(event_req, &self.timestamp); - } + loop { + // handle event requests + match self.event_request_rx.try_recv() { + Ok(event_req) => match event_req.request { + EventRequest::Enable(event) => { + self.pus_event_tm_creator + .enable_tm_for_event(&event) + .expect("Enabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + EventRequest::Disable(event) => { + self.pus_event_tm_creator + .disable_tm_for_event(&event) + .expect("Disabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + }, + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("all event request senders have disconnected"); + break; + } + }, } } } pub fn generate_pus_event_tm(&mut self) { - // Perform the generation of PUS event packets - if let Ok(event_msg) = self.pus_event_man_rx.try_recv() { - update_time(&mut self.time_provider, &mut self.timestamp); - let param_vec = event_msg.params().map_or(Vec::new(), |param| { - param.to_vec().expect("failed to convert params to vec") - }); - self.event_apid_setter.next_apid = UniqueApidTargetId::from(event_msg.sender_id()).apid; - self.pus_event_dispatcher - .generate_pus_event_tm_generic( - &self.tm_sender, - &self.timestamp, - event_msg.event(), - Some(¶m_vec), - ) - .expect("Sending TM as event failed"); + loop { + // Perform the generation of PUS event packets + match self.pus_event_man_rx.try_recv() { + Ok(event_msg) => { + update_time(&mut self.time_provider, &mut self.timestamp); + let param_vec = event_msg.params().map_or(Vec::new(), |param| { + param.to_vec().expect("failed to convert params to vec") + }); + // We use the TM modification hook to set the sender APID for each event. + self.pus_event_tm_creator.reporter.tm_hook.next_apid = + UniqueApidTargetId::from(event_msg.sender_id()).apid; + self.pus_event_tm_creator + .generate_pus_event_tm_generic( + &self.tm_sender, + &self.timestamp, + event_msg.event(), + Some(¶m_vec), + ) + .expect("Sending TM as event failed"); + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("All event senders have disconnected"); + break; + } + }, + } } } } -/// This is a thin wrapper around the event manager which also caches the sender component -/// used to send events to the event manager. -pub struct EventManagerWrapper { +pub struct EventHandler { + pub pus_event_handler: PusEventHandler, event_manager: EventManagerWithBoundedMpsc, - event_sender: mpsc::Sender, } -impl EventManagerWrapper { - pub fn new() -> Self { - // The sender handle is the primary sender handle for all components which want to create events. - // The event manager will receive the RX handle to receive all the events. - let (event_sender, event_man_rx) = mpsc::channel(); - let event_recv = MpscEventReceiver::new(event_man_rx); +impl EventHandler { + pub fn new( + tm_sender: TmSender, + event_rx: mpsc::Receiver, + event_request_rx: mpsc::Receiver, + ) -> Self { + let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx); + let pus_event_handler = PusEventHandler::new( + tm_sender, + create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), + &mut event_manager, + event_request_rx, + ); + Self { - event_manager: EventManagerWithBoundedMpsc::new(event_recv), - event_sender, + pus_event_handler, + event_manager, } } - // Returns a cached event sender to send events to the event manager for routing. - pub fn clone_event_sender(&self) -> mpsc::Sender { - self.event_sender.clone() - } - + #[allow(dead_code)] pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { &mut self.event_manager } + pub fn periodic_operation(&mut self) { + self.pus_event_handler.handle_event_requests(); + self.try_event_routing(); + self.pus_event_handler.generate_pus_event_tm(); + } + pub fn try_event_routing(&mut self) { let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| { self.routing_error_handler(event_msg, error) @@ -177,41 +209,83 @@ impl EventManagerWrapper { } } -pub struct EventHandler { - pub event_man_wrapper: EventManagerWrapper, - pub pus_event_handler: PusEventHandler, -} +#[cfg(test)] +mod tests { + use satrs::{ + events::EventU32, + pus::verification::VerificationReporterCfg, + spacepackets::{ + ecss::{tm::PusTmReader, PusPacket}, + CcsdsPacket, + }, + tmtc::PacketAsVec, + }; -impl EventHandler { - pub fn new( - tm_sender: TmSender, - event_request_rx: mpsc::Receiver, - ) -> Self { - let mut event_man_wrapper = EventManagerWrapper::new(); - let pus_event_handler = PusEventHandler::new( - tm_sender, - create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), - event_man_wrapper.event_manager(), - event_request_rx, - ); - Self { - event_man_wrapper, - pus_event_handler, + use super::*; + + const TEST_CREATOR_ID: UniqueApidTargetId = UniqueApidTargetId::new(1, 2); + const TEST_EVENT: EventU32 = EventU32::new(satrs::events::Severity::Info, 1, 1); + + pub struct EventManagementTestbench { + pub event_tx: mpsc::SyncSender, + pub event_manager: EventManagerWithBoundedMpsc, + pub tm_receiver: mpsc::Receiver, + pub pus_event_handler: PusEventHandler>, + } + + impl EventManagementTestbench { + pub fn new() -> Self { + let (event_tx, event_rx) = mpsc::sync_channel(10); + let (_event_req_tx, event_req_rx) = mpsc::sync_channel(10); + let (tm_sender, tm_receiver) = mpsc::channel(); + let verif_reporter_cfg = VerificationReporterCfg::new(0x05, 2, 2, 128).unwrap(); + let verif_reporter = + VerificationReporter::new(PUS_EVENT_MANAGEMENT.id(), &verif_reporter_cfg); + let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx); + let pus_event_handler = PusEventHandler::>::new( + tm_sender, + verif_reporter, + &mut event_manager, + event_req_rx, + ); + Self { + event_tx, + tm_receiver, + event_manager, + pus_event_handler, + } } } - pub fn clone_event_sender(&self) -> mpsc::Sender { - self.event_man_wrapper.clone_event_sender() + #[test] + fn test_basic_event_generation() { + let mut testbench = EventManagementTestbench::new(); + testbench + .event_tx + .send(EventMessageU32::new( + TEST_CREATOR_ID.id(), + EventU32::new(satrs::events::Severity::Info, 1, 1), + )) + .expect("failed to send event"); + testbench.pus_event_handler.handle_event_requests(); + testbench.event_manager.try_event_handling(|_, _| {}); + testbench.pus_event_handler.generate_pus_event_tm(); + let tm_packet = testbench + .tm_receiver + .try_recv() + .expect("failed to receive TM packet"); + assert_eq!(tm_packet.sender_id, PUS_EVENT_MANAGEMENT.id()); + let tm_reader = PusTmReader::new(&tm_packet.packet, 7) + .expect("failed to create TM reader") + .0; + assert_eq!(tm_reader.apid(), TEST_CREATOR_ID.apid); + assert_eq!(tm_reader.user_data().len(), 4); + let event_read_back = EventU32::from_be_bytes(tm_reader.user_data().try_into().unwrap()); + assert_eq!(event_read_back, TEST_EVENT); } - #[allow(dead_code)] - pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { - self.event_man_wrapper.event_manager() - } - - pub fn periodic_operation(&mut self) { - self.pus_event_handler.handle_event_requests(); - self.event_man_wrapper.try_event_routing(); - self.pus_event_handler.generate_pus_event_tm(); + #[test] + fn test_basic_event_disabled() { + // TODO: Add test. } } diff --git a/satrs-example/src/interface/tcp.rs b/satrs-example/src/interface/tcp.rs index cc3f669..021ad31 100644 --- a/satrs-example/src/interface/tcp.rs +++ b/satrs-example/src/interface/tcp.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::{ collections::{HashSet, VecDeque}, fmt::Debug, @@ -139,7 +140,9 @@ impl, SendError: Debug + 'static> pub fn periodic_operation(&mut self) { loop { - let result = self.0.handle_all_connections(None); + let result = self + .0 + .handle_all_connections(Some(Duration::from_millis(400))); match result { Ok(_conn_result) => (), Err(e) => { diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index cae1c8c..d7816e2 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -114,6 +114,7 @@ impl< #[cfg(test)] mod tests { + use std::net::Ipv4Addr; use std::{ cell::RefCell, collections::VecDeque, @@ -182,7 +183,7 @@ mod tests { #[test] fn test_transactions() { - let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); + let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); let test_receiver = TestSender::default(); // let tc_queue = test_receiver.tc_vec.clone(); let udp_tc_server = @@ -200,8 +201,8 @@ mod tests { .unwrap(); let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed"); let client_addr = client.local_addr().unwrap(); - client.connect(server_addr).unwrap(); - client.send(&ping_tc).unwrap(); + println!("{}", server_addr); + client.send_to(&ping_tc, server_addr).unwrap(); udp_dyn_server.periodic_operation(); { let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut(); diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index cf8e050..02138c5 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -11,7 +11,7 @@ use crate::events::EventHandler; use crate::interface::udp::DynamicUdpTmHandler; use crate::pus::stack::PusStack; use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic}; -use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic}; +use crate::tmtc::tm_sink::{TmSinkDynamic, TmSinkStatic}; use log::info; use pus::test::create_test_service_dynamic; use satrs::hal::std::tcp_server::ServerConfig; @@ -54,11 +54,11 @@ fn static_tmtc_pool_main() { let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); - let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50); + let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); - let tm_funnel_tx_sender = - PacketSenderWithSharedPool::new(tm_funnel_tx.clone(), shared_tm_pool_wrapper.clone()); + let tm_sink_tx_sender = + PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()); let (mgm_handler_composite_tx, mgm_handler_composite_rx) = mpsc::channel::>(); @@ -80,11 +80,12 @@ fn static_tmtc_pool_main() { // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx); + let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -106,39 +107,39 @@ fn static_tmtc_pool_main() { mode_tc_sender: pus_mode_tx, }; let pus_test_service = create_test_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), - event_handler.clone_event_sender(), + event_tx.clone(), pus_test_rx, ); let pus_scheduler_service = create_scheduler_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), tc_source.clone(), pus_sched_rx, create_sched_tc_pool(), ); let pus_event_service = create_event_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_event_rx, event_request_tx, ); let pus_action_service = create_action_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); let pus_hk_service = create_hk_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); let pus_mode_service = create_mode_service_static( - tm_funnel_tx_sender.clone(), + tm_sink_tx_sender.clone(), shared_tc_pool.clone(), pus_mode_rx, request_map, @@ -156,7 +157,7 @@ fn static_tmtc_pool_main() { let mut tmtc_task = TcSourceTaskStatic::new( shared_tc_pool_wrapper.clone(), tc_source_rx, - PusTcDistributor::new(tm_funnel_tx_sender, pus_router), + PusTcDistributor::new(tm_sink_tx_sender, pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); @@ -186,10 +187,10 @@ fn static_tmtc_pool_main() { ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelStatic::new( + let mut tm_sink = TmSinkStatic::new( shared_tm_pool_wrapper, sync_tm_tcp_source, - tm_funnel_rx, + tm_sink_rx, tm_server_tx, ); @@ -209,7 +210,7 @@ fn static_tmtc_pool_main() { mode_leaf_interface, mgm_handler_composite_rx, pus_hk_reply_tx, - tm_funnel_tx, + tm_sink_tx, dummy_spi_interface, shared_mgm_set, ); @@ -240,9 +241,9 @@ fn static_tmtc_pool_main() { info!("Starting TM funnel task"); let jh_tm_funnel = thread::Builder::new() - .name("TM Funnel".to_string()) + .name("tm sink".to_string()) .spawn(move || loop { - tm_funnel.operation(); + tm_sink.operation(); }) .unwrap(); @@ -314,10 +315,11 @@ fn dyn_tmtc_pool_main() { // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx); + let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -339,11 +341,8 @@ fn dyn_tmtc_pool_main() { mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service_dynamic( - tm_funnel_tx.clone(), - event_handler.clone_event_sender(), - pus_test_rx, - ); + let pus_test_service = + create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx); let pus_scheduler_service = create_scheduler_service_dynamic( tm_funnel_tx.clone(), tc_source_tx.clone(), @@ -411,7 +410,7 @@ fn dyn_tmtc_pool_main() { ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = mpsc::channel(); @@ -459,7 +458,7 @@ fn dyn_tmtc_pool_main() { info!("Starting TM funnel task"); let jh_tm_funnel = thread::Builder::new() - .name("sat-rs tm-funnel".to_string()) + .name("sat-rs tm-sink".to_string()) .spawn(move || loop { tm_funnel.operation(); }) diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 583b72c..585e93b 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -23,7 +23,7 @@ use super::HandlingStatus; pub fn create_test_service_static( tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, - event_sender: mpsc::Sender, + event_sender: mpsc::SyncSender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -41,7 +41,7 @@ pub fn create_test_service_static( pub fn create_test_service_dynamic( tm_funnel_tx: mpsc::Sender, - event_sender: mpsc::Sender, + event_sender: mpsc::SyncSender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -61,7 +61,7 @@ pub struct TestCustomServiceWrapper, - pub test_srv_event_sender: mpsc::Sender, + pub test_srv_event_sender: mpsc::SyncSender, } impl diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index 955a997..0771a79 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -70,18 +70,23 @@ impl TmFunnelCommon { } fn packet_printout(tm: &PusTmZeroCopyWriter) { - info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); + info!( + "Sending PUS TM[{},{}] with APID {}", + tm.service(), + tm.subservice(), + tm.apid() + ); } } -pub struct TmFunnelStatic { +pub struct TmSinkStatic { common: TmFunnelCommon, shared_tm_store: SharedPacketPool, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::SyncSender, } -impl TmFunnelStatic { +impl TmSinkStatic { pub fn new( shared_tm_store: SharedPacketPool, sync_tm_tcp_source: SyncTcpTmSource, @@ -121,13 +126,13 @@ impl TmFunnelStatic { } } -pub struct TmFunnelDynamic { +pub struct TmSinkDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, tm_server_tx: mpsc::Sender, } -impl TmFunnelDynamic { +impl TmSinkDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, diff --git a/satrs-shared/CHANGELOG.md b/satrs-shared/CHANGELOG.md index 0c62de9..8e718d8 100644 --- a/satrs-shared/CHANGELOG.md +++ b/satrs-shared/CHANGELOG.md @@ -8,6 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +# [v0.1.4] 2024-04-24 + +## Added + +- `ResultU16::from_be_bytes` +- `From` impl for `ResultU16`. +- Optional `defmt` support: `defmt::Format` impl on `ResultU16` if the `defmt` feature is + activated. + # [v0.1.3] 2024-04-16 Allow `spacepackets` range starting with v0.10 and v0.11. diff --git a/satrs-shared/Cargo.toml b/satrs-shared/Cargo.toml index 2ed8f26..175f909 100644 --- a/satrs-shared/Cargo.toml +++ b/satrs-shared/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "satrs-shared" description = "Components shared by multiple sat-rs crates" -version = "0.1.3" +version = "0.1.4" edition = "2021" authors = ["Robin Mueller "] homepage = "https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/" @@ -17,12 +17,17 @@ version = "1" default-features = false optional = true +[dependencies.defmt] +version = "0.3" +optional = true + [dependencies.spacepackets] version = ">0.9, <=0.11" default-features = false [features] serde = ["dep:serde", "spacepackets/serde"] +spacepackets = ["dep:defmt", "spacepackets/defmt"] [package.metadata.docs.rs] -rustdoc-args = ["--cfg", "doc_cfg", "--generate-link-to-definition"] +rustdoc-args = ["--cfg", "docs_rs", "--generate-link-to-definition"] diff --git a/satrs-shared/src/lib.rs b/satrs-shared/src/lib.rs index 6ecdbac..428f01f 100644 --- a/satrs-shared/src/lib.rs +++ b/satrs-shared/src/lib.rs @@ -1,3 +1,4 @@ //! This crates contains modules shared among other sat-rs framework crates. #![no_std] +#![cfg_attr(docs_rs, feature(doc_auto_cfg))] pub mod res_code; diff --git a/satrs-shared/src/res_code.rs b/satrs-shared/src/res_code.rs index e7816f3..099e84d 100644 --- a/satrs-shared/src/res_code.rs +++ b/satrs-shared/src/res_code.rs @@ -7,6 +7,7 @@ use spacepackets::ByteConversionError; /// Simple [u16] based result code type which also allows to group related resultcodes. #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct ResultU16 { group_id: u8, unique_id: u8, @@ -19,15 +20,28 @@ impl ResultU16 { unique_id, } } + pub fn raw(&self) -> u16 { ((self.group_id as u16) << 8) | self.unique_id as u16 } + pub fn group_id(&self) -> u8 { self.group_id } + pub fn unique_id(&self) -> u8 { self.unique_id } + + pub fn from_be_bytes(bytes: [u8; 2]) -> Self { + Self::from(u16::from_be_bytes(bytes)) + } +} + +impl From for ResultU16 { + fn from(value: u16) -> Self { + Self::new(((value >> 8) & 0xff) as u8, (value & 0xff) as u8) + } } impl From for EcssEnumU16 { @@ -84,5 +98,14 @@ mod tests { assert_eq!(written, 2); assert_eq!(buf[0], 1); assert_eq!(buf[1], 1); + let read_back = ResultU16::from_be_bytes(buf); + assert_eq!(read_back, result_code); + } + + #[test] + fn test_from_u16() { + let result_code = ResultU16::new(1, 1); + let result_code_2 = ResultU16::from(result_code.raw()); + assert_eq!(result_code, result_code_2); } } diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index 56c1e14..6124919 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -8,7 +8,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] -# [v0.2.0-rc.4] 2024-04-20 +# [v0.2.0-rc.5] 2024-04-24 + +## Added + +- Optional `defmt::Format` support for the event types, if the `defmt` feature is activated. + +## Changed + +- Removed `MpscEventReceiver`, the `EventReceiveProvider` trait is implemented directly + on `mpsc::Receiver>` +- Renamed `PusEventDispatcher` to `PusEventTmCreatorWithMap`. +- Renamed `DefaultPusEventU32Dispatcher` to `DefaultPusEventU32EventCreator`. +- Renamed `PusEventMgmtBackendProvider` renamed to `PusEventReportingMap`. +- Reanmed Event `const_new` methods to `new` and the former `new` methods to `new_checked` + +# [v0.2.0-rc.4] 2024-04-23 ## Changed @@ -21,6 +36,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the smallest possible size of 7 bytes. +- TCP server component now re-registers the internal `mio::Poll` object if the client reset + the connection unexpectedly. Not doing so prevented the server from functioning properly + after a re-connect. # [v0.2.0-rc.3] 2024-04-17 diff --git a/satrs/Cargo.toml b/satrs/Cargo.toml index c0ec369..94f8e79 100644 --- a/satrs/Cargo.toml +++ b/satrs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs" -version = "0.2.0-rc.3" +version = "0.2.0-rc.5" edition = "2021" rust-version = "1.71.1" authors = ["Robin Mueller "] @@ -84,10 +84,14 @@ version = "0.8" features = ["os-poll", "net"] optional = true +[dependencies.defmt] +version = "0.3" +optional = true + [dev-dependencies] serde = "1" zerocopy = "0.7" -once_cell = "1.13" +once_cell = "1" serde_json = "1" rand = "0.8" tempfile = "3" @@ -120,7 +124,7 @@ alloc = [ serde = ["dep:serde", "spacepackets/serde", "satrs-shared/serde"] crossbeam = ["crossbeam-channel"] heapless = ["dep:heapless"] -defmt = ["spacepackets/defmt"] +defmt = ["dep:defmt", "spacepackets/defmt"] test_util = [] doc-images = [] diff --git a/satrs/src/encoding/ccsds.rs b/satrs/src/encoding/ccsds.rs index c3ed382..1f21426 100644 --- a/satrs/src/encoding/ccsds.rs +++ b/satrs/src/encoding/ccsds.rs @@ -203,12 +203,8 @@ mod tests { .expect("writing packet failed"); let tc_cacher = TcCacher::default(); let verificator = SimpleVerificator::new_with_second_id(); - let parse_result = parse_buffer_for_ccsds_space_packets( - &buffer, - &verificator, - PARSER_ID, - &tc_cacher, - ); + let parse_result = + parse_buffer_for_ccsds_space_packets(&buffer, &verificator, PARSER_ID, &tc_cacher); assert!(parse_result.is_ok()); let parse_result = parse_result.unwrap(); assert_eq!(parse_result.packets_found, 2); @@ -251,7 +247,6 @@ mod tests { let incomplete_tail_idx = parse_result.incomplete_tail_start.unwrap(); assert_eq!(incomplete_tail_idx, packet_len_ping); - let queue = tc_cacher.tc_queue.borrow(); assert_eq!(queue.len(), 1); // The broken packet was moved to the start, so the next write index should be after the @@ -295,12 +290,8 @@ mod tests { .expect("writing failed"); let verificator = SimpleVerificator::default(); let tc_cacher = TcCacher::default(); - let parse_result = parse_buffer_for_ccsds_space_packets( - &buf, - &verificator, - PARSER_ID, - &tc_cacher, - ); + let parse_result = + parse_buffer_for_ccsds_space_packets(&buf, &verificator, PARSER_ID, &tc_cacher); assert!(parse_result.is_ok()); let parse_result = parse_result.unwrap(); assert_eq!(parse_result.packets_found, 1); diff --git a/satrs/src/event_man.rs b/satrs/src/event_man.rs index 38752eb..bf33a68 100644 --- a/satrs/src/event_man.rs +++ b/satrs/src/event_man.rs @@ -1,14 +1,12 @@ //! Event management and forwarding //! -//! This module provides components to perform event routing. The most important component for this -//! task is the [EventManager]. It receives all events and then routes them to event subscribers -//! where appropriate. One common use case for satellite systems is to offer a light-weight -//! publish-subscribe mechanism and IPC mechanism for software and hardware events which are also -//! packaged as telemetry (TM) or can trigger a system response. -//! //! It is recommended to read the //! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html) -//! about events first: +//! about events first. +//! +//! This module provides components to perform event routing. The most important component for this +//! task is the [EventManager]. It receives all events and then routes them to event subscribers +//! where appropriate. //! //! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps //! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId]. @@ -21,8 +19,8 @@ //! //! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different //! message queue backends. A straightforward implementation where dynamic memory allocation is -//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in -//! form of the [MpscEventReceiver]. +//! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is +//! already implemented for this type. //! 2. To set up event creators, create channel pairs using some message queue implementation. //! Each event creator gets a (cloned) sender component which allows it to send events to the //! manager. @@ -44,6 +42,12 @@ //! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs) //! for a concrete example using multi-threading where events are routed to //! different threads. +//! +//! The [satrs-example](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example) +//! also contains a full event manager instance and exposes a test event via the PUS test service. +//! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs) +//! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs) +//! show how the event management modules can be integrated into a more complex software. use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw}; use crate::params::Params; use crate::queue::GenericSendError; @@ -157,9 +161,10 @@ pub trait SenderMapProvider< /// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers. /// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs. /// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events. -/// * `Ev`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] +/// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32] /// and [EventU16] are supported. -/// * `Data`: Auxiliary data which is sent with the event to provide optional context information +/// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context +/// information pub struct EventManager< EventReceiver: EventReceiveProvider, SenderMap: SenderMapProvider, @@ -290,7 +295,7 @@ impl< for id in ids { if let Some(sender) = self.sender_map.get_send_event_provider(id) { if let Err(e) = sender.send(EventMessage::new_generic( - *id, + event_msg.sender_id, event_msg.event, event_msg.params.as_ref(), )) { @@ -331,11 +336,11 @@ pub mod alloc_mod { /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] /// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend. - pub type EventManagerWithMpsc = EventManager< - MpscEventReceiver, - DefaultSenderMap, EV, AUX>, + pub type EventManagerWithMpsc = EventManager< + EventU32ReceiverMpsc, + DefaultSenderMap, Event, ParamProvider>, DefaultListenerMap, - EventSenderMpsc, + EventSenderMpsc, >; /// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap] @@ -343,7 +348,7 @@ pub mod alloc_mod { /// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the /// message queue backend. pub type EventManagerWithBoundedMpsc = EventManager< - MpscEventReceiver, + EventU32ReceiverMpsc, DefaultSenderMap, Event, ParamProvider>, DefaultListenerMap, EventSenderMpscBounded, @@ -479,20 +484,16 @@ pub mod std_mod { use super::*; use std::sync::mpsc; - pub struct MpscEventReceiver { - receiver: mpsc::Receiver>, - } - - impl MpscEventReceiver { - pub fn new(receiver: mpsc::Receiver>) -> Self { - Self { receiver } - } - } - impl EventReceiveProvider for MpscEventReceiver { + impl + EventReceiveProvider + for mpsc::Receiver> + { type Error = GenericReceiveError; - fn try_recv_event(&self) -> Result>, Self::Error> { - match self.receiver.try_recv() { + fn try_recv_event( + &self, + ) -> Result>, Self::Error> { + match self.try_recv() { Ok(msg) => Ok(Some(msg)), Err(e) => match e { mpsc::TryRecvError::Empty => Ok(None), @@ -504,8 +505,10 @@ pub mod std_mod { } } - pub type MpscEventU32Receiver = MpscEventReceiver; - pub type MpscEventU16Receiver = MpscEventReceiver; + pub type EventU32ReceiverMpsc = + mpsc::Receiver>; + pub type EventU16ReceiverMpsc = + mpsc::Receiver>; /// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to /// send events. @@ -594,7 +597,7 @@ mod tests { use std::format; use std::sync::mpsc::{self}; - const TEST_EVENT: EventU32 = EventU32::const_new(Severity::INFO, 0, 5); + const TEST_EVENT: EventU32 = EventU32::new(Severity::Info, 0, 5); fn check_next_event( expected: EventU32, @@ -611,6 +614,7 @@ mod tests { res: EventRoutingResult, expected: EventU32, expected_num_sent: u32, + expected_sender_id: ComponentId, ) { assert!(matches!(res, EventRoutingResult::Handled { .. })); if let EventRoutingResult::Handled { @@ -619,21 +623,21 @@ mod tests { } = res { assert_eq!(event_msg.event, expected); + assert_eq!(event_msg.sender_id, expected_sender_id); assert_eq!(num_recipients, expected_num_sent); } } fn generic_event_man() -> (mpsc::Sender, EventManagerWithMpsc) { - let (event_sender, manager_queue) = mpsc::channel(); - let event_man_receiver = MpscEventReceiver::new(manager_queue); - (event_sender, EventManager::new(event_man_receiver)) + let (event_sender, event_receiver) = mpsc::channel(); + (event_sender, EventManager::new(event_receiver)) } #[test] fn test_basic() { let (event_sender, mut event_man) = generic_event_man(); - let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); - let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); + let event_grp_0 = EventU32::new(Severity::Info, 0, 0); + let event_grp_1_0 = EventU32::new(Severity::High, 1, 0); let (single_event_sender, single_event_receiver) = mpsc::channel(); let single_event_listener = EventSenderMpsc::new(0, single_event_sender); event_man.subscribe_single(&event_grp_0, single_event_listener.target_id()); @@ -651,8 +655,7 @@ mod tests { .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_grp_0)) .expect("Sending single error failed"); let res = event_man.try_event_handling(&error_handler); - // assert!(res.is_ok()); - check_handled_event(res, event_grp_0, 1); + check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); check_next_event(event_grp_0, &single_event_receiver); // Test event which is sent to all group listeners @@ -660,7 +663,7 @@ mod tests { .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_grp_1_0)) .expect("Sending group error failed"); let res = event_man.try_event_handling(&error_handler); - check_handled_event(res, event_grp_1_0, 1); + check_handled_event(res, event_grp_1_0, 1, TEST_COMPONENT_ID_1.id()); check_next_event(event_grp_1_0, &group_event_receiver_0); } @@ -670,7 +673,7 @@ mod tests { panic!("routing error occurred for event {:?}: {:?}", event_msg, e); }; let (event_sender, mut event_man) = generic_event_man(); - let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); + let event_grp_0 = EventU32::new(Severity::Info, 0, 0); let (single_event_sender, single_event_receiver) = mpsc::channel(); let single_event_listener = EventSenderMpsc::new(0, single_event_sender); event_man.subscribe_single(&event_grp_0, single_event_listener.target_id()); @@ -683,7 +686,7 @@ mod tests { )) .expect("Sending group error failed"); let res = event_man.try_event_handling(&error_handler); - check_handled_event(res, event_grp_0, 1); + check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); let aux = check_next_event(event_grp_0, &single_event_receiver); assert!(aux.is_some()); let aux = aux.unwrap(); @@ -705,8 +708,8 @@ mod tests { let res = event_man.try_event_handling(error_handler); assert!(matches!(res, EventRoutingResult::Empty)); - let event_grp_0 = EventU32::new(Severity::INFO, 0, 0).unwrap(); - let event_grp_1_0 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); + let event_grp_0 = EventU32::new(Severity::Info, 0, 0); + let event_grp_1_0 = EventU32::new(Severity::High, 1, 0); let (event_grp_0_sender, event_grp_0_receiver) = mpsc::channel(); let event_grp_0_and_1_listener = EventU32SenderMpsc::new(0, event_grp_0_sender); event_man.subscribe_group( @@ -726,9 +729,9 @@ mod tests { .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_grp_1_0)) .expect("Sendign Event Group 1 failed"); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_grp_0, 1); + check_handled_event(res, event_grp_0, 1, TEST_COMPONENT_ID_0.id()); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_grp_1_0, 1); + check_handled_event(res, event_grp_1_0, 1, TEST_COMPONENT_ID_1.id()); check_next_event(event_grp_0, &event_grp_0_receiver); check_next_event(event_grp_1_0, &event_grp_0_receiver); @@ -742,8 +745,8 @@ mod tests { panic!("routing error occurred for event {:?}: {:?}", event_msg, e); }; let (event_sender, mut event_man) = generic_event_man(); - let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); - let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); + let event_0 = EventU32::new(Severity::Info, 0, 5); + let event_1 = EventU32::new(Severity::High, 1, 0); let (event_0_tx_0, event_0_rx_0) = mpsc::channel(); let (event_0_tx_1, event_0_rx_1) = mpsc::channel(); let event_listener_0 = EventU32SenderMpsc::new(0, event_0_tx_0); @@ -758,7 +761,7 @@ mod tests { .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_0)) .expect("Triggering Event 0 failed"); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_0, 2); + check_handled_event(res, event_0, 2, TEST_COMPONENT_ID_0.id()); check_next_event(event_0, &event_0_rx_0); check_next_event(event_0, &event_0_rx_1); event_man.subscribe_group(event_1.group_id(), event_listener_0_sender_id); @@ -771,9 +774,9 @@ mod tests { // 3 Events messages will be sent now let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_0, 2); + check_handled_event(res, event_0, 2, TEST_COMPONENT_ID_0.id()); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_1, 1); + check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_1.id()); // Both the single event and the group event should arrive now check_next_event(event_0, &event_0_rx_0); check_next_event(event_1, &event_0_rx_0); @@ -785,7 +788,7 @@ mod tests { .send(EventMessage::new(TEST_COMPONENT_ID_0.id(), event_1)) .expect("Triggering Event 1 failed"); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_1, 1); + check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_0.id()); } #[test] @@ -793,11 +796,10 @@ mod tests { let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| { panic!("routing error occurred for event {:?}: {:?}", event_msg, e); }; - let (event_sender, manager_queue) = mpsc::channel(); - let event_man_receiver = MpscEventReceiver::new(manager_queue); - let mut event_man = EventManagerWithMpsc::new(event_man_receiver); - let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap(); - let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap(); + let (event_sender, event_receiver) = mpsc::channel(); + let mut event_man = EventManagerWithMpsc::new(event_receiver); + let event_0 = EventU32::new(Severity::Info, 0, 5); + let event_1 = EventU32::new(Severity::High, 1, 0); let (event_0_tx_0, all_events_rx) = mpsc::channel(); let all_events_listener = EventU32SenderMpsc::new(0, event_0_tx_0); event_man.subscribe_all(all_events_listener.target_id()); @@ -809,9 +811,9 @@ mod tests { .send(EventMessage::new(TEST_COMPONENT_ID_1.id(), event_1)) .expect("Triggering event 1 failed"); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_0, 1); + check_handled_event(res, event_0, 1, TEST_COMPONENT_ID_0.id()); let res = event_man.try_event_handling(error_handler); - check_handled_event(res, event_1, 1); + check_handled_event(res, event_1, 1, TEST_COMPONENT_ID_1.id()); check_next_event(event_0, &all_events_rx); check_next_event(event_1, &all_events_rx); } diff --git a/satrs/src/events.rs b/satrs/src/events.rs index 032322a..5544eb2 100644 --- a/satrs/src/events.rs +++ b/satrs/src/events.rs @@ -20,10 +20,10 @@ //! ``` //! use satrs::events::{EventU16, EventU32, EventU32TypedSev, Severity, SeverityHigh, SeverityInfo}; //! -//! const MSG_RECVD: EventU32TypedSev = EventU32TypedSev::const_new(1, 0); -//! const MSG_FAILED: EventU32 = EventU32::const_new(Severity::LOW, 1, 1); +//! const MSG_RECVD: EventU32TypedSev = EventU32TypedSev::new(1, 0); +//! const MSG_FAILED: EventU32 = EventU32::new(Severity::LOW, 1, 1); //! -//! const TEMPERATURE_HIGH: EventU32TypedSev = EventU32TypedSev::const_new(2, 0); +//! const TEMPERATURE_HIGH: EventU32TypedSev = EventU32TypedSev::new(2, 0); //! //! let small_event = EventU16::new(Severity::INFO, 3, 0); //! ``` @@ -40,12 +40,17 @@ pub type LargestEventRaw = u32; /// Using a type definition allows to change this to u32 in the future more easily pub type LargestGroupIdRaw = u16; +pub const MAX_GROUP_ID_U32_EVENT: u16 = 2_u16.pow(14) - 1; +pub const MAX_GROUP_ID_U16_EVENT: u16 = 2_u16.pow(6) - 1; + #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Severity { - INFO = 0, - LOW = 1, - MEDIUM = 2, - HIGH = 3, + Info = 0, + Low = 1, + Medium = 2, + High = 3, } pub trait HasSeverity: Debug + PartialEq + Eq + Copy + Clone { @@ -56,28 +61,28 @@ pub trait HasSeverity: Debug + PartialEq + Eq + Copy + Clone { #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub struct SeverityInfo {} impl HasSeverity for SeverityInfo { - const SEVERITY: Severity = Severity::INFO; + const SEVERITY: Severity = Severity::Info; } /// Type level support struct #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub struct SeverityLow {} impl HasSeverity for SeverityLow { - const SEVERITY: Severity = Severity::LOW; + const SEVERITY: Severity = Severity::Low; } /// Type level support struct #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub struct SeverityMedium {} impl HasSeverity for SeverityMedium { - const SEVERITY: Severity = Severity::MEDIUM; + const SEVERITY: Severity = Severity::Medium; } /// Type level support struct #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub struct SeverityHigh {} impl HasSeverity for SeverityHigh { - const SEVERITY: Severity = Severity::HIGH; + const SEVERITY: Severity = Severity::High; } pub trait GenericEvent: EcssEnumeration + Copy + Clone { @@ -99,27 +104,29 @@ impl TryFrom for Severity { fn try_from(value: u8) -> Result { match value { - x if x == Severity::INFO as u8 => Ok(Severity::INFO), - x if x == Severity::LOW as u8 => Ok(Severity::LOW), - x if x == Severity::MEDIUM as u8 => Ok(Severity::MEDIUM), - x if x == Severity::HIGH as u8 => Ok(Severity::HIGH), + x if x == Severity::Info as u8 => Ok(Severity::Info), + x if x == Severity::Low as u8 => Ok(Severity::Low), + x if x == Severity::Medium as u8 => Ok(Severity::Medium), + x if x == Severity::High as u8 => Ok(Severity::High), _ => Err(()), } } } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -struct EventBase { +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +struct EventBase { severity: Severity, - group_id: GID, - unique_id: UID, - phantom: PhantomData, + group_id: GroupId, + unique_id: UniqueId, + phantom: PhantomData, } -impl EventBase { +impl EventBase { fn write_to_bytes( &self, - raw: RAW, + raw: Raw, buf: &mut [u8], width: usize, ) -> Result { @@ -267,6 +274,7 @@ macro_rules! const_from_fn { } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct EventU32 { base: EventBase, } @@ -309,12 +317,12 @@ impl EventU32 { /// next 14 bits after the severity. Therefore, the size is limited by dec 16383 hex 0x3FFF. /// * `unique_id`: Each event has a unique 16 bit ID occupying the last 16 bits of the /// raw event ID - pub fn new( + pub fn new_checked( severity: Severity, group_id: ::GroupId, unique_id: ::UniqueId, ) -> Option { - if group_id > (2u16.pow(14) - 1) { + if group_id > MAX_GROUP_ID_U32_EVENT { return None; } Some(Self { @@ -326,12 +334,14 @@ impl EventU32 { }, }) } - pub const fn const_new( + + /// This constructor will panic if the passed group is is larger than [MAX_GROUP_ID_U32_EVENT]. + pub const fn new( severity: Severity, group_id: ::GroupId, unique_id: ::UniqueId, ) -> Self { - if group_id > (2u16.pow(14) - 1) { + if group_id > MAX_GROUP_ID_U32_EVENT { panic!("Group ID too large"); } Self { @@ -344,50 +354,16 @@ impl EventU32 { } } + pub fn from_be_bytes(bytes: [u8; 4]) -> Self { + Self::from(u32::from_be_bytes(bytes)) + } + const_from_fn!(const_from_info, EventU32TypedSev, SeverityInfo); const_from_fn!(const_from_low, EventU32TypedSev, SeverityLow); const_from_fn!(const_from_medium, EventU32TypedSev, SeverityMedium); const_from_fn!(const_from_high, EventU32TypedSev, SeverityHigh); } -impl EventU32TypedSev { - /// This is similar to [EventU32::new] but the severity is a type generic, which allows to - /// have distinct types for events with different severities - pub fn new( - group_id: ::GroupId, - unique_id: ::UniqueId, - ) -> Option { - let event = EventU32::new(SEVERITY::SEVERITY, group_id, unique_id)?; - Some(Self { - event, - phantom: PhantomData, - }) - } - - /// Const version of [Self::new], but panics on invalid group ID input values. - pub const fn const_new( - group_id: ::GroupId, - unique_id: ::UniqueId, - ) -> Self { - let event = EventU32::const_new(SEVERITY::SEVERITY, group_id, unique_id); - Self { - event, - phantom: PhantomData, - } - } - - fn try_from_generic(expected: Severity, raw: u32) -> Result { - let severity = Severity::try_from(((raw >> 30) & 0b11) as u8).unwrap(); - if severity != expected { - return Err(severity); - } - Ok(Self::const_new( - ((raw >> 16) & 0x3FFF) as u16, - (raw & 0xFFFF) as u16, - )) - } -} - impl From for EventU32 { fn from(raw: u32) -> Self { // Severity conversion from u8 should never fail @@ -395,15 +371,10 @@ impl From for EventU32 { let group_id = ((raw >> 16) & 0x3FFF) as u16; let unique_id = (raw & 0xFFFF) as u16; // Sanitized input, should never fail - Self::const_new(severity, group_id, unique_id) + Self::new(severity, group_id, unique_id) } } -try_from_impls!(SeverityInfo, Severity::INFO, u32, EventU32TypedSev); -try_from_impls!(SeverityLow, Severity::LOW, u32, EventU32TypedSev); -try_from_impls!(SeverityMedium, Severity::MEDIUM, u32, EventU32TypedSev); -try_from_impls!(SeverityHigh, Severity::HIGH, u32, EventU32TypedSev); - impl UnsignedEnum for EventU32 { fn size(&self) -> usize { core::mem::size_of::() @@ -424,6 +395,49 @@ impl EcssEnumeration for EventU32 { } } +impl EventU32TypedSev { + /// This is similar to [EventU32::new] but the severity is a type generic, which allows to + /// have distinct types for events with different severities + pub fn new_checked( + group_id: ::GroupId, + unique_id: ::UniqueId, + ) -> Option { + let event = EventU32::new_checked(SEVERITY::SEVERITY, group_id, unique_id)?; + Some(Self { + event, + phantom: PhantomData, + }) + } + + /// This constructor will panic if the `group_id` is larger than [MAX_GROUP_ID_U32_EVENT]. + pub const fn new( + group_id: ::GroupId, + unique_id: ::UniqueId, + ) -> Self { + let event = EventU32::new(SEVERITY::SEVERITY, group_id, unique_id); + Self { + event, + phantom: PhantomData, + } + } + + fn try_from_generic(expected: Severity, raw: u32) -> Result { + let severity = Severity::try_from(((raw >> 30) & 0b11) as u8).unwrap(); + if severity != expected { + return Err(severity); + } + Ok(Self::new( + ((raw >> 16) & 0x3FFF) as u16, + (raw & 0xFFFF) as u16, + )) + } +} + +try_from_impls!(SeverityInfo, Severity::Info, u32, EventU32TypedSev); +try_from_impls!(SeverityLow, Severity::Low, u32, EventU32TypedSev); +try_from_impls!(SeverityMedium, Severity::Medium, u32, EventU32TypedSev); +try_from_impls!(SeverityHigh, Severity::High, u32, EventU32TypedSev); + //noinspection RsTraitImplementation impl UnsignedEnum for EventU32TypedSev { delegate!(to self.event { @@ -441,6 +455,8 @@ impl EcssEnumeration for EventU32TypedSev { } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct EventU16 { base: EventBase, } @@ -475,7 +491,7 @@ impl EventU16 { /// next 6 bits after the severity. Therefore, the size is limited by dec 63 hex 0x3F. /// * `unique_id`: Each event has a unique 8 bit ID occupying the last 8 bits of the /// raw event ID - pub fn new( + pub fn new_checked( severity: Severity, group_id: ::GroupId, unique_id: ::UniqueId, @@ -493,8 +509,8 @@ impl EventU16 { }) } - /// Const version of [Self::new], but panics on invalid group ID input values. - pub const fn const_new( + /// This constructor will panic if the `group_id` is larger than [MAX_GROUP_ID_U16_EVENT]. + pub const fn new( severity: Severity, group_id: ::GroupId, unique_id: ::UniqueId, @@ -511,52 +527,26 @@ impl EventU16 { }, } } + pub fn from_be_bytes(bytes: [u8; 2]) -> Self { + Self::from(u16::from_be_bytes(bytes)) + } + const_from_fn!(const_from_info, EventU16TypedSev, SeverityInfo); const_from_fn!(const_from_low, EventU16TypedSev, SeverityLow); const_from_fn!(const_from_medium, EventU16TypedSev, SeverityMedium); const_from_fn!(const_from_high, EventU16TypedSev, SeverityHigh); } -impl EventU16TypedSev { - /// This is similar to [EventU16::new] but the severity is a type generic, which allows to - /// have distinct types for events with different severities - pub fn new( - group_id: ::GroupId, - unique_id: ::UniqueId, - ) -> Option { - let event = EventU16::new(SEVERITY::SEVERITY, group_id, unique_id)?; - Some(Self { - event, - phantom: PhantomData, - }) - } - - /// Const version of [Self::new], but panics on invalid group ID input values. - pub const fn const_new( - group_id: ::GroupId, - unique_id: ::UniqueId, - ) -> Self { - let event = EventU16::const_new(SEVERITY::SEVERITY, group_id, unique_id); - Self { - event, - phantom: PhantomData, - } - } - - fn try_from_generic(expected: Severity, raw: u16) -> Result { +impl From for EventU16 { + fn from(raw: ::Raw) -> Self { let severity = Severity::try_from(((raw >> 14) & 0b11) as u8).unwrap(); - if severity != expected { - return Err(severity); - } - Ok(Self::const_new( - ((raw >> 8) & 0x3F) as u8, - (raw & 0xFF) as u8, - )) + let group_id = ((raw >> 8) & 0x3F) as u8; + let unique_id = (raw & 0xFF) as u8; + // Sanitized input, new call should never fail + Self::new(severity, group_id, unique_id) } } -impl_event_provider!(EventU16, EventU16TypedSev, u16, u8, u8); - impl UnsignedEnum for EventU16 { fn size(&self) -> usize { core::mem::size_of::() @@ -577,6 +567,43 @@ impl EcssEnumeration for EventU16 { } } +impl EventU16TypedSev { + /// This is similar to [EventU16::new] but the severity is a type generic, which allows to + /// have distinct types for events with different severities + pub fn new_checked( + group_id: ::GroupId, + unique_id: ::UniqueId, + ) -> Option { + let event = EventU16::new_checked(SEVERITY::SEVERITY, group_id, unique_id)?; + Some(Self { + event, + phantom: PhantomData, + }) + } + + /// This constructor will panic if the `group_id` is larger than [MAX_GROUP_ID_U16_EVENT]. + pub const fn new( + group_id: ::GroupId, + unique_id: ::UniqueId, + ) -> Self { + let event = EventU16::new(SEVERITY::SEVERITY, group_id, unique_id); + Self { + event, + phantom: PhantomData, + } + } + + fn try_from_generic(expected: Severity, raw: u16) -> Result { + let severity = Severity::try_from(((raw >> 14) & 0b11) as u8).unwrap(); + if severity != expected { + return Err(severity); + } + Ok(Self::new(((raw >> 8) & 0x3F) as u8, (raw & 0xFF) as u8)) + } +} + +impl_event_provider!(EventU16, EventU16TypedSev, u16, u8, u8); + //noinspection RsTraitImplementation impl UnsignedEnum for EventU16TypedSev { delegate!(to self.event { @@ -593,20 +620,10 @@ impl EcssEnumeration for EventU16TypedSev { }); } -impl From for EventU16 { - fn from(raw: ::Raw) -> Self { - let severity = Severity::try_from(((raw >> 14) & 0b11) as u8).unwrap(); - let group_id = ((raw >> 8) & 0x3F) as u8; - let unique_id = (raw & 0xFF) as u8; - // Sanitized input, new call should never fail - Self::const_new(severity, group_id, unique_id) - } -} - -try_from_impls!(SeverityInfo, Severity::INFO, u16, EventU16TypedSev); -try_from_impls!(SeverityLow, Severity::LOW, u16, EventU16TypedSev); -try_from_impls!(SeverityMedium, Severity::MEDIUM, u16, EventU16TypedSev); -try_from_impls!(SeverityHigh, Severity::HIGH, u16, EventU16TypedSev); +try_from_impls!(SeverityInfo, Severity::Info, u16, EventU16TypedSev); +try_from_impls!(SeverityLow, Severity::Low, u16, EventU16TypedSev); +try_from_impls!(SeverityMedium, Severity::Medium, u16, EventU16TypedSev); +try_from_impls!(SeverityHigh, Severity::High, u16, EventU16TypedSev); impl PartialEq for EventU32TypedSev { #[inline] @@ -647,12 +664,10 @@ mod tests { assert_eq!(size_of::(), val); } - const INFO_EVENT: EventU32TypedSev = EventU32TypedSev::const_new(0, 0); - const INFO_EVENT_SMALL: EventU16TypedSev = EventU16TypedSev::const_new(0, 0); - const HIGH_SEV_EVENT: EventU32TypedSev = - EventU32TypedSev::const_new(0x3FFF, 0xFFFF); - const HIGH_SEV_EVENT_SMALL: EventU16TypedSev = - EventU16TypedSev::const_new(0x3F, 0xff); + const INFO_EVENT: EventU32TypedSev = EventU32TypedSev::new(0, 0); + const INFO_EVENT_SMALL: EventU16TypedSev = EventU16TypedSev::new(0, 0); + const HIGH_SEV_EVENT: EventU32TypedSev = EventU32TypedSev::new(0x3FFF, 0xFFFF); + const HIGH_SEV_EVENT_SMALL: EventU16TypedSev = EventU16TypedSev::new(0x3F, 0xff); /// This working is a test in itself. const INFO_REDUCED: EventU32 = EventU32::const_from_info(INFO_EVENT); @@ -683,7 +698,7 @@ mod tests { #[test] fn test_normal_event_getters() { - assert_eq!(INFO_EVENT.severity(), Severity::INFO); + assert_eq!(INFO_EVENT.severity(), Severity::Info); assert_eq!(INFO_EVENT.unique_id(), 0); assert_eq!(INFO_EVENT.group_id(), 0); let raw_event = INFO_EVENT.raw(); @@ -692,7 +707,7 @@ mod tests { #[test] fn test_small_event_getters() { - assert_eq!(INFO_EVENT_SMALL.severity(), Severity::INFO); + assert_eq!(INFO_EVENT_SMALL.severity(), Severity::Info); assert_eq!(INFO_EVENT_SMALL.unique_id(), 0); assert_eq!(INFO_EVENT_SMALL.group_id(), 0); let raw_event = INFO_EVENT_SMALL.raw(); @@ -701,7 +716,7 @@ mod tests { #[test] fn all_ones_event_regular() { - assert_eq!(HIGH_SEV_EVENT.severity(), Severity::HIGH); + assert_eq!(HIGH_SEV_EVENT.severity(), Severity::High); assert_eq!(HIGH_SEV_EVENT.group_id(), 0x3FFF); assert_eq!(HIGH_SEV_EVENT.unique_id(), 0xFFFF); let raw_event = HIGH_SEV_EVENT.raw(); @@ -710,7 +725,7 @@ mod tests { #[test] fn all_ones_event_small() { - assert_eq!(HIGH_SEV_EVENT_SMALL.severity(), Severity::HIGH); + assert_eq!(HIGH_SEV_EVENT_SMALL.severity(), Severity::High); assert_eq!(HIGH_SEV_EVENT_SMALL.group_id(), 0x3F); assert_eq!(HIGH_SEV_EVENT_SMALL.unique_id(), 0xFF); let raw_event = HIGH_SEV_EVENT_SMALL.raw(); @@ -719,18 +734,19 @@ mod tests { #[test] fn invalid_group_id_normal() { - assert!(EventU32TypedSev::::new(2_u16.pow(14), 0).is_none()); + assert!(EventU32TypedSev::::new_checked(2_u16.pow(14), 0).is_none()); } #[test] fn invalid_group_id_small() { - assert!(EventU16TypedSev::::new(2_u8.pow(6), 0).is_none()); + assert!(EventU16TypedSev::::new_checked(2_u8.pow(6), 0).is_none()); } #[test] fn regular_new() { assert_eq!( - EventU32TypedSev::::new(0, 0).expect("Creating regular event failed"), + EventU32TypedSev::::new_checked(0, 0) + .expect("Creating regular event failed"), INFO_EVENT ); } @@ -738,7 +754,8 @@ mod tests { #[test] fn small_new() { assert_eq!( - EventU16TypedSev::::new(0, 0).expect("Creating regular event failed"), + EventU16TypedSev::::new_checked(0, 0) + .expect("Creating regular event failed"), INFO_EVENT_SMALL ); } @@ -777,6 +794,8 @@ mod tests { assert!(HIGH_SEV_EVENT.write_to_be_bytes(&mut buf).is_ok()); let val_from_raw = u32::from_be_bytes(buf); assert_eq!(val_from_raw, 0xFFFFFFFF); + let event_read_back = EventU32::from_be_bytes(buf); + assert_eq!(event_read_back, HIGH_SEV_EVENT); } #[test] @@ -785,6 +804,8 @@ mod tests { assert!(HIGH_SEV_EVENT_SMALL.write_to_be_bytes(&mut buf).is_ok()); let val_from_raw = u16::from_be_bytes(buf); assert_eq!(val_from_raw, 0xFFFF); + let event_read_back = EventU16::from_be_bytes(buf); + assert_eq!(event_read_back, HIGH_SEV_EVENT_SMALL); } #[test] @@ -815,13 +836,13 @@ mod tests { fn severity_from_invalid_raw_val() { let invalid = 0xFF; assert!(Severity::try_from(invalid).is_err()); - let invalid = Severity::HIGH as u8 + 1; + let invalid = Severity::High as u8 + 1; assert!(Severity::try_from(invalid).is_err()); } #[test] fn reduction() { - let event = EventU32TypedSev::::const_new(1, 1); + let event = EventU32TypedSev::::new(1, 1); let raw = event.raw(); let reduced: EventU32 = event.into(); assert_eq!(reduced.group_id(), 1); diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index 496ce89..983702a 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -9,8 +9,6 @@ use mio::{Events, Interest, Poll, Token}; use socket2::{Domain, Socket, Type}; use std::io::{self, Read}; use std::net::SocketAddr; -// use std::net::TcpListener; -// use std::net::{SocketAddr, TcpStream}; use std::thread; use crate::tmtc::{PacketSenderRaw, PacketSource}; @@ -249,8 +247,11 @@ impl< let mut mio_listener = TcpListener::from_std(listener); // Start listening for incoming connections. - poll.registry() - .register(&mut mio_listener, Token(0), Interest::READABLE)?; + poll.registry().register( + &mut mio_listener, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; Ok(Self { id: cfg.id, @@ -280,11 +281,11 @@ impl< self.listener.local_addr() } - /// This call is used to handle the next connection to a client. Right now, it performs + /// This call is used to handle all connection from clients. Right now, it performs /// the following steps: /// - /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API - /// until a client connects. + /// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional + /// timeout can be specified for non-blocking acceptance. /// 2. It reads all the telecommands from the client and parses all received data using the /// user specified [TcpTcParser]. /// 3. After reading and parsing all telecommands, it sends back all telemetry using the @@ -317,11 +318,17 @@ impl< loop { match self.listener.accept() { Ok((stream, addr)) => { - self.handle_accepted_connection(stream, addr)?; + if let Err(e) = self.handle_accepted_connection(stream, addr) { + self.reregister_poll_interest()?; + return Err(e); + } handled_connections += 1; } Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break, - Err(err) => return Err(TcpTmtcError::Io(err)), + Err(err) => { + self.reregister_poll_interest()?; + return Err(TcpTmtcError::Io(err)); + } } } } @@ -331,6 +338,14 @@ impl< Ok(ConnectionResult::AcceptTimeout) } + fn reregister_poll_interest(&mut self) -> io::Result<()> { + self.poll.registry().reregister( + &mut self.listener, + Token(0), + Interest::READABLE | Interest::WRITABLE, + ) + } + fn handle_accepted_connection( &mut self, mut stream: TcpStream, diff --git a/satrs/src/pus/event.rs b/satrs/src/pus/event.rs index c4363bd..578167f 100644 --- a/satrs/src/pus/event.rs +++ b/satrs/src/pus/event.rs @@ -331,10 +331,10 @@ mod tests { fn severity_to_subservice(severity: Severity) -> Subservice { match severity { - Severity::INFO => Subservice::TmInfoReport, - Severity::LOW => Subservice::TmLowSeverityReport, - Severity::MEDIUM => Subservice::TmMediumSeverityReport, - Severity::HIGH => Subservice::TmHighSeverityReport, + Severity::Info => Subservice::TmInfoReport, + Severity::Low => Subservice::TmLowSeverityReport, + Severity::Medium => Subservice::TmMediumSeverityReport, + Severity::High => Subservice::TmHighSeverityReport, } } @@ -347,22 +347,22 @@ mod tests { aux_data: Option<&[u8]>, ) { match severity { - Severity::INFO => { + Severity::Info => { reporter .event_info(sender, time_stamp, event, aux_data) .expect("Error reporting info event"); } - Severity::LOW => { + Severity::Low => { reporter .event_low_severity(sender, time_stamp, event, aux_data) .expect("Error reporting low event"); } - Severity::MEDIUM => { + Severity::Medium => { reporter .event_medium_severity(sender, time_stamp, event, aux_data) .expect("Error reporting medium event"); } - Severity::HIGH => { + Severity::High => { reporter .event_high_severity(sender, time_stamp, event, aux_data) .expect("Error reporting high event"); @@ -389,7 +389,7 @@ mod tests { if let Some(err_data) = error_data { error_copy.extend_from_slice(err_data); } - let event = EventU32::new(severity, EXAMPLE_GROUP_ID, EXAMPLE_EVENT_ID_0) + let event = EventU32::new_checked(severity, EXAMPLE_GROUP_ID, EXAMPLE_EVENT_ID_0) .expect("Error creating example event"); report_basic_event( &mut reporter, @@ -417,35 +417,35 @@ mod tests { #[test] fn basic_info_event_generation() { - basic_event_test(4, Severity::INFO, None); + basic_event_test(4, Severity::Info, None); } #[test] fn basic_low_severity_event() { - basic_event_test(4, Severity::LOW, None); + basic_event_test(4, Severity::Low, None); } #[test] fn basic_medium_severity_event() { - basic_event_test(4, Severity::MEDIUM, None); + basic_event_test(4, Severity::Medium, None); } #[test] fn basic_high_severity_event() { - basic_event_test(4, Severity::HIGH, None); + basic_event_test(4, Severity::High, None); } #[test] fn event_with_info_string() { let info_string = "Test Information"; - basic_event_test(32, Severity::INFO, Some(info_string.as_bytes())); + basic_event_test(32, Severity::Info, Some(info_string.as_bytes())); } #[test] fn low_severity_with_raw_err_data() { let raw_err_param: i32 = -1; let raw_err = raw_err_param.to_be_bytes(); - basic_event_test(8, Severity::LOW, Some(&raw_err)) + basic_event_test(8, Severity::Low, Some(&raw_err)) } fn check_buf_too_small( @@ -454,7 +454,7 @@ mod tests { expected_found_len: usize, ) { let time_stamp_empty: [u8; 7] = [0; 7]; - let event = EventU32::new(Severity::INFO, EXAMPLE_GROUP_ID, EXAMPLE_EVENT_ID_0) + let event = EventU32::new_checked(Severity::Info, EXAMPLE_GROUP_ID, EXAMPLE_EVENT_ID_0) .expect("Error creating example event"); let err = reporter.event_info(sender, &time_stamp_empty, event, None); assert!(err.is_err()); diff --git a/satrs/src/pus/event_man.rs b/satrs/src/pus/event_man.rs index b8ddb6b..c8aec2d 100644 --- a/satrs/src/pus/event_man.rs +++ b/satrs/src/pus/event_man.rs @@ -28,7 +28,7 @@ pub use heapless_mod::*; /// structure to track disabled events. A more primitive and embedded friendly /// solution could track this information in a static or pre-allocated list which contains /// the disabled events. -pub trait PusEventMgmtBackendProvider { +pub trait PusEventReportingMapProvider { type Error; fn event_enabled(&self, event: &Event) -> bool; @@ -56,7 +56,7 @@ pub mod heapless_mod { { } - impl PusEventMgmtBackendProvider + impl PusEventReportingMapProvider for HeaplessPusMgmtBackendProvider { type Error = (); @@ -105,20 +105,23 @@ impl From for EventManError { pub mod alloc_mod { use core::marker::PhantomData; - use crate::events::EventU16; + use crate::{ + events::EventU16, + pus::event::{DummyEventHook, EventTmHookProvider}, + }; use super::*; /// Default backend provider which uses a hash set as the event reporting status container - /// like mentioned in the example of the [PusEventMgmtBackendProvider] documentation. + /// like mentioned in the example of the [PusEventReportingMapProvider] documentation. /// /// This provider is a good option for host systems or larger embedded systems where /// the expected occasional memory allocation performed by the [HashSet] is not an issue. - pub struct DefaultPusEventMgmtBackend { + pub struct DefaultPusEventReportingMap { disabled: HashSet, } - impl Default for DefaultPusEventMgmtBackend { + impl Default for DefaultPusEventReportingMap { fn default() -> Self { Self { disabled: HashSet::default(), @@ -126,51 +129,54 @@ pub mod alloc_mod { } } - impl PusEventMgmtBackendProvider - for DefaultPusEventMgmtBackend + impl + PusEventReportingMapProvider for DefaultPusEventReportingMap { type Error = (); - fn event_enabled(&self, event: &EV) -> bool { + fn event_enabled(&self, event: &Event) -> bool { !self.disabled.contains(event) } - fn enable_event_reporting(&mut self, event: &EV) -> Result { + fn enable_event_reporting(&mut self, event: &Event) -> Result { Ok(self.disabled.remove(event)) } - fn disable_event_reporting(&mut self, event: &EV) -> Result { + fn disable_event_reporting(&mut self, event: &Event) -> Result { Ok(self.disabled.insert(*event)) } } - pub struct PusEventDispatcher< - B: PusEventMgmtBackendProvider, - EV: GenericEvent, - E, + pub struct PusEventTmCreatorWithMap< + ReportingMap: PusEventReportingMapProvider, + Event: GenericEvent, + EventTmHook: EventTmHookProvider = DummyEventHook, > { - reporter: EventReporter, - backend: B, - phantom: PhantomData<(E, EV)>, + pub reporter: EventReporter, + reporting_map: ReportingMap, + phantom: PhantomData, } - impl, Event: GenericEvent, E> - PusEventDispatcher + impl< + ReportingMap: PusEventReportingMapProvider, + Event: GenericEvent, + EventTmHook: EventTmHookProvider, + > PusEventTmCreatorWithMap { - pub fn new(reporter: EventReporter, backend: B) -> Self { + pub fn new(reporter: EventReporter, backend: ReportingMap) -> Self { Self { reporter, - backend, + reporting_map: backend, phantom: PhantomData, } } - pub fn enable_tm_for_event(&mut self, event: &Event) -> Result { - self.backend.enable_event_reporting(event) + pub fn enable_tm_for_event(&mut self, event: &Event) -> Result { + self.reporting_map.enable_event_reporting(event) } - pub fn disable_tm_for_event(&mut self, event: &Event) -> Result { - self.backend.disable_event_reporting(event) + pub fn disable_tm_for_event(&mut self, event: &Event) -> Result { + self.reporting_map.disable_event_reporting(event) } pub fn generate_pus_event_tm_generic( @@ -180,26 +186,26 @@ pub mod alloc_mod { event: Event, params: Option<&[u8]>, ) -> Result { - if !self.backend.event_enabled(&event) { + if !self.reporting_map.event_enabled(&event) { return Ok(false); } match event.severity() { - Severity::INFO => self + Severity::Info => self .reporter .event_info(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), - Severity::LOW => self + Severity::Low => self .reporter .event_low_severity(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), - Severity::MEDIUM => self + Severity::Medium => self .reporter .event_medium_severity(sender, time_stamp, event, params) .map(|_| true) .map_err(|e| e.into()), - Severity::HIGH => self + Severity::High => self .reporter .event_high_severity(sender, time_stamp, event, params) .map(|_| true) @@ -208,31 +214,33 @@ pub mod alloc_mod { } } - impl - PusEventDispatcher, EV, ()> + impl + PusEventTmCreatorWithMap, Event, EventTmHook> { - pub fn new_with_default_backend(reporter: EventReporter) -> Self { + pub fn new_with_default_backend(reporter: EventReporter) -> Self { Self { reporter, - backend: DefaultPusEventMgmtBackend::default(), + reporting_map: DefaultPusEventReportingMap::default(), phantom: PhantomData, } } } - impl, E> PusEventDispatcher { + impl> + PusEventTmCreatorWithMap + { pub fn enable_tm_for_event_with_sev( &mut self, event: &EventU32TypedSev, - ) -> Result { - self.backend.enable_event_reporting(event.as_ref()) + ) -> Result { + self.reporting_map.enable_event_reporting(event.as_ref()) } pub fn disable_tm_for_event_with_sev( &mut self, event: &EventU32TypedSev, - ) -> Result { - self.backend.disable_event_reporting(event.as_ref()) + ) -> Result { + self.reporting_map.disable_event_reporting(event.as_ref()) } pub fn generate_pus_event_tm( @@ -246,10 +254,10 @@ pub mod alloc_mod { } } - pub type DefaultPusEventU16Dispatcher = - PusEventDispatcher, EventU16, E>; - pub type DefaultPusEventU32Dispatcher = - PusEventDispatcher, EventU32, E>; + pub type DefaultPusEventU16TmCreator = + PusEventTmCreatorWithMap, EventU16, EventTmHook>; + pub type DefaultPusEventU32TmCreator = + PusEventTmCreatorWithMap, EventU32, EventTmHook>; } #[cfg(test)] mod tests { @@ -258,23 +266,22 @@ mod tests { use crate::{events::SeverityInfo, tmtc::PacketAsVec}; use std::sync::mpsc::{self, TryRecvError}; - const INFO_EVENT: EventU32TypedSev = - EventU32TypedSev::::const_new(1, 0); - const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); + const INFO_EVENT: EventU32TypedSev = EventU32TypedSev::::new(1, 0); + const LOW_SEV_EVENT: EventU32 = EventU32::new(Severity::Low, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; const TEST_APID: u16 = 0x02; const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05); - fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> { + fn create_basic_man_1() -> DefaultPusEventU32TmCreator { let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128) .expect("Creating event repoter failed"); - PusEventDispatcher::new_with_default_backend(reporter) + PusEventTmCreatorWithMap::new_with_default_backend(reporter) } - fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> { + fn create_basic_man_2() -> DefaultPusEventU32TmCreator { let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128) .expect("Creating event repoter failed"); - let backend = DefaultPusEventMgmtBackend::default(); - PusEventDispatcher::new(reporter, backend) + let backend = DefaultPusEventReportingMap::default(); + PusEventTmCreatorWithMap::new(reporter, backend) } #[test] diff --git a/satrs/src/pus/event_srv.rs b/satrs/src/pus/event_srv.rs index c782b3a..8ea54ec 100644 --- a/satrs/src/pus/event_srv.rs +++ b/satrs/src/pus/event_srv.rs @@ -181,7 +181,7 @@ mod tests { use super::PusEventServiceHandler; - const TEST_EVENT_0: EventU32 = EventU32::const_new(crate::events::Severity::INFO, 5, 25); + const TEST_EVENT_0: EventU32 = EventU32::new(crate::events::Severity::Info, 5, 25); struct Pus5HandlerWithStoreTester { common: PusServiceHandlerWithSharedStoreCommon, diff --git a/satrs/tests/pus_autogen_events.rs b/satrs/tests/pus_autogen_events.rs index 4aced1d..22800de 100644 --- a/satrs/tests/pus_autogen_events.rs +++ b/satrs/tests/pus_autogen_events.rs @@ -21,7 +21,7 @@ struct EventIntrospection { } //#[event(descr="This is some info event")] -const INFO_EVENT_0: EventU32TypedSev = EventU32TypedSev::const_new(0, 0); +const INFO_EVENT_0: EventU32TypedSev = EventU32TypedSev::new(0, 0); const INFO_EVENT_0_ERASED: EventU32 = EventU32::const_from_info(INFO_EVENT_0); // This is ideally auto-generated @@ -36,7 +36,7 @@ const INFO_EVENT_0_INTROSPECTION: EventIntrospection = EventIntrospection { }; //#[event(descr="This is some low severity event")] -const SOME_LOW_SEV_EVENT: EventU32TypedSev = EventU32TypedSev::const_new(0, 12); +const SOME_LOW_SEV_EVENT: EventU32TypedSev = EventU32TypedSev::new(0, 12); //const EVENT_LIST: [&'static Event; 2] = [&INFO_EVENT_0, &SOME_LOW_SEV_EVENT]; @@ -47,7 +47,7 @@ const TEST_GROUP_NAME_NAME: &str = "TEST_GROUP_NAME"; //#[event(desc="Some medium severity event")] const MEDIUM_SEV_EVENT_IN_OTHER_GROUP: EventU32TypedSev = - EventU32TypedSev::const_new(TEST_GROUP_NAME, 0); + EventU32TypedSev::new(TEST_GROUP_NAME, 0); const MEDIUM_SEV_EVENT_IN_OTHER_GROUP_REDUCED: EventU32 = EventU32::const_from_medium(MEDIUM_SEV_EVENT_IN_OTHER_GROUP); @@ -62,7 +62,7 @@ const MEDIUM_SEV_EVENT_IN_OTHER_GROUP_INTROSPECTION: EventIntrospection = EventI info: "Some medium severity event", }; -const CONST_SLICE: &'static [u8] = &[0, 1, 2, 3]; +const CONST_SLICE: &[u8] = &[0, 1, 2, 3]; const INTROSPECTION_FOR_TEST_GROUP_0: [&EventIntrospection; 2] = [&INFO_EVENT_0_INTROSPECTION, &INFO_EVENT_0_INTROSPECTION]; diff --git a/satrs/tests/pus_events.rs b/satrs/tests/pus_events.rs index a5c3061..d9d572e 100644 --- a/satrs/tests/pus_events.rs +++ b/satrs/tests/pus_events.rs @@ -1,11 +1,11 @@ use satrs::event_man::{ EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider, - EventU32SenderMpsc, MpscEventU32Receiver, + EventU32SenderMpsc, }; use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs::params::U32Pair; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; -use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; +use satrs::pus::event_man::{DefaultPusEventReportingMap, EventReporter, PusEventTmCreatorWithMap}; use satrs::pus::test_util::TEST_COMPONENT_ID_0; use satrs::request::UniqueApidTargetId; use satrs::tmtc::PacketAsVec; @@ -14,9 +14,8 @@ use spacepackets::ecss::{PusError, PusPacket}; use std::sync::mpsc::{self, SendError, TryRecvError}; use std::thread; -const INFO_EVENT: EventU32TypedSev = - EventU32TypedSev::::const_new(1, 0); -const LOW_SEV_EVENT: EventU32 = EventU32::const_new(Severity::LOW, 1, 5); +const INFO_EVENT: EventU32TypedSev = EventU32TypedSev::::new(1, 0); +const LOW_SEV_EVENT: EventU32 = EventU32::new(Severity::Low, 1, 5); const EMPTY_STAMP: [u8; 7] = [0; 7]; const TEST_APID: u16 = 0x02; const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05); @@ -29,18 +28,18 @@ pub enum CustomTmSenderError { #[test] fn test_threaded_usage() { - let (event_sender, event_man_receiver) = mpsc::channel(); - let event_receiver = MpscEventU32Receiver::new(event_man_receiver); - let mut event_man = EventManagerWithMpsc::new(event_receiver); + let (event_tx, event_rx) = mpsc::sync_channel(100); + let mut event_man = EventManagerWithMpsc::new(event_rx); let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); event_man.subscribe_all(pus_event_man_send_provider.target_id()); event_man.add_sender(pus_event_man_send_provider); - let (event_tx, event_rx) = mpsc::channel::(); + let (event_packet_tx, event_packet_rx) = mpsc::channel::(); let reporter = EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed"); - let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); + let pus_event_man = + PusEventTmCreatorWithMap::new(reporter, DefaultPusEventReportingMap::default()); let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| { panic!("received routing error for event {event_msg:?}: {error:?}"); }; @@ -54,7 +53,7 @@ fn test_threaded_usage() { Ok(event_msg) => { let gen_event = |aux_data| { pus_event_man.generate_pus_event_tm_generic( - &event_tx, + &event_packet_tx, &EMPTY_STAMP, event_msg.event(), aux_data, @@ -100,14 +99,14 @@ fn test_threaded_usage() { // Event sender and TM checker thread let jh1 = thread::spawn(move || { - event_sender + event_tx .send(EventMessage::new( TEST_COMPONENT_ID_0.id(), INFO_EVENT.into(), )) .expect("Sending info event failed"); loop { - match event_rx.try_recv() { + match event_packet_rx.try_recv() { // Event TM received successfully Ok(event_tm) => { let tm = PusTmReader::new(event_tm.packet.as_slice(), 7) @@ -129,7 +128,7 @@ fn test_threaded_usage() { } } } - event_sender + event_tx .send(EventMessage::new_with_params( TEST_COMPONENT_ID_0.id(), LOW_SEV_EVENT, @@ -137,7 +136,7 @@ fn test_threaded_usage() { )) .expect("Sending low severity event failed"); loop { - match event_rx.try_recv() { + match event_packet_rx.try_recv() { // Event TM received successfully Ok(event_tm) => { let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)