From f6a6b005af7a120651b5b79879db96d4472be6f3 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 15:00:00 +0200 Subject: [PATCH 1/3] updated pyclient --- pytmtc/pyclient.py | 26 ++++++++++++++++---------- pytmtc/requirements.txt | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pytmtc/pyclient.py b/pytmtc/pyclient.py index a02d648..4bb0829 100755 --- a/pytmtc/pyclient.py +++ b/pytmtc/pyclient.py @@ -103,7 +103,9 @@ class PusHandler(GenericApidHandlerBase): def handle_tm(self, apid: int, packet: bytes, _user_args: Any): try: - pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) + pus_tm = PusTelemetry.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) except ValueError as e: _LOGGER.warning("Could not generate PUS TM object from raw data") _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") @@ -111,7 +113,7 @@ class PusHandler(GenericApidHandlerBase): service = pus_tm.service if service == 1: tm_packet = Service1Tm.unpack( - data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) + data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2) ) res = self.verif_wrapper.add_tm(tm_packet) if res is None: @@ -128,7 +130,9 @@ class PusHandler(GenericApidHandlerBase): elif service == 3: _LOGGER.info("No handling for HK packets implemented") _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") - pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) + pus_tm = PusTelemetry.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) if pus_tm.subservice == 25: if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") @@ -136,7 +140,7 @@ class PusHandler(GenericApidHandlerBase): _LOGGER.info(json_str) elif service == 5: tm_packet = PusTelemetry.unpack( - packet, time_reader=CdsShortTimestamp.empty() + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE ) src_data = tm_packet.source_data event_u32 = EventU32.unpack(src_data) @@ -145,7 +149,7 @@ class PusHandler(GenericApidHandlerBase): _LOGGER.info("Received test event") elif service == 17: tm_packet = Service17Tm.unpack( - packet, time_reader=CdsShortTimestamp.empty() + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE ) if tm_packet.subservice == 2: self.file_logger.info("Received Ping Reply TM[17,2]") @@ -162,7 +166,7 @@ class PusHandler(GenericApidHandlerBase): f"The service {service} is not implemented in Telemetry Factory" ) tm_packet = PusTelemetry.unpack( - packet, time_reader=CdsShortTimestamp.empty() + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE ) self.raw_logger.log_tm(pus_tm) @@ -197,15 +201,15 @@ class TcHandler(TcHandlerBase): _LOGGER.info(log_entry.log_str) def queue_finished_cb(self, info: ProcedureWrapper): - if info.proc_type == TcProcedureType.DEFAULT: - def_proc = info.to_def_procedure() + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): q = self.queue_helper q.queue_wrapper = wrapper.queue_wrapper - if info.proc_type == TcProcedureType.DEFAULT: - def_proc = info.to_def_procedure() + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() assert def_proc.cmd_path is not None pus_tc.pack_pus_telecommands(q, def_proc.cmd_path) @@ -256,6 +260,7 @@ def main(): while True: state = tmtc_backend.periodic_op(None) if state.request == BackendRequest.TERMINATION_NO_ERROR: + tmtc_backend.close_com_if() sys.exit(0) elif state.request == BackendRequest.DELAY_IDLE: _LOGGER.info("TMTC Client in IDLE mode") @@ -270,6 +275,7 @@ def main(): elif state.request == BackendRequest.CALL_NEXT: pass except KeyboardInterrupt: + tmtc_backend.close_com_if() sys.exit(0) diff --git a/pytmtc/requirements.txt b/pytmtc/requirements.txt index b3f6f2a..325615c 100644 --- a/pytmtc/requirements.txt +++ b/pytmtc/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 8.0.0rc1 +tmtccmd == 8.0.0rc2 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd From 511214f903e571a81004dce5297e6ba788d27e11 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 16:14:16 +0200 Subject: [PATCH 2/3] add scheduler service --- src/config.rs | 20 ++++++ src/main.rs | 21 +++--- src/pus/mod.rs | 3 +- src/pus/scheduler.rs | 153 +++++++++++++++++++++++++++++++++++++++++++ src/pus/stack.rs | 12 ++-- 5 files changed, 194 insertions(+), 15 deletions(-) create mode 100644 src/pus/scheduler.rs diff --git a/src/config.rs b/src/config.rs index e48b764..121b683 100644 --- a/src/config.rs +++ b/src/config.rs @@ -172,6 +172,23 @@ pub mod action_err { pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT]; } +pub mod pool { + use satrs::pool::{StaticMemoryPool, StaticPoolConfig}; + + pub fn create_sched_tc_pool() -> StaticMemoryPool { + StaticMemoryPool::new(StaticPoolConfig::new( + vec![ + (100, 32), + (50, 64), + (50, 128), + (50, 256), + (50, 1024), + (100, 2048), + ], + true, + )) + } +} pub mod components { use satrs::request::UniqueApidTargetId; @@ -191,6 +208,7 @@ pub mod components { UdpServer = 7, TcpServer = 8, TcpSppClient = 9, + PusScheduler = 10, } pub const CONTROLLER_ID: UniqueApidTargetId = @@ -205,6 +223,8 @@ pub mod components { UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32); pub const PUS_MODE_SERVICE: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32); + pub const PUS_SCHEDULER_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusScheduler as u32); pub const PUS_HK_SERVICE: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32); pub const UDP_SERVER: UniqueApidTargetId = diff --git a/src/main.rs b/src/main.rs index ee9b48d..22d024c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,13 +9,14 @@ use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, + pool::create_sched_tc_pool, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; -use crate::pus::{PusTcDistributor, PusTcMpscRouter}; +use crate::pus::{scheduler::create_scheduler_service, PusTcDistributor, PusTcMpscRouter}; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ @@ -53,7 +54,7 @@ fn main() { let (pus_test_tx, pus_test_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel(); - // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); @@ -73,7 +74,7 @@ fn main() { let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, // event_tc_sender: pus_event_tx, - // sched_tc_sender: pus_sched_tx, + sched_tc_sender: pus_sched_tx, // hk_tc_sender: pus_hk_tx, action_tc_sender: pus_action_tx, // mode_tc_sender: pus_mode_tx, @@ -84,12 +85,12 @@ fn main() { // event_handler.clone_event_sender(), pus_test_rx, ); - // let pus_scheduler_service = create_scheduler_service_dynamic( - // tm_funnel_tx.clone(), - // tc_source.0.clone(), - // pus_sched_rx, - // create_sched_tc_pool(), - // ); + let pus_scheduler_service = create_scheduler_service( + tm_funnel_tx.clone(), + tc_source_tx.clone(), + pus_sched_rx, + create_sched_tc_pool(), + ); // // let pus_event_service = // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); @@ -116,7 +117,7 @@ fn main() { // pus_hk_service, // pus_event_service, pus_action_service, - // pus_scheduler_service, + pus_scheduler_service, // pus_mode_service, ); diff --git a/src/pus/mod.rs b/src/pus/mod.rs index e39c7d1..791a97e 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,4 +1,5 @@ pub mod action; +pub mod scheduler; pub mod stack; pub mod test; @@ -52,7 +53,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi pub struct PusTcMpscRouter { pub test_tc_sender: Sender, // pub event_tc_sender: Sender, - // pub sched_tc_sender: Sender, + pub sched_tc_sender: Sender, // pub hk_tc_sender: Sender, pub action_tc_sender: Sender, // pub mode_tc_sender: Sender, diff --git a/src/pus/scheduler.rs b/src/pus/scheduler.rs new file mode 100644 index 0000000..08cd738 --- /dev/null +++ b/src/pus/scheduler.rs @@ -0,0 +1,153 @@ +use std::sync::mpsc; +use std::time::Duration; + +use crate::pus::create_verification_reporter; +use log::{error, info, warn}; +use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE; +use satrs::pool::{PoolProvider, StaticMemoryPool}; +use satrs::pus::scheduler::{PusScheduler, TcInfo}; +use satrs::pus::scheduler_srv::PusSchedServiceHandler; +use satrs::pus::verification::VerificationReporter; +use satrs::pus::{ + EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, +}; +use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool}; +use satrs::ComponentId; + +use super::HandlingStatus; + +pub trait TcReleaser { + fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; +} + +impl TcReleaser for PacketSenderWithSharedPool { + fn release( + &mut self, + sender_id: ComponentId, + enabled: bool, + _info: &TcInfo, + tc: &[u8], + ) -> bool { + if enabled { + let shared_pool = self.shared_pool.get_mut(); + // Transfer TC from scheduler TC pool to shared TC pool. + let released_tc_addr = shared_pool + .0 + .write() + .expect("locking pool failed") + .add(tc) + .expect("adding TC to shared pool failed"); + self.sender + .send(PacketInPool::new(sender_id, released_tc_addr)) + .expect("sending TC to TC source failed"); + } + true + } +} + +impl TcReleaser for mpsc::Sender { + fn release( + &mut self, + sender_id: ComponentId, + enabled: bool, + _info: &TcInfo, + tc: &[u8], + ) -> bool { + if enabled { + // Send released TC to centralized TC source. + self.send(PacketAsVec::new(sender_id, tc.to_vec())) + .expect("sending TC to TC source failed"); + } + true + } +} + +pub struct SchedulingService { + pub pus_11_handler: PusSchedServiceHandler< + MpscTcReceiver, + mpsc::Sender, + EcssTcInVecConverter, + VerificationReporter, + PusScheduler, + >, + pub sched_tc_pool: StaticMemoryPool, + pub releaser_buf: [u8; 4096], + pub tc_releaser: Box, +} + +impl SchedulingService { + pub fn release_tcs(&mut self) { + let id = self.pus_11_handler.service_helper.id(); + let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { + self.tc_releaser.release(id, enabled, info, tc) + }; + + self.pus_11_handler + .scheduler_mut() + .update_time_from_now() + .unwrap(); + let released_tcs = self + .pus_11_handler + .scheduler_mut() + .release_telecommands_with_buffer( + releaser, + &mut self.sched_tc_pool, + &mut self.releaser_buf, + ) + .expect("releasing TCs failed"); + if released_tcs > 0 { + info!("{released_tcs} TC(s) released from scheduler"); + } + } + + pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self + .pus_11_handler + .poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool) + { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS11 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS11 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS11: Subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}") + } + } + HandlingStatus::HandledOne + } +} + +pub fn create_scheduler_service( + tm_funnel_tx: mpsc::Sender, + tc_source_sender: mpsc::Sender, + pus_sched_rx: mpsc::Receiver, + sched_tc_pool: StaticMemoryPool, +) -> SchedulingService { + let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) + .expect("Creating PUS Scheduler failed"); + let pus_11_handler = PusSchedServiceHandler::new( + PusServiceHelper::new( + PUS_SCHEDULER_SERVICE.id(), + pus_sched_rx, + tm_funnel_tx, + create_verification_reporter(PUS_SCHEDULER_SERVICE.id(), PUS_SCHEDULER_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + scheduler, + ); + SchedulingService { + pus_11_handler, + sched_tc_pool, + releaser_buf: [0; 4096], + tc_releaser: Box::new(tc_source_sender), + } +} diff --git a/src/pus/stack.rs b/src/pus/stack.rs index 0699046..7e70ac6 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -3,7 +3,7 @@ use crate::pus::HandlingStatus; use derive_new::new; use satrs::spacepackets::time::{cds, TimeWriter}; -use super::{action::ActionServiceWrapper, TargetedPusService}; +use super::{action::ActionServiceWrapper, scheduler::SchedulingService, TargetedPusService}; // use super::{ // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, @@ -17,7 +17,7 @@ pub struct PusStack { // hk_srv_wrapper: HkServiceWrapper, // event_srv: EventServiceWrapper, action_srv_wrapper: ActionServiceWrapper, - // schedule_srv: SchedulingServiceWrapper, + schedule_srv: SchedulingService, // mode_srv: ModeServiceWrapper, } @@ -25,7 +25,7 @@ impl PusStack { pub fn periodic_operation(&mut self) { // Release all telecommands which reached their release time before calling the service // handlers. - // self.schedule_srv.release_tcs(); + self.schedule_srv.release_tcs(); let time_stamp = cds::CdsTime::now_with_u16_days() .expect("time stamp generation error") .to_vec() @@ -48,7 +48,11 @@ impl PusStack { self.test_srv.poll_and_handle_next_packet(&time_stamp), None, ); - // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); + is_srv_finished( + 11, + self.schedule_srv.poll_and_handle_next_tc(&time_stamp), + None, + ); // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); is_srv_finished( 8, From e3ad841d04bf5d52fa7d8d1bda29504af180d03d Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 16:49:06 +0200 Subject: [PATCH 3/3] added HK and mode service --- src/config.rs | 30 +++ src/main.rs | 45 ++-- src/pus/hk.rs | 532 +++++++++++++++++++++++++++++++++++++++++++++++ src/pus/mod.rs | 14 +- src/pus/mode.rs | 402 +++++++++++++++++++++++++++++++++++ src/pus/stack.rs | 37 ++-- 6 files changed, 1010 insertions(+), 50 deletions(-) create mode 100644 src/pus/hk.rs create mode 100644 src/pus/mode.rs diff --git a/src/config.rs b/src/config.rs index 121b683..fad0b9d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -172,6 +172,36 @@ pub mod action_err { pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT]; } + +pub mod hk_err { + use super::*; + use satrs::res_code::ResultU16; + + #[resultcode] + pub const TARGET_ID_MISSING: ResultU16 = ResultU16::new(GroupId::Hk as u8, 0); + #[resultcode] + pub const UNIQUE_ID_MISSING: ResultU16 = ResultU16::new(GroupId::Hk as u8, 1); + #[resultcode] + pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::new(GroupId::Hk as u8, 2); + #[resultcode] + pub const COLLECTION_INTERVAL_MISSING: ResultU16 = ResultU16::new(GroupId::Hk as u8, 3); + + pub const HK_ERR_RESULTS: &[ResultU16Info] = &[ + TARGET_ID_MISSING_EXT, + UNKNOWN_TARGET_ID_EXT, + UNKNOWN_TARGET_ID_EXT, + COLLECTION_INTERVAL_MISSING_EXT, + ]; +} + +pub mod mode_err { + use super::*; + use satrs::res_code::ResultU16; + + #[resultcode] + pub const WRONG_MODE: ResultU16 = ResultU16::new(GroupId::Mode as u8, 0); +} + pub mod pool { use satrs::pool::{StaticMemoryPool, StaticPoolConfig}; diff --git a/src/main.rs b/src/main.rs index 22d024c..f3faaab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,10 @@ use ops_sat_rs::config::{ use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; -use crate::pus::{scheduler::create_scheduler_service, PusTcDistributor, PusTcMpscRouter}; +use crate::pus::{ + hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service, + PusTcDistributor, PusTcMpscRouter, +}; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ @@ -55,13 +58,13 @@ fn main() { let (pus_test_tx, pus_test_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel(); let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); - // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); + let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel(); - // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); + let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); - // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); - // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); + let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); + let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); let (controller_composite_tx, controller_composite_rx) = mpsc::channel(); // let (controller_action_reply_tx, controller_action_reply_rx) = mpsc::channel(); @@ -75,9 +78,9 @@ fn main() { test_tc_sender: pus_test_tx, // event_tc_sender: pus_event_tx, sched_tc_sender: pus_sched_tx, - // hk_tc_sender: pus_hk_tx, + hk_tc_sender: pus_hk_tx, action_tc_sender: pus_action_tx, - // mode_tc_sender: pus_mode_tx, + mode_tc_sender: pus_mode_tx, }; let pus_test_service = create_test_service( @@ -100,25 +103,25 @@ fn main() { request_map.clone(), pus_action_reply_rx, ); - // let pus_hk_service = create_hk_service_dynamic( - // tm_funnel_tx.clone(), - // pus_hk_rx, - // request_map.clone(), - // pus_hk_reply_rx, - // ); - // let pus_mode_service = create_mode_service_dynamic( - // tm_funnel_tx.clone(), - // pus_mode_rx, - // request_map, - // pus_mode_reply_rx, - // ); + let pus_hk_service = create_hk_service( + tm_funnel_tx.clone(), + pus_hk_rx, + request_map.clone(), + pus_hk_reply_rx, + ); + let pus_mode_service = create_mode_service( + tm_funnel_tx.clone(), + pus_mode_rx, + request_map, + pus_mode_reply_rx, + ); let mut pus_stack = PusStack::new( pus_test_service, - // pus_hk_service, + pus_hk_service, // pus_event_service, pus_action_service, pus_scheduler_service, - // pus_mode_service, + pus_mode_service, ); let mut tmtc_task = TcSourceTaskDynamic::new( diff --git a/src/pus/hk.rs b/src/pus/hk.rs new file mode 100644 index 0000000..1f156a3 --- /dev/null +++ b/src/pus/hk.rs @@ -0,0 +1,532 @@ +use derive_new::new; +use log::{error, warn}; +use ops_sat_rs::config::components::PUS_HK_SERVICE; +use ops_sat_rs::config::{hk_err, tmtc_err}; +use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId}; +use satrs::pus::verification::{ + FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, + VerificationReportingProvider, VerificationToken, +}; +use satrs::pus::{ + ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, + EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, + PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, +}; +use satrs::request::{GenericMessage, UniqueApidTargetId}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::{hk, PusPacket}; +use satrs::tmtc::PacketAsVec; +use std::sync::mpsc; +use std::time::Duration; + +use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler}; +use crate::requests::GenericRequestRouter; + +use super::{HandlingStatus, PusTargetedRequestService}; + +#[derive(Clone, PartialEq, Debug, new)] +pub struct HkReply { + pub unique_id: UniqueId, + pub variant: HkReplyVariant, +} + +#[derive(Clone, PartialEq, Debug)] +#[allow(dead_code)] +pub enum HkReplyVariant { + Ack, +} + +#[derive(Default)] +pub struct HkReplyHandler {} + +impl PusReplyHandler for HkReplyHandler { + type Error = EcssTmtcError; + + fn handle_unrequested_reply( + &mut self, + reply: &GenericMessage, + _tm_sender: &impl EcssTmSender, + ) -> Result<(), Self::Error> { + log::warn!("received unexpected reply for service 3: {reply:?}"); + Ok(()) + } + + fn handle_reply( + &mut self, + reply: &GenericMessage, + active_request: &ActivePusRequestStd, + tm_sender: &impl EcssTmSender, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result { + let started_token: VerificationToken = active_request + .token() + .try_into() + .expect("invalid token state"); + match reply.message.variant { + HkReplyVariant::Ack => { + verification_handler + .completion_success(tm_sender, started_token, time_stamp) + .expect("sending completion success verification failed"); + } + }; + Ok(true) + } + + fn handle_request_timeout( + &mut self, + active_request: &ActivePusRequestStd, + tm_sender: &impl EcssTmSender, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler( + tm_sender, + active_request, + verification_handler, + time_stamp, + "HK", + )?; + Ok(()) + } +} + +pub struct HkRequestConverter { + timeout: Duration, +} + +impl Default for HkRequestConverter { + fn default() -> Self { + Self { + timeout: Duration::from_secs(60), + } + } +} + +impl PusTcToRequestConverter for HkRequestConverter { + type Error = GenericConversionError; + + fn convert( + &mut self, + token: VerificationToken, + tc: &PusTcReader, + tm_sender: &(impl EcssTmSender + ?Sized), + verif_reporter: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(ActivePusRequestStd, HkRequest), Self::Error> { + let user_data = tc.user_data(); + if user_data.is_empty() { + let user_data_len = user_data.len() as u32; + let user_data_len_raw = user_data_len.to_be_bytes(); + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new( + time_stamp, + &tmtc_err::NOT_ENOUGH_APP_DATA, + &user_data_len_raw, + ), + ) + .expect("Sending start failure TM failed"); + return Err(GenericConversionError::NotEnoughAppData { + expected: 4, + found: 0, + }); + } + if user_data.len() < 8 { + let err = if user_data.len() < 4 { + &hk_err::TARGET_ID_MISSING + } else { + &hk_err::UNIQUE_ID_MISSING + }; + let user_data_len = user_data.len() as u32; + let user_data_len_raw = user_data_len.to_be_bytes(); + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new(time_stamp, err, &user_data_len_raw), + ) + .expect("Sending start failure TM failed"); + return Err(GenericConversionError::NotEnoughAppData { + expected: 8, + found: 4, + }); + } + let subservice = tc.subservice(); + let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).expect("invalid tc format"); + let unique_id = u32::from_be_bytes(tc.user_data()[4..8].try_into().unwrap()); + + let standard_subservice = hk::Subservice::try_from(subservice); + if standard_subservice.is_err() { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE, &[subservice]), + ) + .expect("Sending start failure TM failed"); + return Err(GenericConversionError::InvalidSubservice(subservice)); + } + let request = match standard_subservice.unwrap() { + hk::Subservice::TcEnableHkGeneration | hk::Subservice::TcEnableDiagGeneration => { + HkRequest::new(unique_id, HkRequestVariant::EnablePeriodic) + } + hk::Subservice::TcDisableHkGeneration | hk::Subservice::TcDisableDiagGeneration => { + HkRequest::new(unique_id, HkRequestVariant::DisablePeriodic) + } + hk::Subservice::TcReportHkReportStructures => todo!(), + hk::Subservice::TmHkPacket => todo!(), + hk::Subservice::TcGenerateOneShotHk | hk::Subservice::TcGenerateOneShotDiag => { + HkRequest::new(unique_id, HkRequestVariant::OneShot) + } + hk::Subservice::TcModifyDiagCollectionInterval + | hk::Subservice::TcModifyHkCollectionInterval => { + if user_data.len() < 12 { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data( + time_stamp, + &tmtc_err::NOT_ENOUGH_APP_DATA, + ), + ) + .expect("Sending start failure TM failed"); + return Err(GenericConversionError::NotEnoughAppData { + expected: 12, + found: user_data.len(), + }); + } + HkRequest::new( + unique_id, + HkRequestVariant::ModifyCollectionInterval( + CollectionIntervalFactor::from_be_bytes( + user_data[8..12].try_into().unwrap(), + ), + ), + ) + } + _ => { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new( + time_stamp, + &tmtc_err::PUS_SUBSERVICE_NOT_IMPLEMENTED, + &[subservice], + ), + ) + .expect("Sending start failure TM failed"); + return Err(GenericConversionError::InvalidSubservice(subservice)); + } + }; + Ok(( + ActivePusRequestStd::new(target_id_and_apid.into(), token, self.timeout), + request, + )) + } +} + +pub fn create_hk_service( + tm_funnel_tx: mpsc::Sender, + pus_hk_rx: mpsc::Receiver, + request_router: GenericRequestRouter, + reply_receiver: mpsc::Receiver>, +) -> HkServiceWrapper { + let pus_3_handler = PusTargetedRequestService::new( + PusServiceHelper::new( + PUS_HK_SERVICE.id(), + pus_hk_rx, + tm_funnel_tx, + create_verification_reporter(PUS_HK_SERVICE.id(), PUS_HK_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + HkRequestConverter::default(), + DefaultActiveRequestMap::default(), + HkReplyHandler::default(), + request_router, + reply_receiver, + ); + HkServiceWrapper { + service: pus_3_handler, + } +} + +pub struct HkServiceWrapper { + pub(crate) service: PusTargetedRequestService< + VerificationReporter, + HkRequestConverter, + HkReplyHandler, + DefaultActiveRequestMap, + ActivePusRequestStd, + HkRequest, + HkReply, + >, +} + +impl HkServiceWrapper { + pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self.service.poll_and_handle_next_tc(time_stamp) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS 3 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS 3 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS 3 subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}"); + // To avoid permanent loops on error cases. + return HandlingStatus::Empty; + } + } + HandlingStatus::HandledOne + } + + pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { + // This only fails if all senders disconnected. Treat it like an empty queue. + self.service + .poll_and_check_next_reply(time_stamp) + .unwrap_or_else(|e| { + warn!("PUS 3: Handling reply failed with error {e:?}"); + HandlingStatus::Empty + }) + } + + pub fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } +} + +#[cfg(test)] +mod tests { + use ops_sat_rs::config::tmtc_err; + use satrs::pus::test_util::{ + TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, + }; + use satrs::request::MessageMetadata; + use satrs::{ + hk::HkRequestVariant, + pus::test_util::TEST_APID, + request::GenericMessage, + spacepackets::{ + ecss::{hk::Subservice, tc::PusTcCreator}, + SpHeader, + }, + }; + + use crate::pus::{ + hk::HkReplyVariant, + tests::{PusConverterTestbench, ReplyHandlerTestbench}, + }; + + use super::{HkReply, HkReplyHandler, HkRequestConverter}; + + #[test] + fn hk_converter_one_shot_req() { + let mut hk_bench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), HkRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let target_id = TEST_UNIQUE_ID_0; + let unique_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&target_id.to_be_bytes()); + app_data[4..8].copy_from_slice(&unique_id.to_be_bytes()); + + let hk_req = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcGenerateOneShotHk as u8, + &app_data, + true, + ); + let accepted_token = hk_bench.add_tc(&hk_req); + let (_active_req, req) = hk_bench + .convert(accepted_token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion failed"); + + assert_eq!(req.unique_id, unique_id); + if let HkRequestVariant::OneShot = req.variant { + } else { + panic!("unexpected HK request") + } + } + + #[test] + fn hk_converter_enable_periodic_generation() { + let mut hk_bench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), HkRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let target_id = TEST_UNIQUE_ID_0; + let unique_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&target_id.to_be_bytes()); + app_data[4..8].copy_from_slice(&unique_id.to_be_bytes()); + let mut generic_check = |tc: &PusTcCreator| { + let accepted_token = hk_bench.add_tc(tc); + let (_active_req, req) = hk_bench + .convert(accepted_token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion failed"); + assert_eq!(req.unique_id, unique_id); + if let HkRequestVariant::EnablePeriodic = req.variant { + } else { + panic!("unexpected HK request") + } + }; + let tc0 = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcEnableHkGeneration as u8, + &app_data, + true, + ); + generic_check(&tc0); + let tc1 = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcEnableDiagGeneration as u8, + &app_data, + true, + ); + generic_check(&tc1); + } + + #[test] + fn hk_conversion_disable_periodic_generation() { + let mut hk_bench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), HkRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let target_id = TEST_UNIQUE_ID_0; + let unique_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&target_id.to_be_bytes()); + app_data[4..8].copy_from_slice(&unique_id.to_be_bytes()); + let mut generic_check = |tc: &PusTcCreator| { + let accepted_token = hk_bench.add_tc(tc); + let (_active_req, req) = hk_bench + .convert(accepted_token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion failed"); + assert_eq!(req.unique_id, unique_id); + if let HkRequestVariant::DisablePeriodic = req.variant { + } else { + panic!("unexpected HK request") + } + }; + let tc0 = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcDisableHkGeneration as u8, + &app_data, + true, + ); + generic_check(&tc0); + let tc1 = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcDisableDiagGeneration as u8, + &app_data, + true, + ); + generic_check(&tc1); + } + + #[test] + fn hk_conversion_modify_interval() { + let mut hk_bench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), HkRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let target_id = TEST_UNIQUE_ID_0; + let unique_id = 5_u32; + let mut app_data: [u8; 12] = [0; 12]; + let collection_interval_factor = 5_u32; + app_data[0..4].copy_from_slice(&target_id.to_be_bytes()); + app_data[4..8].copy_from_slice(&unique_id.to_be_bytes()); + app_data[8..12].copy_from_slice(&collection_interval_factor.to_be_bytes()); + + let mut generic_check = |tc: &PusTcCreator| { + let accepted_token = hk_bench.add_tc(tc); + let (_active_req, req) = hk_bench + .convert(accepted_token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion failed"); + assert_eq!(req.unique_id, unique_id); + if let HkRequestVariant::ModifyCollectionInterval(interval_factor) = req.variant { + assert_eq!(interval_factor, collection_interval_factor); + } else { + panic!("unexpected HK request") + } + }; + let tc0 = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcModifyHkCollectionInterval as u8, + &app_data, + true, + ); + generic_check(&tc0); + let tc1 = PusTcCreator::new_simple( + sp_header, + 3, + Subservice::TcModifyDiagCollectionInterval as u8, + &app_data, + true, + ); + generic_check(&tc1); + } + + #[test] + fn hk_reply_handler() { + let mut reply_testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), HkReplyHandler::default()); + let sender_id = 2_u64; + let apid_target_id = 3_u32; + let unique_id = 5_u32; + let (req_id, active_req) = reply_testbench.add_tc(TEST_APID, apid_target_id, &[]); + let reply = GenericMessage::new( + MessageMetadata::new(req_id.into(), sender_id), + HkReply::new(unique_id, HkReplyVariant::Ack), + ); + let result = reply_testbench.handle_reply(&reply, &active_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + reply_testbench + .verif_reporter + .assert_full_completion_success(TEST_COMPONENT_ID_0.raw(), req_id, None); + } + + #[test] + fn reply_handling_unrequested_reply() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_1.id(), HkReplyHandler::default()); + let action_reply = HkReply::new(5_u32, HkReplyVariant::Ack); + let unrequested_reply = + GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply); + // Right now this function does not do a lot. We simply check that it does not panic or do + // weird stuff. + let result = testbench.handle_unrequested_reply(&unrequested_reply); + assert!(result.is_ok()); + } + + #[test] + fn reply_handling_reply_timeout() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_1.id(), HkReplyHandler::default()); + let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_1, &[]); + let result = testbench.handle_request_timeout(&active_request, &[]); + assert!(result.is_ok()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_1.raw(), + req_id, + None, + tmtc_err::REQUEST_TIMEOUT.raw() as u64, + ); + } +} diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 791a97e..268ead0 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,4 +1,6 @@ pub mod action; +pub mod hk; +pub mod mode; pub mod scheduler; pub mod stack; pub mod test; @@ -27,14 +29,6 @@ use satrs::ComponentId; use std::fmt::Debug; use std::sync::mpsc::{self, Sender}; -// pub mod action; -// pub mod event; -// pub mod hk; -// pub mod mode; -// pub mod scheduler; -// pub mod stack; -// pub mod test; - #[derive(Debug, PartialEq, Eq, Copy, Clone)] #[allow(dead_code)] pub enum HandlingStatus { @@ -54,9 +48,9 @@ pub struct PusTcMpscRouter { pub test_tc_sender: Sender, // pub event_tc_sender: Sender, pub sched_tc_sender: Sender, - // pub hk_tc_sender: Sender, + pub hk_tc_sender: Sender, pub action_tc_sender: Sender, - // pub mode_tc_sender: Sender, + pub mode_tc_sender: Sender, } pub struct PusTcDistributor { diff --git a/src/pus/mode.rs b/src/pus/mode.rs new file mode 100644 index 0000000..b6fb569 --- /dev/null +++ b/src/pus/mode.rs @@ -0,0 +1,402 @@ +use derive_new::new; +use log::{error, warn}; +use satrs::tmtc::PacketAsVec; +use std::sync::mpsc; +use std::time::Duration; + +use crate::requests::GenericRequestRouter; +use ops_sat_rs::config::components::PUS_MODE_SERVICE; +use ops_sat_rs::config::{mode_err, tmtc_err}; +use satrs::pus::verification::VerificationReporter; +use satrs::pus::{ + DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlerResult, + PusServiceHelper, +}; +use satrs::request::GenericMessage; +use satrs::{ + mode::{ModeAndSubmode, ModeReply, ModeRequest}, + pus::{ + mode::Subservice, + verification::{ + self, FailParams, TcStateAccepted, TcStateStarted, VerificationReportingProvider, + VerificationToken, + }, + ActivePusRequestStd, ActiveRequestProvider, EcssTmSender, EcssTmtcError, + GenericConversionError, PusReplyHandler, PusTcToRequestConverter, PusTmVariant, + }, + request::UniqueApidTargetId, + spacepackets::{ + ecss::{ + tc::PusTcReader, + tm::{PusTmCreator, PusTmSecondaryHeader}, + PusPacket, + }, + SpHeader, + }, + ComponentId, +}; + +use super::{ + create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, + PusTargetedRequestService, TargetedPusService, +}; + +#[derive(new)] +pub struct ModeReplyHandler { + owner_id: ComponentId, +} + +impl PusReplyHandler for ModeReplyHandler { + type Error = EcssTmtcError; + + fn handle_unrequested_reply( + &mut self, + reply: &GenericMessage, + _tm_sender: &impl EcssTmSender, + ) -> Result<(), Self::Error> { + log::warn!("received unexpected reply for mode service 5: {reply:?}"); + Ok(()) + } + + fn handle_reply( + &mut self, + reply: &GenericMessage, + active_request: &ActivePusRequestStd, + tm_sender: &impl EcssTmSender, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result { + let started_token: VerificationToken = active_request + .token() + .try_into() + .expect("invalid token state"); + match reply.message { + ModeReply::ModeReply(mode_reply) => { + let mut source_data: [u8; 12] = [0; 12]; + mode_reply + .write_to_be_bytes(&mut source_data) + .expect("writing mode reply failed"); + let req_id = verification::RequestId::from(reply.request_id()); + let sp_header = SpHeader::new_for_unseg_tm(req_id.packet_id().apid(), 0, 0); + let sec_header = + PusTmSecondaryHeader::new(200, Subservice::TmModeReply as u8, 0, 0, time_stamp); + let pus_tm = PusTmCreator::new(sp_header, sec_header, &source_data, true); + tm_sender.send_tm(self.owner_id, PusTmVariant::Direct(pus_tm))?; + verification_handler.completion_success(tm_sender, started_token, time_stamp)?; + } + ModeReply::CantReachMode(error_code) => { + verification_handler.completion_failure( + tm_sender, + started_token, + FailParams::new(time_stamp, &error_code, &[]), + )?; + } + ModeReply::WrongMode { expected, reached } => { + let mut error_info: [u8; 24] = [0; 24]; + let mut written_len = expected + .write_to_be_bytes(&mut error_info[0..ModeAndSubmode::RAW_LEN]) + .expect("writing expected mode failed"); + written_len += reached + .write_to_be_bytes(&mut error_info[ModeAndSubmode::RAW_LEN..]) + .expect("writing reached mode failed"); + verification_handler.completion_failure( + tm_sender, + started_token, + FailParams::new( + time_stamp, + &mode_err::WRONG_MODE, + &error_info[..written_len], + ), + )?; + } + }; + Ok(true) + } + + fn handle_request_timeout( + &mut self, + active_request: &ActivePusRequestStd, + tm_sender: &impl EcssTmSender, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler( + tm_sender, + active_request, + verification_handler, + time_stamp, + "HK", + )?; + Ok(()) + } +} + +#[derive(Default)] +pub struct ModeRequestConverter {} + +impl PusTcToRequestConverter for ModeRequestConverter { + type Error = GenericConversionError; + + fn convert( + &mut self, + token: VerificationToken, + tc: &PusTcReader, + tm_sender: &(impl EcssTmSender + ?Sized), + verif_reporter: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(ActivePusRequestStd, ModeRequest), Self::Error> { + let subservice = tc.subservice(); + let user_data = tc.user_data(); + let not_enough_app_data = |expected: usize| { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA), + ) + .expect("Sending start failure failed"); + Err(GenericConversionError::NotEnoughAppData { + expected, + found: user_data.len(), + }) + }; + if user_data.len() < core::mem::size_of::() { + return not_enough_app_data(4); + } + let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).unwrap(); + let active_request = + ActivePusRequestStd::new(target_id_and_apid.into(), token, Duration::from_secs(30)); + let subservice_typed = Subservice::try_from(subservice); + let invalid_subservice = || { + // Invalid subservice + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE), + ) + .expect("Sending start failure failed"); + Err(GenericConversionError::InvalidSubservice(subservice)) + }; + if subservice_typed.is_err() { + return invalid_subservice(); + } + let subservice_typed = subservice_typed.unwrap(); + match subservice_typed { + Subservice::TcSetMode => { + if user_data.len() < core::mem::size_of::() + ModeAndSubmode::RAW_LEN { + return not_enough_app_data(4 + ModeAndSubmode::RAW_LEN); + } + let mode_and_submode = ModeAndSubmode::from_be_bytes(&tc.user_data()[4..]) + .expect("mode and submode extraction failed"); + Ok((active_request, ModeRequest::SetMode(mode_and_submode))) + } + Subservice::TcReadMode => Ok((active_request, ModeRequest::ReadMode)), + Subservice::TcAnnounceMode => Ok((active_request, ModeRequest::AnnounceMode)), + Subservice::TcAnnounceModeRecursive => { + Ok((active_request, ModeRequest::AnnounceModeRecursive)) + } + _ => invalid_subservice(), + } + } +} + +pub fn create_mode_service( + tm_funnel_tx: mpsc::Sender, + pus_action_rx: mpsc::Receiver, + mode_router: GenericRequestRouter, + reply_receiver: mpsc::Receiver>, +) -> ModeServiceWrapper { + let mode_request_handler = PusTargetedRequestService::new( + PusServiceHelper::new( + PUS_MODE_SERVICE.id(), + pus_action_rx, + tm_funnel_tx, + create_verification_reporter(PUS_MODE_SERVICE.id(), PUS_MODE_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + ModeRequestConverter::default(), + DefaultActiveRequestMap::default(), + ModeReplyHandler::new(PUS_MODE_SERVICE.id()), + mode_router, + reply_receiver, + ); + ModeServiceWrapper { + service: mode_request_handler, + } +} + +pub struct ModeServiceWrapper { + pub(crate) service: PusTargetedRequestService< + VerificationReporter, + ModeRequestConverter, + ModeReplyHandler, + DefaultActiveRequestMap, + ActivePusRequestStd, + ModeRequest, + ModeReply, + >, +} + +impl TargetedPusService for ModeServiceWrapper { + /// Returns [true] if the packet handling is finished. + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self.service.poll_and_handle_next_tc(time_stamp) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS mode service: partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS mode service: invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS mode service: {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS mode service: packet handling error: {error:?}"); + // To avoid permanent loops on error cases. + return HandlingStatus::Empty; + } + } + HandlingStatus::HandledOne + } + + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { + self.service + .poll_and_check_next_reply(time_stamp) + .unwrap_or_else(|e| { + warn!("PUS action service: Handling reply failed with error {e:?}"); + HandlingStatus::HandledOne + }) + } + + fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } +} +#[cfg(test)] +mod tests { + use ops_sat_rs::config::tmtc_err; + use satrs::pus::test_util::{TEST_APID, TEST_COMPONENT_ID_0, TEST_UNIQUE_ID_0}; + use satrs::request::MessageMetadata; + use satrs::{ + mode::{ModeAndSubmode, ModeReply, ModeRequest}, + pus::mode::Subservice, + request::GenericMessage, + spacepackets::{ + ecss::tc::{PusTcCreator, PusTcSecondaryHeader}, + SpHeader, + }, + }; + + use crate::pus::{ + mode::ModeReplyHandler, + tests::{PusConverterTestbench, ReplyHandlerTestbench}, + }; + + use super::ModeRequestConverter; + + #[test] + fn mode_converter_read_mode_request() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ModeRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let sec_header = PusTcSecondaryHeader::new_simple(200, Subservice::TcReadMode as u8); + let mut app_data: [u8; 4] = [0; 4]; + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + let tc = PusTcCreator::new(sp_header, sec_header, &app_data, true); + let token = testbench.add_tc(&tc); + let (_active_req, req) = testbench + .convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion has failed"); + assert_eq!(req, ModeRequest::ReadMode); + } + + #[test] + fn mode_converter_set_mode_request() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ModeRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let sec_header = PusTcSecondaryHeader::new_simple(200, Subservice::TcSetMode as u8); + let mut app_data: [u8; 4 + ModeAndSubmode::RAW_LEN] = [0; 4 + ModeAndSubmode::RAW_LEN]; + let mode_and_submode = ModeAndSubmode::new(2, 1); + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + mode_and_submode + .write_to_be_bytes(&mut app_data[4..]) + .unwrap(); + let tc = PusTcCreator::new(sp_header, sec_header, &app_data, true); + let token = testbench.add_tc(&tc); + let (_active_req, req) = testbench + .convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion has failed"); + assert_eq!(req, ModeRequest::SetMode(mode_and_submode)); + } + + #[test] + fn mode_converter_announce_mode() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ModeRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let sec_header = PusTcSecondaryHeader::new_simple(200, Subservice::TcAnnounceMode as u8); + let mut app_data: [u8; 4] = [0; 4]; + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + let tc = PusTcCreator::new(sp_header, sec_header, &app_data, true); + let token = testbench.add_tc(&tc); + let (_active_req, req) = testbench + .convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion has failed"); + assert_eq!(req, ModeRequest::AnnounceMode); + } + + #[test] + fn mode_converter_announce_mode_recursively() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ModeRequestConverter::default()); + let sp_header = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); + let sec_header = + PusTcSecondaryHeader::new_simple(200, Subservice::TcAnnounceModeRecursive as u8); + let mut app_data: [u8; 4] = [0; 4]; + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + let tc = PusTcCreator::new(sp_header, sec_header, &app_data, true); + let token = testbench.add_tc(&tc); + let (_active_req, req) = testbench + .convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0) + .expect("conversion has failed"); + assert_eq!(req, ModeRequest::AnnounceModeRecursive); + } + + #[test] + fn reply_handling_unrequested_reply() { + let mut testbench = ReplyHandlerTestbench::new( + TEST_COMPONENT_ID_0.id(), + ModeReplyHandler::new(TEST_COMPONENT_ID_0.id()), + ); + let mode_reply = ModeReply::ModeReply(ModeAndSubmode::new(5, 1)); + let unrequested_reply = + GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), mode_reply); + // Right now this function does not do a lot. We simply check that it does not panic or do + // weird stuff. + let result = testbench.handle_unrequested_reply(&unrequested_reply); + assert!(result.is_ok()); + } + + #[test] + fn reply_handling_reply_timeout() { + let mut testbench = ReplyHandlerTestbench::new( + TEST_COMPONENT_ID_0.id(), + ModeReplyHandler::new(TEST_COMPONENT_ID_0.id()), + ); + let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let result = testbench.handle_request_timeout(&active_request, &[]); + assert!(result.is_ok()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.raw(), + req_id, + None, + tmtc_err::REQUEST_TIMEOUT.raw() as u64, + ); + } +} diff --git a/src/pus/stack.rs b/src/pus/stack.rs index 7e70ac6..759a682 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -3,22 +3,19 @@ use crate::pus::HandlingStatus; use derive_new::new; use satrs::spacepackets::time::{cds, TimeWriter}; -use super::{action::ActionServiceWrapper, scheduler::SchedulingService, TargetedPusService}; - -// use super::{ -// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, -// scheduler::SchedulingServiceWrapper, test::TestCustomServiceWrapper, HandlingStatus, -// TargetedPusService, -// }; +use super::{ + action::ActionServiceWrapper, hk::HkServiceWrapper, mode::ModeServiceWrapper, + scheduler::SchedulingService, TargetedPusService, +}; #[derive(new)] pub struct PusStack { test_srv: TestCustomServiceWrapper, - // hk_srv_wrapper: HkServiceWrapper, + hk_srv_wrapper: HkServiceWrapper, // event_srv: EventServiceWrapper, action_srv_wrapper: ActionServiceWrapper, schedule_srv: SchedulingService, - // mode_srv: ModeServiceWrapper, + mode_srv: ModeServiceWrapper, } impl PusStack { @@ -62,19 +59,21 @@ impl PusStack { .poll_and_handle_next_reply(&time_stamp), ), ); - // is_srv_finished( - // self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), - // Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), - // ); - // is_srv_finished( - // self.mode_srv.poll_and_handle_next_tc(&time_stamp), - // Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)), - // ); + is_srv_finished( + 3, + self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), + Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), + ); + is_srv_finished( + 200, + self.mode_srv.poll_and_handle_next_tc(&time_stamp), + Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)), + ); if nothing_to_do { // Timeout checking is only done once. self.action_srv_wrapper.check_for_request_timeouts(); - // self.hk_srv_wrapper.check_for_request_timeouts(); - // self.mode_srv.check_for_request_timeouts(); + self.hk_srv_wrapper.check_for_request_timeouts(); + self.mode_srv.check_for_request_timeouts(); break; } }