From 66a18e08e5cb0e68e429b2ea39535edac002fa71 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 2 May 2024 12:32:57 +0200 Subject: [PATCH] bump sat-rs and improve PUS stack --- Cargo.lock | 26 ++++-- Cargo.toml | 1 + src/interface/udp_server.rs | 4 +- src/lib.rs | 6 -- src/pus/action.rs | 84 +++++++------------ src/pus/event.rs | 76 +++++++++++------- src/pus/hk.rs | 63 +++++---------- src/pus/mod.rs | 74 ++++++++++++++--- src/pus/mode.rs | 62 +++++--------- src/pus/scheduler.rs | 156 ++++++++++++++++-------------------- src/pus/stack.rs | 94 +++++++++++----------- src/pus/test.rs | 141 +++++++++++++++++--------------- src/tmtc/tc_source.rs | 6 +- 13 files changed, 397 insertions(+), 396 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63cc8de..b1b1c1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -248,6 +248,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "delegate" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "derive-new" version = "0.6.0" @@ -631,6 +642,7 @@ name = "ops-sat-rs" version = "0.1.1" dependencies = [ "chrono", + "delegate 0.12.0", "derive-new", "env_logger", "fern", @@ -776,13 +788,13 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.5" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#29f71c2a571e7492cf5d997d4c11c3f844de83bc" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#9ffe4d0ae02064444da606b2cd1a3c0dd6858fbb" dependencies = [ "bus", "cobs", "crc", "crossbeam-channel", - "delegate", + "delegate 0.10.0", "derive-new", "downcast-rs", "dyn-clone", @@ -835,9 +847,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.199" +version = "1.0.200" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a" +checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f" dependencies = [ "serde_derive", ] @@ -855,9 +867,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.199" +version = "1.0.200" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc" +checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb" dependencies = [ "proc-macro2", "quote", @@ -926,7 +938,7 @@ checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216" dependencies = [ "chrono", "crc", - "delegate", + "delegate 0.10.0", "num-traits", "num_enum", "serde", diff --git a/Cargo.toml b/Cargo.toml index 3f999f1..d2fb76b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ fern = "0.6" toml = "0.8" chrono = "0.4" log = "0.4" +delegate = "0.12" lazy_static = "1" humantime = "2" strum = { version = "0.26", features = ["derive"] } diff --git a/src/interface/udp_server.rs b/src/interface/udp_server.rs index 858ce27..0737d90 100644 --- a/src/interface/udp_server.rs +++ b/src/interface/udp_server.rs @@ -2,8 +2,8 @@ use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; -use ops_sat_rs::HandlingStatus; use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; +use satrs::pus::HandlingStatus; use satrs::queue::GenericSendError; use satrs::tmtc::PacketAsVec; @@ -125,7 +125,7 @@ mod tests { let (tx, rx) = mpsc::channel(); let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let tm_handler = TestTmHandler::default(); - let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); + let _tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { udp_tc_server, tm_handler, diff --git a/src/lib.rs b/src/lib.rs index 698da0b..2b6f688 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,12 +3,6 @@ use satrs::spacepackets::time::TimeWriter; pub mod config; -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum HandlingStatus { - Empty, - HandledOne, -} - #[derive(Debug)] pub struct TimeStampHelper { stamper: CdsTime, diff --git a/src/pus/action.rs b/src/pus/action.rs index 5b270c2..c208562 100644 --- a/src/pus/action.rs +++ b/src/pus/action.rs @@ -1,4 +1,4 @@ -use log::{error, warn}; +use log::warn; use ops_sat_rs::config::components::PUS_ACTION_SERVICE; use ops_sat_rs::config::tmtc_err; use ops_sat_rs::TimeStampHelper; @@ -13,13 +13,13 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, - GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, - PusTcToRequestConverter, PusTmVariant, + GenericConversionError, HandlingStatus, PusPacketHandlingError, PusReplyHandler, + PusServiceHelper, PusTcToRequestConverter, PusTmVariant, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; -use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; +use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket, PusServiceId}; use satrs::spacepackets::SpHeader; use satrs::tmtc::PacketAsVec; use std::sync::mpsc; @@ -28,8 +28,8 @@ use std::time::Duration; use crate::requests::GenericRequestRouter; use super::{ - create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, - PusTargetedRequestService, TargetedPusService, + create_verification_reporter, generic_pus_request_timeout_handler, PusTargetedRequestService, + TargetedPusService, }; pub const DATA_REPLY: u8 = 130; @@ -248,47 +248,23 @@ pub struct ActionServiceWrapper { } impl TargetedPusService for ActionServiceWrapper { - /// Returns [true] if the packet handling is finished. - fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { - match self.service.poll_and_handle_next_tc(time_stamp) { - Ok(result) => match result { - PusPacketHandlerResult::RequestHandled => { - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { - warn!("PUS 8 partial packet handling success: {e:?}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::CustomSubservice(invalid, _) => { - warn!("PUS 8 invalid subservice {invalid}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { - warn!("PUS 8 subservice {subservice} not implemented"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::Empty => (), - }, - Err(error) => { - error!("PUS packet handling error: {error:?}"); - } + const SERVICE_ID: u8 = PusServiceId::Action as u8; + const SERVICE_STR: &'static str = "action"; + + delegate::delegate! { + to self.service { + fn poll_and_handle_next_tc( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn poll_and_handle_next_reply( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn check_for_request_timeouts(&mut self); } - // To avoid permanent loops, treat queue empty by default (all tasks done). - HandlingStatus::Empty - } - - fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { - // This only fails if all senders disconnected. Treat it like an empty queue. - self.service - .poll_and_check_next_reply(time_stamp) - .unwrap_or_else(|e| { - warn!("PUS 8: Handling reply failed with error {e:?}"); - HandlingStatus::Empty - }) - } - - fn check_for_request_timeouts(&mut self) { - self.service.check_for_request_timeouts(); } } @@ -317,8 +293,8 @@ mod tests { use satrs::pus::test_util::{ TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, }; + use satrs::pus::verification; use satrs::pus::verification::test_util::TestVerificationReporter; - use satrs::pus::{verification, TcInMemory}; use satrs::request::MessageMetadata; use satrs::ComponentId; use satrs::{ @@ -423,7 +399,7 @@ mod tests { } let result = result.unwrap(); match result { - PusPacketHandlerResult::RequestHandled => (), + HandlingStatus::HandledOne => (), _ => panic!("unexpected result {result:?}"), } } @@ -435,19 +411,19 @@ mod tests { } let result = result.unwrap(); match result { - PusPacketHandlerResult::Empty => (), + HandlingStatus::Empty => (), _ => panic!("unexpected result {result:?}"), } } pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { - let result = self.service.poll_and_check_next_reply(time_stamp); + let result = self.service.poll_and_handle_next_reply(time_stamp); assert!(result.is_ok()); assert_eq!(result.unwrap(), HandlingStatus::HandledOne); } pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { - let result = self.service.poll_and_check_next_reply(time_stamp); + let result = self.service.poll_and_handle_next_reply(time_stamp); assert!(result.is_ok()); assert_eq!(result.unwrap(), HandlingStatus::Empty); } @@ -472,11 +448,7 @@ mod tests { .check_next_is_acceptance_success(id, accepted_token.request_id()); self.pus_packet_tx .send(EcssTcAndToken::new( - TcInMemory::Vec(PacketAsVec::new( - self.service.service_helper.id(), - //tc.to_vec().unwrap().into(), - tc.to_vec().unwrap(), - )), + PacketAsVec::new(self.service.service_helper.id(), tc.to_vec().unwrap()), accepted_token, )) .unwrap(); diff --git a/src/pus/event.rs b/src/pus/event.rs index aab9081..ffd6fb6 100644 --- a/src/pus/event.rs +++ b/src/pus/event.rs @@ -1,15 +1,16 @@ use std::sync::mpsc; -use super::HandlingStatus; +use super::{DirectPusService, HandlingStatus}; use crate::pus::create_verification_reporter; -use log::{error, warn}; use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT; use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::event_srv::PusEventServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ - EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, + DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, + PartialPusHandlingError, PusServiceHelper, }; +use satrs::spacepackets::ecss::PusServiceId; use satrs::tmtc::PacketAsVec; pub fn create_event_service( @@ -41,32 +42,51 @@ pub struct EventServiceWrapper { >, } -impl EventServiceWrapper { - pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { - match self.handler.poll_and_handle_next_tc(time_stamp) { - Ok(result) => match result { - PusPacketHandlerResult::RequestHandled => { - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { - warn!("PUS 5 partial packet handling success: {e:?}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::CustomSubservice(invalid, _) => { - warn!("PUS 5 invalid subservice {invalid}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { - warn!("PUS 5 subservice {subservice} not implemented"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::Empty => (), - }, - Err(error) => { - error!("PUS packet handling error: {error:?}"); +impl DirectPusService for EventServiceWrapper { + const SERVICE_ID: u8 = PusServiceId::Event as u8; + + const SERVICE_STR: &'static str = "events"; + + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + let error_handler = |partial_error: &PartialPusHandlingError| { + log::warn!( + "PUS {}({}) partial error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + partial_error + ); + }; + let result = self + .handler + .poll_and_handle_next_tc(error_handler, time_stamp); + if let Err(e) = result { + log::warn!( + "PUS {}({}) error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + e + ); + return HandlingStatus::HandledOne; + } + match result.unwrap() { + DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status, + DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => { + log::warn!( + "PUS {}({}) subservice {} not implemented", + Self::SERVICE_ID, + Self::SERVICE_STR, + subservice + ); + } + DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + log::warn!( + "PUS {}({}) subservice {} not implemented", + Self::SERVICE_ID, + Self::SERVICE_STR, + subservice + ); } } - // To avoid permanent loops, treat queue empty by default (all tasks done). - HandlingStatus::Empty + HandlingStatus::HandledOne } } diff --git a/src/pus/hk.rs b/src/pus/hk.rs index a005df6..58b11bf 100644 --- a/src/pus/hk.rs +++ b/src/pus/hk.rs @@ -1,5 +1,4 @@ use derive_new::new; -use log::{error, warn}; use ops_sat_rs::config::components::PUS_HK_SERVICE; use ops_sat_rs::config::{hk_err, tmtc_err}; use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId}; @@ -10,7 +9,7 @@ use satrs::pus::verification::{ use satrs::pus::{ ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, - PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, + PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -22,7 +21,7 @@ use std::time::Duration; use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler}; use crate::requests::GenericRequestRouter; -use super::{HandlingStatus, PusTargetedRequestService}; +use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService}; #[derive(Clone, PartialEq, Debug, new)] pub struct HkReply { @@ -267,47 +266,25 @@ pub struct HkServiceWrapper { >, } -impl HkServiceWrapper { - pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { - match self.service.poll_and_handle_next_tc(time_stamp) { - Ok(result) => match result { - PusPacketHandlerResult::RequestHandled => { - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { - warn!("PUS 3 partial packet handling success: {e:?}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::CustomSubservice(invalid, _) => { - warn!("PUS 3 invalid subservice {invalid}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { - warn!("PUS 3 subservice {subservice} not implemented"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::Empty => (), - }, - Err(error) => { - error!("PUS packet handling error: {error:?}"); - } +impl TargetedPusService for HkServiceWrapper { + const SERVICE_ID: u8 = 3; + + const SERVICE_STR: &'static str = "housekeeping"; + + delegate::delegate! { + to self.service { + fn poll_and_handle_next_tc( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn poll_and_handle_next_reply( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn check_for_request_timeouts(&mut self); } - // To avoid permanent loops, treat queue empty by default (all tasks done). - HandlingStatus::Empty - } - - pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { - // This only fails if all senders disconnected. Treat it like an empty queue. - self.service - .poll_and_check_next_reply(time_stamp) - .unwrap_or_else(|e| { - warn!("PUS 3: Handling reply failed with error {e:?}"); - HandlingStatus::Empty - }) - } - - pub fn check_for_request_timeouts(&mut self) { - self.service.check_for_request_timeouts(); } } diff --git a/src/pus/mod.rs b/src/pus/mod.rs index a89570c..d8bba72 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -10,7 +10,7 @@ use crate::requests::GenericRequestRouter; use log::warn; use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; -use ops_sat_rs::{HandlingStatus, TimeStampHelper}; +use ops_sat_rs::TimeStampHelper; use satrs::pus::verification::{ self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, VerificationReporterCfg, VerificationReportingProvider, VerificationToken, @@ -18,8 +18,8 @@ use satrs::pus::verification::{ use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, - MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, - PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, + HandlingStatus, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler, + PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::request::{Apid, GenericMessage, MessageMetadata}; @@ -78,7 +78,7 @@ impl PusTcDistributor { &mut self, sender_id: ComponentId, tc: Vec, - ) -> Result { + ) -> Result { let pus_tc_result = PusTcReader::new(&tc); if pus_tc_result.is_err() { log::warn!( @@ -87,7 +87,7 @@ impl PusTcDistributor { pus_tc_result.unwrap_err() ); log::warn!("raw data: {:x?}", tc); - return Ok(PusPacketHandlerResult::RequestHandled); + return Ok(HandlingStatus::HandledOne); } let pus_tc = pus_tc_result.unwrap().0; let init_token = self.verif_reporter.add_tc(&pus_tc); @@ -159,17 +159,65 @@ impl PusTcDistributor { } } } - Ok(PusPacketHandlerResult::RequestHandled) + Ok(HandlingStatus::HandledOne) } } pub trait TargetedPusService { - /// Returns [true] interface the packet handling is finished. - fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus; - fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus; + const SERVICE_ID: u8; + const SERVICE_STR: &'static str; + + fn poll_and_handle_next_tc_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus { + let result = self.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + log::error!( + "PUS service {}({})packet handling error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + e + ); + // To avoid permanent loops on error cases. + return HandlingStatus::Empty; + } + result.unwrap() + } + + fn poll_and_handle_next_reply_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus { + // This only fails if all senders disconnected. Treat it like an empty queue. + self.poll_and_handle_next_reply(time_stamp) + .unwrap_or_else(|e| { + warn!( + "PUS servce {}({}): Handling reply failed with error {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + e + ); + HandlingStatus::Empty + }) + } + + fn poll_and_handle_next_tc( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn poll_and_handle_next_reply( + &mut self, + time_stamp: &[u8], + ) -> Result; + fn check_for_request_timeouts(&mut self); } +/// Generic trait for services which handle packets directly. Kept minimal right now because +/// of the difficulty to allow flexible user code for these services.. +pub trait DirectPusService { + const SERVICE_ID: u8; + const SERVICE_STR: &'static str; + + fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus; +} + /// This is a generic handlers class for all PUS services where a PUS telecommand is converted /// to a targeted request. /// @@ -262,10 +310,10 @@ where pub fn poll_and_handle_next_tc( &mut self, time_stamp: &[u8], - ) -> Result { + ) -> Result { let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; if possible_packet.is_none() { - return Ok(PusPacketHandlerResult::Empty); + return Ok(HandlingStatus::Empty); } let ecss_tc_and_token = possible_packet.unwrap(); self.service_helper @@ -321,7 +369,7 @@ where return Err(e.into()); } } - Ok(PusPacketHandlerResult::RequestHandled) + Ok(HandlingStatus::HandledOne) } fn handle_conversion_to_request_error( @@ -374,7 +422,7 @@ where } } - pub fn poll_and_check_next_reply( + pub fn poll_and_handle_next_reply( &mut self, time_stamp: &[u8], ) -> Result { diff --git a/src/pus/mode.rs b/src/pus/mode.rs index 90f822c..5adf460 100644 --- a/src/pus/mode.rs +++ b/src/pus/mode.rs @@ -1,15 +1,14 @@ use derive_new::new; -use log::{error, warn}; use satrs::tmtc::PacketAsVec; use std::sync::mpsc; use std::time::Duration; use crate::requests::GenericRequestRouter; use ops_sat_rs::config::components::PUS_MODE_SERVICE; -use ops_sat_rs::config::{mode_err, tmtc_err}; +use ops_sat_rs::config::{mode_err, tmtc_err, CustomPusServiceId}; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ - DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlerResult, + DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlingError, PusServiceHelper, }; use satrs::request::GenericMessage; @@ -239,48 +238,27 @@ pub struct ModeServiceWrapper { } impl TargetedPusService for ModeServiceWrapper { - /// Returns [true] if the packet handling is finished. - fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { - match self.service.poll_and_handle_next_tc(time_stamp) { - Ok(result) => match result { - PusPacketHandlerResult::RequestHandled => { - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { - warn!("PUS mode service: partial packet handling success: {e:?}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::CustomSubservice(invalid, _) => { - warn!("PUS mode service: invalid subservice {invalid}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { - warn!("PUS mode service: {subservice} not implemented"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::Empty => (), - }, - Err(error) => { - error!("PUS mode service: packet handling error: {error:?}"); - } + const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8; + + const SERVICE_STR: &'static str = "mode"; + + delegate::delegate! { + to self.service { + fn poll_and_handle_next_tc( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn poll_and_handle_next_reply( + &mut self, + time_stamp: &[u8], + ) -> Result; + + fn check_for_request_timeouts(&mut self); } - // To avoid permanent loops, treat queue empty by default (all tasks done). - HandlingStatus::Empty - } - - fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { - self.service - .poll_and_check_next_reply(time_stamp) - .unwrap_or_else(|e| { - warn!("PUS action service: Handling reply failed with error {e:?}"); - HandlingStatus::HandledOne - }) - } - - fn check_for_request_timeouts(&mut self) { - self.service.check_for_request_timeouts(); } } + #[cfg(test)] mod tests { use ops_sat_rs::config::tmtc_err; diff --git a/src/pus/scheduler.rs b/src/pus/scheduler.rs index c4d6227..cf3d08a 100644 --- a/src/pus/scheduler.rs +++ b/src/pus/scheduler.rs @@ -2,65 +2,20 @@ use std::sync::mpsc; use std::time::Duration; use crate::pus::create_verification_reporter; -use log::{error, info, warn}; +use log::info; use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE; -use satrs::pool::{PoolProvider, StaticMemoryPool}; +use satrs::pool::StaticMemoryPool; use satrs::pus::scheduler::{PusScheduler, TcInfo}; use satrs::pus::scheduler_srv::PusSchedServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ - EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, + DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus, + MpscTcReceiver, PartialPusHandlingError, PusServiceHelper, }; -use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool}; -use satrs::ComponentId; +use satrs::spacepackets::ecss::PusServiceId; +use satrs::tmtc::PacketAsVec; -use super::HandlingStatus; - -pub trait TcReleaser { - fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; -} - -impl TcReleaser for PacketSenderWithSharedPool { - fn release( - &mut self, - sender_id: ComponentId, - enabled: bool, - _info: &TcInfo, - tc: &[u8], - ) -> bool { - if enabled { - let shared_pool = self.shared_pool.get_mut(); - // Transfer TC from scheduler TC pool to shared TC pool. - let released_tc_addr = shared_pool - .0 - .write() - .expect("locking pool failed") - .add(tc) - .expect("adding TC to shared pool failed"); - self.sender - .send(PacketInPool::new(sender_id, released_tc_addr)) - .expect("sending TC to TC source failed"); - } - true - } -} - -impl TcReleaser for mpsc::Sender { - fn release( - &mut self, - sender_id: ComponentId, - enabled: bool, - _info: &TcInfo, - tc: &[u8], - ) -> bool { - if enabled { - // Send released TC to centralized TC source. - self.send(PacketAsVec::new(sender_id, tc.to_vec())) - .expect("sending TC to TC source failed"); - } - true - } -} +use super::DirectPusService; pub struct SchedulingService { pub pus_11_handler: PusSchedServiceHandler< @@ -72,14 +27,72 @@ pub struct SchedulingService { >, pub sched_tc_pool: StaticMemoryPool, pub releaser_buf: [u8; 4096], - pub tc_releaser: Box, + pub tc_releaser: mpsc::Sender, +} + +impl DirectPusService for SchedulingService { + const SERVICE_ID: u8 = PusServiceId::Verification as u8; + + const SERVICE_STR: &'static str = "verification"; + + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + let error_handler = |partial_error: &PartialPusHandlingError| { + log::warn!( + "PUS {}({}) partial error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + partial_error + ); + }; + + let result = self.pus_11_handler.poll_and_handle_next_tc( + error_handler, + time_stamp, + &mut self.sched_tc_pool, + ); + if let Err(e) = result { + log::warn!( + "PUS {}({}) error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + e + ); + return HandlingStatus::HandledOne; + } + match result.unwrap() { + DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status, + DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => { + log::warn!( + "PUS {}({}) subservice {} not implemented", + Self::SERVICE_ID, + Self::SERVICE_STR, + subservice + ); + } + DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + log::warn!( + "PUS {}({}) subservice {} not implemented", + Self::SERVICE_ID, + Self::SERVICE_STR, + subservice + ); + } + } + HandlingStatus::HandledOne + } } impl SchedulingService { pub fn release_tcs(&mut self) { let id = self.pus_11_handler.service_helper.id(); - let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { - self.tc_releaser.release(id, enabled, info, tc) + let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { + if enabled { + // Send released TC to centralized TC source. + self.tc_releaser + .send(PacketAsVec::new(id, tc.to_vec())) + .expect("sending TC to TC source failed"); + } + true }; self.pus_11_handler @@ -99,37 +112,6 @@ impl SchedulingService { info!("{released_tcs} TC(s) released from scheduler"); } } - - pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { - match self - .pus_11_handler - .poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool) - { - Ok(result) => match result { - PusPacketHandlerResult::RequestHandled => { - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { - warn!("PUS11 partial packet handling success: {e:?}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::CustomSubservice(invalid, _) => { - warn!("PUS11 invalid subservice {invalid}"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { - warn!("PUS11: Subservice {subservice} not implemented"); - return HandlingStatus::HandledOne; - } - PusPacketHandlerResult::Empty => (), - }, - Err(error) => { - error!("PUS packet handling error: {error:?}") - } - } - // To avoid permanent loops, treat queue empty by default (all tasks done). - HandlingStatus::Empty - } } pub fn create_scheduler_service( @@ -158,6 +140,6 @@ pub fn create_scheduler_service( pus_11_handler, sched_tc_pool, releaser_buf: [0; 4096], - tc_releaser: Box::new(tc_source_sender), + tc_releaser: tc_source_sender, } } diff --git a/src/pus/stack.rs b/src/pus/stack.rs index cbed611..3914ce0 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -5,7 +5,7 @@ use satrs::spacepackets::time::{cds, TimeWriter}; use super::{ action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, - mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService, + mode::ModeServiceWrapper, scheduler::SchedulingService, DirectPusService, TargetedPusService, }; #[derive(new)] @@ -23,59 +23,28 @@ impl PusStack { // Release all telecommands which reached their release time before calling the service // handlers. self.schedule_srv.release_tcs(); - let time_stamp = cds::CdsTime::now_with_u16_days() + let timestamp = cds::CdsTime::now_with_u16_days() .expect("time stamp generation error") .to_vec() .unwrap(); - let mut loop_count = 0; + let mut loop_count = 0_u32; + // Hot loop which will run continuously until all request and reply handling is done. loop { let mut nothing_to_do = true; - let mut is_srv_finished = - |_srv_id: u8, - tc_handling_status: HandlingStatus, - reply_handling_status: Option| { - if tc_handling_status == HandlingStatus::HandledOne - || (reply_handling_status.is_some() - && reply_handling_status.unwrap() == HandlingStatus::HandledOne) - { - nothing_to_do = false; - } - }; - is_srv_finished( - 17, - self.test_srv.poll_and_handle_next_packet(&time_stamp), - None, + Self::direct_service_checker(&mut self.test_srv, ×tamp, &mut nothing_to_do); + Self::direct_service_checker(&mut self.schedule_srv, ×tamp, &mut nothing_to_do); + Self::direct_service_checker(&mut self.event_srv, ×tamp, &mut nothing_to_do); + Self::targeted_service_checker( + &mut self.action_srv_wrapper, + ×tamp, + &mut nothing_to_do, ); - is_srv_finished( - 11, - self.schedule_srv.poll_and_handle_next_tc(&time_stamp), - None, + Self::targeted_service_checker( + &mut self.hk_srv_wrapper, + ×tamp, + &mut nothing_to_do, ); - is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None); - is_srv_finished( - 8, - self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), - Some( - self.action_srv_wrapper - .poll_and_handle_next_reply(&time_stamp), - ), - ); - is_srv_finished( - 3, - self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), - Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), - ); - is_srv_finished( - 200, - self.mode_srv.poll_and_handle_next_tc(&time_stamp), - Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)), - ); - // Safety mechanism to avoid infinite loops. - loop_count += 1; - if loop_count >= 500 { - log::warn!("reached PUS stack loop count 500, breaking"); - break; - } + Self::targeted_service_checker(&mut self.mode_srv, ×tamp, &mut nothing_to_do); if nothing_to_do { // Timeout checking is only done once. self.action_srv_wrapper.check_for_request_timeouts(); @@ -83,6 +52,37 @@ impl PusStack { self.mode_srv.check_for_request_timeouts(); break; } + // Safety mechanism to avoid infinite loops. + loop_count += 1; + if loop_count >= 500 { + log::warn!("reached PUS stack loop count 500, breaking"); + break; + } + } + } + + pub fn direct_service_checker( + service: &mut S, + timestamp: &[u8], + nothing_to_do: &mut bool, + ) { + let handling_status = service.poll_and_handle_next_tc(timestamp); + if handling_status == HandlingStatus::HandledOne { + *nothing_to_do = false; + } + } + + pub fn targeted_service_checker( + service: &mut S, + timestamp: &[u8], + nothing_to_do: &mut bool, + ) { + let request_handling = service.poll_and_handle_next_tc_default_handler(timestamp); + let reply_handling = service.poll_and_handle_next_reply_default_handler(timestamp); + if request_handling == HandlingStatus::HandledOne + || reply_handling == HandlingStatus::HandledOne + { + *nothing_to_do = false; } } } diff --git a/src/pus/test.rs b/src/pus/test.rs index 4c55d16..31830ea 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -1,22 +1,20 @@ use crate::pus::create_verification_reporter; -use log::{info, warn}; +use log::info; use ops_sat_rs::config::components::PUS_TEST_SERVICE; use ops_sat_rs::config::{tmtc_err, TEST_EVENT}; use satrs::event_man::{EventMessage, EventMessageU32}; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ - EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, - PusPacketHandlerResult, PusServiceHelper, + DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus, + MpscTcReceiver, MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper, }; -use satrs::spacepackets::ecss::tc::PusTcReader; -use satrs::spacepackets::ecss::PusPacket; -use satrs::spacepackets::time::cds::CdsTime; -use satrs::spacepackets::time::TimeWriter; +use satrs::queue::GenericSendError; +use satrs::spacepackets::ecss::PusServiceId; use satrs::tmtc::PacketAsVec; use std::sync::mpsc; -use super::HandlingStatus; +use super::DirectPusService; pub fn create_test_service( tm_funnel_tx: mpsc::Sender, @@ -46,64 +44,84 @@ pub struct TestCustomServiceWrapper { pub event_tx: mpsc::SyncSender, } -impl TestCustomServiceWrapper { - pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus { - let res = self.handler.poll_and_handle_next_tc(time_stamp); - if res.is_err() { - warn!("PUS17 handlers failed with error {:?}", res.unwrap_err()); - return HandlingStatus::Empty; +impl DirectPusService for TestCustomServiceWrapper { + const SERVICE_ID: u8 = PusServiceId::Test as u8; + const SERVICE_STR: &'static str = "test"; + + fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus { + let error_handler = |partial_error: &PartialPusHandlingError| { + log::warn!( + "PUS {}({}) partial error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + partial_error + ); + }; + let res = self + .handler + .poll_and_handle_next_tc(error_handler, timestamp); + if let Err(e) = res { + log::warn!( + "PUS {}({}) error: {:?}", + Self::SERVICE_ID, + Self::SERVICE_STR, + e + ); + return HandlingStatus::HandledOne; } match res.unwrap() { - PusPacketHandlerResult::RequestHandled => { - info!("Received PUS ping command TC[17,1]"); - info!("Sent ping reply PUS TM[17,2]"); - return HandlingStatus::HandledOne; + DirectPusPacketHandlerResult::Handled(handling_status) => { + if handling_status == HandlingStatus::HandledOne { + info!("Received PUS ping command TC[17,1]"); + info!("Sent ping reply PUS TM[17,2]"); + } + return handling_status; } - PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => { - warn!( - "Handled PUS ping command with partial success: {:?}", - partial_err + DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + log::warn!( + "PUS {}({}) subservice {} not implemented", + Self::SERVICE_ID, + Self::SERVICE_STR, + subservice ); - return HandlingStatus::HandledOne; } - PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { - warn!("PUS17: Subservice {subservice} not implemented"); - return HandlingStatus::HandledOne; - } - // TODO: adapt interface events are implemented - PusPacketHandlerResult::CustomSubservice(subservice, token) => { - let (tc, _) = PusTcReader::new( - self.handler - .service_helper - .tc_in_mem_converter - .tc_slice_raw(), - ) - .unwrap(); - let time_stamper = CdsTime::now_with_u16_days().unwrap(); - let mut stamp_buf: [u8; 7] = [0; 7]; - time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); + DirectPusPacketHandlerResult::CustomSubservice(subservice, token) => { if subservice == 128 { - info!("Generating test event"); - self.event_tx + info!("generating test event"); + if let Err(e) = self + .event_tx .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) - .expect("Sending test event failed"); - let start_token = self - .handler - .service_helper - .verif_reporter() - .start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf) - .expect("Error sending start success"); - self.handler - .service_helper - .verif_reporter() - .completion_success( - self.handler.service_helper.tm_sender(), - start_token, - &stamp_buf, - ) - .expect("Error sending completion success"); + .map_err(|_| GenericSendError::RxDisconnected) + { + // This really should not happen but I want to avoid panicking.. + log::warn!("failed to send test event: {:?}", e); + } + + match self.handler.service_helper.verif_reporter().start_success( + self.handler.service_helper.tm_sender(), + token, + timestamp, + ) { + Ok(started_token) => { + if let Err(e) = self + .handler + .service_helper + .verif_reporter() + .completion_success( + self.handler.service_helper.tm_sender(), + started_token, + timestamp, + ) + { + error_handler(&PartialPusHandlingError::Verification(e)); + } + } + Err(e) => { + error_handler(&PartialPusHandlingError::Verification(e)); + } + } } else { - let fail_data = [tc.subservice()]; + let fail_data = [subservice]; self.handler .service_helper .verif_reporter() @@ -111,18 +129,15 @@ impl TestCustomServiceWrapper { self.handler.service_helper.tm_sender(), token, FailParams::new( - &stamp_buf, + timestamp, &tmtc_err::INVALID_PUS_SUBSERVICE, &fail_data, ), ) .expect("Sending start failure verification failed"); } - return HandlingStatus::HandledOne; } - PusPacketHandlerResult::Empty => (), } - // To avoid permanent loops, treat queue empty by default (all tasks done). - HandlingStatus::Empty + HandlingStatus::HandledOne } } diff --git a/src/tmtc/tc_source.rs b/src/tmtc/tc_source.rs index 055628e..3dac8e2 100644 --- a/src/tmtc/tc_source.rs +++ b/src/tmtc/tc_source.rs @@ -1,7 +1,9 @@ use std::sync::mpsc::{self, TryRecvError}; -use ops_sat_rs::HandlingStatus; -use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec}; +use satrs::{ + pus::{HandlingStatus, MpscTmAsVecSender}, + tmtc::PacketAsVec, +}; use crate::pus::PusTcDistributor;