From 2ef3057771b8d936536b7b4068c608fe52a9b684 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 5 Feb 2024 17:48:53 +0100 Subject: [PATCH] extracted event handling --- satrs-core/src/pool.rs | 2 +- satrs-example/src/events.rs | 191 ++++++++++++++++++++++++++++++++++++ satrs-example/src/main.rs | 120 +++++----------------- 3 files changed, 219 insertions(+), 94 deletions(-) create mode 100644 satrs-example/src/events.rs diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index 3ab67a1..0111d18 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -337,7 +337,7 @@ mod alloc_mod { } impl StaticPoolConfig { - pub const fn new(cfg: Vec<(NumBlocks, usize)>) -> Self { + pub fn new(cfg: Vec<(NumBlocks, usize)>) -> Self { StaticPoolConfig { cfg } } diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs new file mode 100644 index 0000000..5f48d53 --- /dev/null +++ b/satrs-example/src/events.rs @@ -0,0 +1,191 @@ +use std::sync::mpsc::{self, SendError}; + +use satrs_core::{ + event_man::{ + EventManager, EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, + SendEventProvider, + }, + events::EventU32, + params::Params, + pool::StoreAddr, + pus::{ + event_man::{ + DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, + PusEventDispatcher, + }, + verification::{TcStateStarted, VerificationReporterWithSender, VerificationToken}, + MpscTmInStoreSender, + }, + spacepackets::time::cds::{self, TimeProvider}, + tmtc::tm_helper::SharedTmStore, + ChannelId, +}; +use satrs_example::{TmSenderId, PUS_APID}; + +use crate::update_time; + +pub type MpscEventManager = EventManager)>>; + +pub struct PusEventHandler { + event_request_rx: mpsc::Receiver, + pus_event_dispatcher: PusEventDispatcher<(), EventU32>, + pus_event_man_rx: mpsc::Receiver<(EventU32, Option)>, + tm_sender: MpscTmInStoreSender, + time_provider: TimeProvider, + timestamp: [u8; 7], + verif_handler: VerificationReporterWithSender, +} + +impl PusEventHandler { + pub fn new( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_handler: VerificationReporterWithSender, + event_manager: &mut MpscEventManager, + event_request_rx: mpsc::Receiver, + ) -> Self { + let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); + + // All events sent to the manager are routed to the PUS event manager, which generates PUS event + // telemetry for each event. + let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); + let pus_tm_backend = DefaultPusMgmtBackendProvider::::default(); + let pus_event_dispatcher = + PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); + let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); + + event_manager.subscribe_all(pus_event_man_send_provider.id()); + event_manager.add_sender(pus_event_man_send_provider); + + Self { + event_request_rx, + pus_event_dispatcher, + pus_event_man_rx, + time_provider: cds::TimeProvider::new_with_u16_days(0, 0), + timestamp: [0; 7], + verif_handler, + tm_sender: MpscTmInStoreSender::new( + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + shared_tm_store, + tm_funnel_tx, + ), + } + } + + pub fn handle_event_requests(&mut self) { + let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { + let started_token: VerificationToken = event_req + .token + .try_into() + .expect("expected start verification token"); + self.verif_handler + .completion_success(started_token, Some(timestamp)) + .expect("Sending completion success failed"); + }; + // handle event requests + if let Ok(event_req) = self.event_request_rx.try_recv() { + match event_req.request { + EventRequest::Enable(event) => { + self.pus_event_dispatcher + .enable_tm_for_event(&event) + .expect("Enabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + EventRequest::Disable(event) => { + self.pus_event_dispatcher + .disable_tm_for_event(&event) + .expect("Disabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + } + } + } + + pub fn generate_pus_event_tm(&mut self) { + // Perform the generation of PUS event packets + if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() { + update_time(&mut self.time_provider, &mut self.timestamp); + self.pus_event_dispatcher + .generate_pus_event_tm_generic(&mut self.tm_sender, &self.timestamp, event, None) + .expect("Sending TM as event failed"); + } + } +} + +pub struct EventManagerWrapper { + event_manager: MpscEventManager, + event_sender: mpsc::Sender<(EventU32, Option)>, +} + +impl EventManagerWrapper { + pub fn new() -> Self { + // The sender handle is the primary sender handle for all components which want to create events. + // The event manager will receive the RX handle to receive all the events. + let (event_sender, event_man_rx) = mpsc::channel(); + let event_recv = MpscEventReceiver::::new(event_man_rx); + Self { + event_manager: EventManagerWithMpscQueue::new(Box::new(event_recv)), + event_sender, + } + } + + pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option)> { + self.event_sender.clone() + } + + pub fn event_manager(&mut self) -> &mut MpscEventManager { + &mut self.event_manager + } + + pub fn try_event_routing(&mut self) { + // Perform the event routing. + self.event_manager + .try_event_handling() + .expect("event handling failed"); + } +} + +pub struct EventHandler { + pub event_man_wrapper: EventManagerWrapper, + pub pus_event_handler: PusEventHandler, +} + +impl EventHandler { + pub fn new( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_handler: VerificationReporterWithSender, + event_request_rx: mpsc::Receiver, + ) -> Self { + let mut event_man_wrapper = EventManagerWrapper::new(); + let pus_event_handler = PusEventHandler::new( + shared_tm_store, + tm_funnel_tx, + verif_handler, + event_man_wrapper.event_manager(), + event_request_rx, + ); + Self { + event_man_wrapper, + pus_event_handler, + } + } + + pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option)> { + self.event_man_wrapper.clone_event_sender() + } + + #[allow(dead_code)] + pub fn event_manager(&mut self) -> &mut MpscEventManager { + self.event_man_wrapper.event_manager() + } + + pub fn periodic_operation(&mut self) { + self.pus_event_handler.handle_event_requests(); + self.event_man_wrapper.try_event_routing(); + self.pus_event_handler.generate_pus_event_tm(); + } +} diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 527565e..e763552 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,5 +1,6 @@ mod acs; mod ccsds; +mod events; mod hk; mod logger; mod pus; @@ -9,6 +10,7 @@ mod tm_funnel; mod tmtc; mod udp; +use crate::events::EventHandler; use crate::tm_funnel::TmFunnel; use log::info; use satrs_core::hal::std::tcp_server::ServerConfig; @@ -27,22 +29,13 @@ use crate::requests::RequestWithToken; use crate::tcp::{SyncTcpTmSource, TcpTask}; use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmtcTask}; use crate::udp::UdpTmtcServer; -use satrs_core::event_man::{ - EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, -}; -use satrs_core::events::EventU32; use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig}; -use satrs_core::pus::event_man::{ - DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, - PusEventDispatcher, -}; +use satrs_core::pus::event_man::EventRequestWithToken; use satrs_core::pus::event_srv::PusService5EventHandler; use satrs_core::pus::scheduler::PusScheduler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::test::PusService17TestHandler; -use satrs_core::pus::verification::{ - TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, -}; +use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; use satrs_core::pus::{ EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper, }; @@ -56,7 +49,7 @@ use satrs_example::{ }; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{self, channel}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -73,7 +66,6 @@ fn main() { (15, 2048), ])); let shared_tm_store = SharedTmStore::new(tm_pool); - let tm_store_event = shared_tm_store.clone(); let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (30, 32), (15, 64), @@ -108,30 +100,6 @@ fn main() { // Every software component which needs to generate verification telemetry, gets a cloned // verification reporter. let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); - let reporter_event_handler = verif_reporter.clone(); - let reporter_aocs = verif_reporter.clone(); - - // Create event handling components - // These sender handles are used to send event requests, for example to enable or disable - // certain events - let (event_request_tx, event_request_rx) = channel::(); - // The sender handle is the primary sender handle for all components which want to create events. - // The event manager will receive the RX handle to receive all the events. - let (event_sender, event_man_rx) = channel(); - let event_recv = MpscEventReceiver::::new(event_man_rx); - let test_srv_event_sender = event_sender; - let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv)); - - // All events sent to the manager are routed to the PUS event manager, which generates PUS event - // telemetry for each event. - let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); - let pus_tm_backend = DefaultPusMgmtBackendProvider::::default(); - let mut pus_event_dispatcher = - PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); - let (pus_event_man_tx, pus_event_man_rx) = channel(); - let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); - event_man.subscribe_all(pus_event_man_send_provider.id()); - event_man.add_sender(pus_event_man_send_provider); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = HashMap::new(); @@ -155,6 +123,19 @@ fn main() { tm_udp_server_rx: tm_server_rx, }; + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_request_tx, event_request_rx) = mpsc::channel::(); + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new( + shared_tm_store.clone(), + tm_funnel_tx.clone(), + verif_reporter.clone(), + event_request_rx, + ); + let (pus_test_tx, pus_test_rx) = channel(); let (pus_event_tx, pus_event_rx) = channel(); let (pus_sched_tx, pus_sched_rx) = channel(); @@ -187,7 +168,7 @@ fn main() { )); let mut pus_17_wrapper = Service17CustomWrapper { pus17_handler, - test_srv_event_sender, + test_srv_event_sender: event_handler.clone_event_sender(), }; let sched_srv_tm_sender = MpscTmInStoreSender::new( @@ -284,7 +265,10 @@ fn main() { let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), }; - let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router)); + let mut tmtc_task = TmtcTask::new( + tc_args, + PusReceiver::new(verif_reporter.clone(), pus_router), + ); let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) @@ -299,7 +283,7 @@ fn main() { shared_tm_store.clone(), tm_funnel_tx.clone(), acs_thread_rx, - reporter_aocs, + verif_reporter, ); let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); @@ -354,59 +338,9 @@ fn main() { info!("Starting event handling task"); let jh2 = thread::Builder::new() .name("Event".to_string()) - .spawn(move || { - let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = MpscTmInStoreSender::new( - TmSenderId::AllEvents as ChannelId, - "ALL_EVENTS_TX", - tm_store_event.clone(), - tm_funnel_tx, - ); - let mut time_provider = TimeProvider::new_with_u16_days(0, 0); - let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { - let started_token: VerificationToken = event_req - .token - .try_into() - .expect("expected start verification token"); - reporter_event_handler - .completion_success(started_token, Some(timestamp)) - .expect("Sending completion success failed"); - }; - loop { - // handle event requests - if let Ok(event_req) = event_request_rx.try_recv() { - match event_req.request { - EventRequest::Enable(event) => { - pus_event_dispatcher - .enable_tm_for_event(&event) - .expect("Enabling TM failed"); - update_time(&mut time_provider, &mut timestamp); - report_completion(event_req, ×tamp); - } - EventRequest::Disable(event) => { - pus_event_dispatcher - .disable_tm_for_event(&event) - .expect("Disabling TM failed"); - update_time(&mut time_provider, &mut timestamp); - report_completion(event_req, ×tamp); - } - } - } - - // Perform the event routing. - event_man - .try_event_handling() - .expect("event handling failed"); - - // Perform the generation of PUS event packets - if let Ok((event, _param)) = pus_event_man_rx.try_recv() { - update_time(&mut time_provider, &mut timestamp); - pus_event_dispatcher - .generate_pus_event_tm_generic(&mut sender, ×tamp, event, None) - .expect("Sending TM as event failed"); - } - thread::sleep(Duration::from_millis(400)); - } + .spawn(move || loop { + event_handler.periodic_operation(); + thread::sleep(Duration::from_millis(400)); }) .unwrap();