diff --git a/Cargo.lock b/Cargo.lock index d468c5f..8c1147c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -733,9 +733,9 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" -version = "0.2.0-rc.3" +version = "0.2.0-rc.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aa9241e4d6cb0cc395927cfe653d8bc4a9cb6b2c27f28fec713d5e6ceb0ba23" +checksum = "2adc1d9369e3f7e21dabb3181e36c914d1a3f68f4900207a2baa129c2fd5baba" dependencies = [ "bus", "cobs", @@ -783,9 +783,9 @@ dependencies = [ [[package]] name = "satrs-shared" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f01df804b902334a23c539db5e37f11bf41ecb86596292e7cc091628bf2c4f67" +checksum = "6042477018c2d43fffccaaa5099bc299a58485139b4d31c5b276889311e474f1" dependencies = [ "serde", "spacepackets", diff --git a/Cargo.toml b/Cargo.toml index e5d4c9d..0d8acae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ homedir = "0.2" socket2 = "0.5" [dependencies.satrs] -version = "0.2.0-rc.3" +version = "0.2.0-rc.5" # git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" # branch = "main" features = ["test_util"] diff --git a/src/config.rs b/src/config.rs index f7a260a..e3d3554 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,6 @@ use lazy_static::lazy_static; use num_enum::{IntoPrimitive, TryFromPrimitive}; +use satrs::events::{EventU32TypedSev, SeverityInfo}; use satrs::spacepackets::PacketId; use satrs_mib::res_code::ResultU16Info; use satrs_mib::resultcode; @@ -39,6 +40,9 @@ pub enum GroupId { Action = 3, } +pub const TEST_EVENT: EventU32TypedSev = + EventU32TypedSev::::new(GroupId::Tmtc as u16, 0); + lazy_static! { pub static ref HOME_PATH: PathBuf = { let mut home_path = PathBuf::new(); @@ -280,6 +284,7 @@ pub mod tasks { pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_CTRL: u64 = 400; + pub const FREQ_MS_CAMERA_HANDLING: u64 = 400; pub const STOP_CHECK_FREQUENCY_MS: u64 = 400; pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS); diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..67a33c9 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,287 @@ +use std::sync::mpsc::{self}; + +use crate::pus::create_verification_reporter; +use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT; +use satrs::event_man::{EventMessageU32, EventRoutingError}; +use satrs::params::WritableToBeBytes; +use satrs::pus::event::EventTmHookProvider; +use satrs::pus::verification::VerificationReporter; +use satrs::request::UniqueApidTargetId; +use satrs::tmtc::PacketAsVec; +use satrs::{ + event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded}, + pus::{ + event_man::{ + DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken, + }, + verification::{TcStateStarted, VerificationReportingProvider, VerificationToken}, + }, + spacepackets::time::cds::CdsTime, +}; + +use ops_sat_rs::update_time; + +// This helper sets the APID of the event sender for the PUS telemetry. +#[derive(Default)] +pub struct EventApidSetter { + pub next_apid: u16, +} + +impl EventTmHookProvider for EventApidSetter { + fn modify_tm(&self, tm: &mut satrs::spacepackets::ecss::tm::PusTmCreator) { + tm.set_apid(self.next_apid); + } +} + +/// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event +/// packets. It also handles the verification completion of PUS event service requests. +pub struct PusEventHandler { + event_request_rx: mpsc::Receiver, + pus_event_tm_creator: DefaultPusEventU32TmCreator, + pus_event_man_rx: mpsc::Receiver, + tm_sender: mpsc::Sender, + time_provider: CdsTime, + timestamp: [u8; 7], + verif_handler: VerificationReporter, +} + +impl PusEventHandler { + pub fn new( + tm_sender: mpsc::Sender, + verif_handler: VerificationReporter, + event_manager: &mut EventManagerWithBoundedMpsc, + event_request_rx: mpsc::Receiver, + ) -> Self { + let event_queue_cap = 30; + let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(event_queue_cap); + + // All events sent to the manager are routed to the PUS event manager, which generates PUS event + // telemetry for each event. + let event_reporter = EventReporter::new_with_hook( + PUS_EVENT_MANAGEMENT.raw(), + 0, + 0, + 128, + EventApidSetter::default(), + ) + .unwrap(); + let pus_event_dispatcher = + DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter); + let pus_event_man_send_provider = EventU32SenderMpscBounded::new( + PUS_EVENT_MANAGEMENT.raw(), + pus_event_man_tx, + event_queue_cap, + ); + + event_manager.subscribe_all(pus_event_man_send_provider.target_id()); + event_manager.add_sender(pus_event_man_send_provider); + + Self { + event_request_rx, + pus_event_tm_creator: pus_event_dispatcher, + pus_event_man_rx, + time_provider: CdsTime::new_with_u16_days(0, 0), + timestamp: [0; 7], + verif_handler, + tm_sender, + } + } + + pub fn handle_event_requests(&mut self) { + let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { + let started_token: VerificationToken = event_req + .token + .try_into() + .expect("expected start verification token"); + self.verif_handler + .completion_success(&self.tm_sender, started_token, timestamp) + .expect("Sending completion success failed"); + }; + loop { + // handle event requests + match self.event_request_rx.try_recv() { + Ok(event_req) => match event_req.request { + EventRequest::Enable(event) => { + self.pus_event_tm_creator + .enable_tm_for_event(&event) + .expect("Enabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + EventRequest::Disable(event) => { + self.pus_event_tm_creator + .disable_tm_for_event(&event) + .expect("Disabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + }, + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("all event request senders have disconnected"); + break; + } + }, + } + } + } + + pub fn generate_pus_event_tm(&mut self) { + loop { + // Perform the generation of PUS event packets + match self.pus_event_man_rx.try_recv() { + Ok(event_msg) => { + update_time(&mut self.time_provider, &mut self.timestamp); + let param_vec = event_msg.params().map_or(Vec::new(), |param| { + param.to_vec().expect("failed to convert params to vec") + }); + // We use the TM modification hook to set the sender APID for each event. + self.pus_event_tm_creator.reporter.tm_hook.next_apid = + UniqueApidTargetId::from(event_msg.sender_id()).apid; + self.pus_event_tm_creator + .generate_pus_event_tm_generic( + &self.tm_sender, + &self.timestamp, + event_msg.event(), + Some(¶m_vec), + ) + .expect("Sending TM as event failed"); + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("All event senders have disconnected"); + break; + } + }, + } + } + } +} + +pub struct EventHandler { + pub pus_event_handler: PusEventHandler, + event_manager: EventManagerWithBoundedMpsc, +} + +impl EventHandler { + pub fn new( + tm_sender: mpsc::Sender, + event_rx: mpsc::Receiver, + event_request_rx: mpsc::Receiver, + ) -> Self { + let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx); + let pus_event_handler = PusEventHandler::new( + tm_sender, + create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), + &mut event_manager, + event_request_rx, + ); + + Self { + pus_event_handler, + event_manager, + } + } + + #[allow(dead_code)] + pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc { + &mut self.event_manager + } + + pub fn periodic_operation(&mut self) { + self.pus_event_handler.handle_event_requests(); + self.try_event_routing(); + self.pus_event_handler.generate_pus_event_tm(); + } + + pub fn try_event_routing(&mut self) { + let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| { + self.routing_error_handler(event_msg, error) + }; + // Perform the event routing. + self.event_manager.try_event_handling(error_handler); + } + + pub fn routing_error_handler(&self, event_msg: &EventMessageU32, error: EventRoutingError) { + log::warn!("event routing error for event {event_msg:?}: {error:?}"); + } +} + +#[cfg(test)] +mod tests { + use satrs::{ + events::EventU32, + pus::verification::VerificationReporterCfg, + spacepackets::{ + ecss::{tm::PusTmReader, PusPacket}, + CcsdsPacket, + }, + tmtc::PacketAsVec, + }; + + use super::*; + + const TEST_CREATOR_ID: UniqueApidTargetId = UniqueApidTargetId::new(1, 2); + const TEST_EVENT: EventU32 = EventU32::new(satrs::events::Severity::Info, 1, 1); + + pub struct EventManagementTestbench { + pub event_tx: mpsc::SyncSender, + pub event_manager: EventManagerWithBoundedMpsc, + pub tm_receiver: mpsc::Receiver, + pub pus_event_handler: PusEventHandler, + } + + impl EventManagementTestbench { + pub fn new() -> Self { + let (event_tx, event_rx) = mpsc::sync_channel(10); + let (_event_req_tx, event_req_rx) = mpsc::sync_channel(10); + let (tm_sender, tm_receiver) = mpsc::channel(); + let verif_reporter_cfg = VerificationReporterCfg::new(0x05, 2, 2, 128).unwrap(); + let verif_reporter = + VerificationReporter::new(PUS_EVENT_MANAGEMENT.id(), &verif_reporter_cfg); + let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx); + let pus_event_handler = + PusEventHandler::new(tm_sender, verif_reporter, &mut event_manager, event_req_rx); + Self { + event_tx, + tm_receiver, + event_manager, + pus_event_handler, + } + } + } + + #[test] + fn test_basic_event_generation() { + let mut testbench = EventManagementTestbench::new(); + testbench + .event_tx + .send(EventMessageU32::new( + TEST_CREATOR_ID.id(), + EventU32::new(satrs::events::Severity::Info, 1, 1), + )) + .expect("failed to send event"); + testbench.pus_event_handler.handle_event_requests(); + testbench.event_manager.try_event_handling(|_, _| {}); + testbench.pus_event_handler.generate_pus_event_tm(); + let tm_packet = testbench + .tm_receiver + .try_recv() + .expect("failed to receive TM packet"); + assert_eq!(tm_packet.sender_id, PUS_EVENT_MANAGEMENT.id()); + let tm_reader = PusTmReader::new(&tm_packet.packet, 7) + .expect("failed to create TM reader") + .0; + assert_eq!(tm_reader.apid(), TEST_CREATOR_ID.apid); + assert_eq!(tm_reader.user_data().len(), 4); + let event_read_back = EventU32::from_be_bytes(tm_reader.user_data().try_into().unwrap()); + assert_eq!(event_read_back, TEST_EVENT); + } + + #[test] + fn test_basic_event_disabled() { + // TODO: Add test. + } +} diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index fe34e52..d935712 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -44,7 +44,6 @@ pub struct TcpSppClientCommon { #[allow(dead_code)] impl TcpSppClientCommon { pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { - let mut dummy = 0; if SPP_CLIENT_WIRETAPPING_RX { log::debug!( "SPP TCP RX {} bytes: {:x?}", @@ -54,11 +53,10 @@ impl TcpSppClientCommon { } // This parser is able to deal with broken tail packets, but we ignore those for now.. parse_buffer_for_ccsds_space_packets( - &mut self.read_buf[..read_bytes], + &self.read_buf[..read_bytes], &self.validator, self.id, &self.tc_source_tx, - &mut dummy, )?; Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 014c607..2b6f688 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,3 +32,12 @@ impl Default for TimeStampHelper { } } } + +pub fn update_time(time_provider: &mut CdsTime, timestamp: &mut [u8]) { + time_provider + .update_from_now() + .expect("Could not get current time"); + time_provider + .write_to_bytes(timestamp) + .expect("Writing timestamp failed"); +} diff --git a/src/main.rs b/src/main.rs index 046f4fb..a8b56af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,25 +6,31 @@ use std::{ }; use log::info; -use ops_sat_rs::config::components::CAMERA_HANDLER; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, pool::create_sched_tc_pool, - tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, + tasks::{FREQ_MS_CAMERA_HANDLING, FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, VALID_PACKET_ID_LIST, }; +use ops_sat_rs::config::{components::CAMERA_HANDLER, tasks::FREQ_MS_EVENT_HANDLING}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::TimeStampHelper; -use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; - -use crate::handlers::camera::IMS100BatchHandler; -use crate::pus::{ - hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service, - PusTcDistributor, PusTcMpscRouter, +use satrs::{ + hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, + pus::event_man::EventRequestWithToken, }; + use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; +use crate::{ + events::EventHandler, + pus::{ + hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service, + PusTcDistributor, PusTcMpscRouter, + }, +}; +use crate::{handlers::camera::IMS100BatchHandler, pus::event::create_event_service}; use crate::{ interface::tcp_server::{SyncTcpTmSource, TcpTask}, interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, @@ -37,6 +43,7 @@ use crate::{ }; mod controller; +mod events; mod handlers; mod interface; mod logger; @@ -59,12 +66,21 @@ fn main() { let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel(); - // let (pus_event_tx, pus_event_rx) = mpsc::channel(); + let (pus_event_tx, pus_event_rx) = mpsc::channel(); let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel(); let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); + let (event_request_tx, event_request_rx) = mpsc::channel::(); + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); + let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); @@ -83,27 +99,22 @@ fn main() { let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, - // event_tc_sender: pus_event_tx, + event_tc_sender: pus_event_tx, sched_tc_sender: pus_sched_tx, hk_tc_sender: pus_hk_tx, action_tc_sender: pus_action_tx, mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service( - tm_funnel_tx.clone(), - // event_handler.clone_event_sender(), - pus_test_rx, - ); + let pus_test_service = create_test_service(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx); let pus_scheduler_service = create_scheduler_service( tm_funnel_tx.clone(), tc_source_tx.clone(), pus_sched_rx, create_sched_tc_pool(), ); - // - // let pus_event_service = - // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); + let pus_event_service = + create_event_service(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); let pus_action_service = create_action_service( tm_funnel_tx.clone(), pus_action_rx, @@ -125,7 +136,7 @@ fn main() { let mut pus_stack = PusStack::new( pus_test_service, pus_hk_service, - // pus_event_service, + pus_event_service, pus_action_service, pus_scheduler_service, pus_mode_service, @@ -163,7 +174,7 @@ fn main() { ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new( + let mut tm_sink = TmFunnelDynamic::new( sync_tm_tcp_source, tm_funnel_rx, tm_tcp_server_tx, @@ -270,17 +281,26 @@ fn main() { // TM Funnel Task info!("Starting TM funnel task"); - let funnel_stop_signal = stop_signal.clone(); + let tm_sink_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() - .name("ops-sat tm-funnel".to_string()) + .name("ops-sat tm-sink".to_string()) .spawn(move || loop { - tm_funnel.operation(); - if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + tm_sink.operation(); + if tm_sink_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } }) .unwrap(); + info!("Starting event handling task"); + let jh_event_handling = thread::Builder::new() + .name("sat-rs events".to_string()) + .spawn(move || loop { + event_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING)); + }) + .unwrap(); + // PUS Handler Task info!("Starting PUS handlers task"); let pus_stop_signal = stop_signal.clone(); @@ -305,6 +325,7 @@ fn main() { if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } + thread::sleep(Duration::from_millis(FREQ_MS_CAMERA_HANDLING)); }) .unwrap(); @@ -327,6 +348,9 @@ fn main() { jh_pus_handler .join() .expect("Joining PUS handlers thread failed"); + jh_event_handling + .join() + .expect("Joining PUS handlers thread failed"); jh_camera_handler .join() .expect("Joining camera handler thread failed"); diff --git a/src/pus/event.rs b/src/pus/event.rs new file mode 100644 index 0000000..8700823 --- /dev/null +++ b/src/pus/event.rs @@ -0,0 +1,66 @@ +use std::sync::mpsc; + +use super::HandlingStatus; +use crate::pus::create_verification_reporter; +use log::{error, warn}; +use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT; +use satrs::pus::event_man::EventRequestWithToken; +use satrs::pus::event_srv::PusEventServiceHandler; +use satrs::pus::verification::VerificationReporter; +use satrs::pus::{ + EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, +}; +use satrs::tmtc::PacketAsVec; + +pub fn create_event_service( + tm_funnel_tx: mpsc::Sender, + pus_event_rx: mpsc::Receiver, + event_request_tx: mpsc::Sender, +) -> EventServiceWrapper { + let pus_5_handler = PusEventServiceHandler::new( + PusServiceHelper::new( + PUS_EVENT_MANAGEMENT.id(), + pus_event_rx, + tm_funnel_tx, + create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), + EcssTcInVecConverter::default(), + ), + event_request_tx, + ); + EventServiceWrapper { + handler: pus_5_handler, + } +} + +pub struct EventServiceWrapper { + pub handler: PusEventServiceHandler< + MpscTcReceiver, + mpsc::Sender, + EcssTcInVecConverter, + VerificationReporter, + >, +} + +impl EventServiceWrapper { + pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self.handler.poll_and_handle_next_tc(time_stamp) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS 5 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS 5 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS 5 subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}") + } + } + HandlingStatus::HandledOne + } +} diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 268ead0..b8fc9c7 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,4 +1,5 @@ pub mod action; +pub mod event; pub mod hk; pub mod mode; pub mod scheduler; @@ -46,7 +47,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi /// Simple router structure which forwards PUS telecommands to dedicated handlers. pub struct PusTcMpscRouter { pub test_tc_sender: Sender, - // pub event_tc_sender: Sender, + pub event_tc_sender: Sender, pub sched_tc_sender: Sender, pub hk_tc_sender: Sender, pub action_tc_sender: Sender, @@ -109,16 +110,16 @@ impl PusTcDistributor { tc_in_memory, token: Some(accepted_token.into()), })?, - // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { - // tc_in_memory, - // token: Some(accepted_token.into()), - // })?, - // PusServiceId::Scheduling => { - // self.pus_router.sched_tc_sender.send(EcssTcAndToken { - // tc_in_memory, - // token: Some(accepted_token.into()), - // })? - // } + PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })?, + PusServiceId::Scheduling => { + self.pus_router.sched_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })? + } _ => { let result = self.verif_reporter.start_failure( &self.tm_sender, @@ -138,10 +139,10 @@ impl PusTcDistributor { if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) { match custom_service { CustomPusServiceId::Mode => { - // self.pus_router.mode_tc_sender.send(EcssTcAndToken { - // tc_in_memory, - // token: Some(accepted_token.into()), - // })? + self.pus_router.mode_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })? } CustomPusServiceId::Health => {} } diff --git a/src/pus/stack.rs b/src/pus/stack.rs index 759a682..fcc06e6 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -4,15 +4,15 @@ use derive_new::new; use satrs::spacepackets::time::{cds, TimeWriter}; use super::{ - action::ActionServiceWrapper, hk::HkServiceWrapper, mode::ModeServiceWrapper, - scheduler::SchedulingService, TargetedPusService, + action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, + mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService, }; #[derive(new)] pub struct PusStack { test_srv: TestCustomServiceWrapper, hk_srv_wrapper: HkServiceWrapper, - // event_srv: EventServiceWrapper, + event_srv: EventServiceWrapper, action_srv_wrapper: ActionServiceWrapper, schedule_srv: SchedulingService, mode_srv: ModeServiceWrapper, @@ -50,7 +50,7 @@ impl PusStack { self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None, ); - // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); + is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None); is_srv_finished( 8, self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), diff --git a/src/pus/test.rs b/src/pus/test.rs index 3a14a2a..f10696c 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -1,8 +1,8 @@ use crate::pus::create_verification_reporter; use log::{info, warn}; use ops_sat_rs::config::components::PUS_TEST_SERVICE; -use ops_sat_rs::config::tmtc_err; -// use satrs::event_man::{EventMessage, EventMessageU32}; +use ops_sat_rs::config::{tmtc_err, TEST_EVENT}; +use satrs::event_man::{EventMessage, EventMessageU32}; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ @@ -20,6 +20,7 @@ use super::HandlingStatus; pub fn create_test_service( tm_funnel_tx: mpsc::Sender, + event_tx: mpsc::SyncSender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( @@ -31,7 +32,7 @@ pub fn create_test_service( )); TestCustomServiceWrapper { handler: pus17_handler, - // test_srv_event_sender: event_sender, + event_tx, } } @@ -42,6 +43,7 @@ pub struct TestCustomServiceWrapper { EcssTcInVecConverter, VerificationReporter, >, + pub event_tx: mpsc::SyncSender, } impl TestCustomServiceWrapper { @@ -79,9 +81,9 @@ impl TestCustomServiceWrapper { time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); if subservice == 128 { info!("Generating test event"); - // self.test_srv_event_sender - // .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) - // .expect("Sending test event failed"); + self.event_tx + .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) + .expect("Sending test event failed"); let start_token = self .handler .service_helper