From 59a06b5c50dd94461dbac51a1f1db054c3b5f270 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 10 Apr 2024 14:59:34 +0200 Subject: [PATCH] add action service and controller obj --- Cargo.lock | 128 ++++++++ Cargo.toml | 3 + src/config.rs | 68 ++--- src/interface/tcp.rs | 2 +- src/interface/udp.rs | 2 +- src/main.rs | 46 +-- src/pus/action.rs | 714 +++++++++++++++++++++++++++++++++++++++++++ src/pus/mod.rs | 51 ++-- src/pus/stack.rs | 36 +-- src/pus/test.rs | 37 ++- 10 files changed, 950 insertions(+), 137 deletions(-) create mode 100644 src/pus/action.rs diff --git a/Cargo.lock b/Cargo.lock index 05ceabe..461c5fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,6 +14,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -35,6 +44,54 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys", +] + [[package]] name = "array-init" version = "0.0.4" @@ -111,6 +168,12 @@ name = "cobs" version = "0.2.3" source = "git+https://github.com/robamu/cobs.rs.git?branch=all_features#c70a7f30fd00a7cbdb7666dec12b437977385d40" +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -202,6 +265,29 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -239,6 +325,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -375,6 +467,7 @@ version = "0.0.1" dependencies = [ "chrono", "derive-new", + "env_logger", "fern", "lazy_static", "log", @@ -440,6 +533,35 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" + [[package]] name = "rustversion" version = "1.0.15" @@ -666,6 +788,12 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 5a74807..0b0fd40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,3 +25,6 @@ features = ["test_util"] version = "0.1.1" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" branch = "main" + +[dev-dependencies] +env_logger = "0.11" diff --git a/src/config.rs b/src/config.rs index cccd73c..8041828 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,30 +1,12 @@ -use lazy_static::lazy_static; use num_enum::{IntoPrimitive, TryFromPrimitive}; -use satrs::spacepackets::{PacketId, PacketType}; use satrs_mib::res_code::ResultU16Info; use satrs_mib::resultcode; -use std::{collections::HashSet, net::Ipv4Addr}; -use strum::IntoEnumIterator; +use std::net::Ipv4Addr; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; - -lazy_static! { - pub static ref PACKET_ID_VALIDATOR: HashSet = { - let mut set = HashSet::new(); - for id in components::Apid::iter() { - set.insert(PacketId::new(PacketType::Tc, true, id as u16)); - } - set - }; - pub static ref APID_VALIDATOR: HashSet = { - let mut set = HashSet::new(); - for id in components::Apid::iter() { - set.insert(id as u16); - } - set - }; -} +pub const EXPERIMENT_ID: u32 = 278; +pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] @@ -78,45 +60,35 @@ pub mod tmtc_err { pub mod components { use satrs::request::UniqueApidTargetId; - use strum::EnumIter; - #[derive(Copy, Clone, PartialEq, Eq, EnumIter)] - pub enum Apid { - Sched = 1, - GenericPus = 2, - Cfdp = 4, - } + use super::EXPERIMENT_APID; // Component IDs for components with the PUS APID. #[derive(Copy, Clone, PartialEq, Eq)] - pub enum PusId { - PusEventManagement = 0, - PusRouting = 1, - PusTest = 2, - PusAction = 3, - PusMode = 4, - PusHk = 5, - } - - #[derive(Copy, Clone, PartialEq, Eq)] - pub enum AcsId { - Mgm0 = 0, + pub enum UniqueId { + Controller = 0, + PusEventManagement = 1, + PusRouting = 2, + PusTest = 3, + PusAction = 4, + PusMode = 5, + PusHk = 6, } + pub const CONTROLLER_ID: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::Controller as u32); pub const PUS_ACTION_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusAction as u32); pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, 0); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusEventManagement as u32); pub const PUS_ROUTING_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusRouting as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusRouting as u32); pub const PUS_TEST_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusTest as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32); pub const PUS_MODE_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusMode as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32); pub const PUS_HK_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusHk as u32); - pub const PUS_SCHED_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::Sched as u16, 0); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32); } pub mod tasks { diff --git a/src/interface/tcp.rs b/src/interface/tcp.rs index 04bb136..8055af8 100644 --- a/src/interface/tcp.rs +++ b/src/interface/tcp.rs @@ -101,7 +101,7 @@ impl< cfg: ServerConfig, tm_source: SyncTcpTmSource, tc_receiver: CcsdsDistributor, MpscErrorType>, - packet_id_lookup: HashSet, + packet_id_lookup: Vec, ) -> Result { Ok(Self { server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, diff --git a/src/interface/udp.rs b/src/interface/udp.rs index 5c45e9e..65193a9 100644 --- a/src/interface/udp.rs +++ b/src/interface/udp.rs @@ -82,6 +82,7 @@ mod tests { sync::{Arc, Mutex}, }; + use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use satrs::{ spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -89,7 +90,6 @@ mod tests { }, tmtc::ReceivesTcCore, }; - use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use super::*; diff --git a/src/main.rs b/src/main.rs index 2db5ca5..b4ee981 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,17 +6,15 @@ use std::{ }; use log::info; -use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; -use ops_sat_rs::config::{ - tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, -}; +use ops_sat_rs::config::{components::CONTROLLER_ID, tasks::FREQ_MS_PUS_STACK, EXPERIMENT_APID}; +use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, + spacepackets::PacketId, tmtc::CcsdsDistributor, }; -use crate::pus::stack::PusStack; -use crate::pus::test::create_test_service_dynamic; +use crate::pus::test::create_test_service; use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::tm_funnel::TmFunnelDynamic; use crate::tmtc::TcSourceTaskDynamic; @@ -27,6 +25,10 @@ use crate::{ logger::setup_logger, tmtc::PusTcSourceProviderDynamic, }; +use crate::{ + pus::{action::create_action_service, stack::PusStack}, + requests::GenericRequestRouter, +}; mod ccsds; mod interface; @@ -51,23 +53,30 @@ fn main() { // let (pus_event_tx, pus_event_rx) = mpsc::channel(); // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); - // let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); - // let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); + let (controller_composite_tx, controller_composite_rx) = mpsc::channel(); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = GenericRequestRouter::default(); + request_map + .composite_router_map + .insert(CONTROLLER_ID.id(), controller_composite_tx); let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, // event_tc_sender: pus_event_tx, // sched_tc_sender: pus_sched_tx, // hk_tc_sender: pus_hk_tx, - // action_tc_sender: pus_action_tx, + action_tc_sender: pus_action_tx, // mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service_dynamic( + let pus_test_service = create_test_service( tm_funnel_tx.clone(), // event_handler.clone_event_sender(), pus_test_rx, @@ -81,12 +90,12 @@ fn main() { // // let pus_event_service = // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); - // let pus_action_service = create_action_service_dynamic( - // tm_funnel_tx.clone(), - // pus_action_rx, - // request_map.clone(), - // pus_action_reply_rx, - // ); + let pus_action_service = create_action_service( + tm_funnel_tx.clone(), + pus_action_rx, + request_map.clone(), + pus_action_reply_rx, + ); // let pus_hk_service = create_hk_service_dynamic( // tm_funnel_tx.clone(), // pus_hk_rx, @@ -103,7 +112,7 @@ fn main() { pus_test_service, // pus_hk_service, // pus_event_service, - // pus_action_service, + pus_action_service, // pus_scheduler_service, // pus_mode_service, ); @@ -133,7 +142,8 @@ fn main() { tcp_server_cfg, sync_tm_tcp_source.clone(), tcp_ccsds_distributor, - PACKET_ID_VALIDATOR.clone(), + vec![PacketId::new_for_tc(true, EXPERIMENT_APID)], + // PACKET_ID_VALIDATOR.clone(), ) .expect("tcp server creation failed"); diff --git a/src/pus/action.rs b/src/pus/action.rs new file mode 100644 index 0000000..dc5f133 --- /dev/null +++ b/src/pus/action.rs @@ -0,0 +1,714 @@ +use log::{error, warn}; +use ops_sat_rs::config::components::PUS_ACTION_SERVICE; +use ops_sat_rs::config::tmtc_err; +use satrs::action::{ActionRequest, ActionRequestVariant}; +use satrs::params::WritableToBeBytes; +use satrs::pus::action::{ + ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, PusActionReply, +}; +use satrs::pus::verification::{ + FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter, + VerificationReportingProvider, VerificationToken, +}; +use satrs::pus::{ + ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, + EcssTmSenderCore, EcssTmtcError, GenericConversionError, MpscTmAsVecSender, + PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, PusTmAsVec, +}; +use satrs::request::{GenericMessage, UniqueApidTargetId}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; +use std::sync::mpsc; +use std::time::Duration; + +use crate::requests::GenericRequestRouter; + +use super::{ + create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, + PusTargetedRequestService, TargetedPusService, +}; + +pub struct ActionReplyHandler { + fail_data_buf: [u8; 128], +} + +impl Default for ActionReplyHandler { + fn default() -> Self { + Self { + fail_data_buf: [0; 128], + } + } +} + +impl PusReplyHandler for ActionReplyHandler { + type Error = EcssTmtcError; + + fn handle_unrequested_reply( + &mut self, + reply: &GenericMessage, + _tm_sender: &impl EcssTmSenderCore, + ) -> Result<(), Self::Error> { + warn!("received unexpected reply for service 8: {reply:?}"); + Ok(()) + } + + fn handle_reply( + &mut self, + reply: &GenericMessage, + active_request: &ActivePusActionRequestStd, + tm_sender: &(impl EcssTmSenderCore + ?Sized), + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result { + let verif_token: VerificationToken = active_request + .token() + .try_into() + .expect("invalid token state"); + let remove_entry = match &reply.message.variant { + ActionReplyVariant::CompletionFailed { error_code, params } => { + let mut fail_data_len = 0; + if let Some(params) = params { + fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; + } + verification_handler.completion_failure( + tm_sender, + verif_token, + FailParams::new(time_stamp, error_code, &self.fail_data_buf[..fail_data_len]), + )?; + true + } + ActionReplyVariant::StepFailed { + error_code, + step, + params, + } => { + let mut fail_data_len = 0; + if let Some(params) = params { + fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; + } + verification_handler.step_failure( + tm_sender, + verif_token, + FailParamsWithStep::new( + time_stamp, + &EcssEnumU16::new(*step), + error_code, + &self.fail_data_buf[..fail_data_len], + ), + )?; + true + } + ActionReplyVariant::Completed => { + verification_handler.completion_success(tm_sender, verif_token, time_stamp)?; + true + } + ActionReplyVariant::StepSuccess { step } => { + verification_handler.step_success( + tm_sender, + &verif_token, + time_stamp, + EcssEnumU16::new(*step), + )?; + false + } + _ => false, + }; + Ok(remove_entry) + } + + fn handle_request_timeout( + &mut self, + active_request: &ActivePusActionRequestStd, + tm_sender: &impl EcssTmSenderCore, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler( + tm_sender, + active_request, + verification_handler, + time_stamp, + "action", + ) + } +} + +#[derive(Default)] +pub struct ActionRequestConverter {} + +impl PusTcToRequestConverter for ActionRequestConverter { + type Error = GenericConversionError; + + fn convert( + &mut self, + token: VerificationToken, + tc: &PusTcReader, + tm_sender: &(impl EcssTmSenderCore + ?Sized), + verif_reporter: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(ActivePusActionRequestStd, ActionRequest), Self::Error> { + let subservice = tc.subservice(); + let user_data = tc.user_data(); + if user_data.len() < 8 { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA), + ) + .expect("Sending start failure failed"); + return Err(GenericConversionError::NotEnoughAppData { + expected: 8, + found: user_data.len(), + }); + } + let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).unwrap(); + let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap()); + if subservice == 128 { + let req_variant = if user_data.len() == 8 { + ActionRequestVariant::NoData + } else { + ActionRequestVariant::VecData(user_data[8..].to_vec()) + }; + Ok(( + ActivePusActionRequestStd::new( + action_id, + target_id_and_apid.into(), + token.into(), + Duration::from_secs(30), + ), + ActionRequest::new(action_id, req_variant), + )) + } else { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE), + ) + .expect("Sending start failure failed"); + Err(GenericConversionError::InvalidSubservice(subservice)) + } + } +} + +pub fn create_action_service( + tm_funnel_tx: mpsc::Sender, + pus_action_rx: mpsc::Receiver, + action_router: GenericRequestRouter, + reply_receiver: mpsc::Receiver>, +) -> ActionServiceWrapper { + let action_request_handler = PusTargetedRequestService::new( + PusServiceHelper::new( + PUS_ACTION_SERVICE.id(), + pus_action_rx, + tm_funnel_tx, + create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + ActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + action_router, + reply_receiver, + ); + ActionServiceWrapper { + service: action_request_handler, + } +} + +pub struct ActionServiceWrapper { + pub(crate) service: PusTargetedRequestService< + VerificationReporter, + ActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + PusActionReply, + >, +} + +impl TargetedPusService for ActionServiceWrapper { + /// Returns [true] if the packet handling is finished. + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self.service.poll_and_handle_next_tc(time_stamp) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS 8 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS 8 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS 8 subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}") + } + } + HandlingStatus::HandledOne + } + + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { + // This only fails if all senders disconnected. Treat it like an empty queue. + self.service + .poll_and_check_next_reply(time_stamp) + .unwrap_or_else(|e| { + warn!("PUS 8: Handling reply failed with error {e:?}"); + HandlingStatus::Empty + }) + } + + fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } +} + +#[cfg(test)] +mod tests { + use satrs::pus::test_util::{ + TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, + }; + use satrs::pus::verification; + use satrs::pus::verification::test_util::TestVerificationReporter; + use satrs::request::MessageMetadata; + use satrs::ComponentId; + use satrs::{ + res_code::ResultU16, + spacepackets::{ + ecss::{ + tc::{PusTcCreator, PusTcSecondaryHeader}, + tm::PusTmReader, + WritablePusPacket, + }, + SpHeader, + }, + }; + + use crate::{ + pus::tests::{PusConverterTestbench, ReplyHandlerTestbench, TargetedPusRequestTestbench}, + requests::CompositeRequest, + }; + + use super::*; + + impl + TargetedPusRequestTestbench< + ActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + PusActionReply, + > + { + pub fn new_for_action(owner_id: ComponentId, target_id: ComponentId) -> Self { + let _ = env_logger::builder().is_test(true).try_init(); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (action_reply_tx, action_reply_rx) = mpsc::channel(); + let (action_req_tx, action_req_rx) = mpsc::channel(); + let verif_reporter = TestVerificationReporter::new(owner_id); + let mut generic_req_router = GenericRequestRouter::default(); + generic_req_router + .composite_router_map + .insert(target_id, action_req_tx); + Self { + service: PusTargetedRequestService::new( + PusServiceHelper::new( + owner_id, + pus_action_rx, + tm_funnel_tx.clone(), + verif_reporter, + EcssTcInVecConverter::default(), + ), + ActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + generic_req_router, + action_reply_rx, + ), + request_id: None, + pus_packet_tx: pus_action_tx, + tm_funnel_rx, + reply_tx: action_reply_tx, + request_rx: action_req_rx, + } + } + + pub fn verify_packet_started(&self) { + self.service + .service_helper + .common + .verif_reporter + .check_next_is_started_success( + self.service.service_helper.id(), + self.request_id.expect("request ID not set").into(), + ); + } + + pub fn verify_packet_completed(&self) { + self.service + .service_helper + .common + .verif_reporter + .check_next_is_completion_success( + self.service.service_helper.id(), + self.request_id.expect("request ID not set").into(), + ); + } + + pub fn verify_tm_empty(&self) { + let packet = self.tm_funnel_rx.try_recv(); + if let Err(mpsc::TryRecvError::Empty) = packet { + } else { + let tm = packet.unwrap(); + let unexpected_tm = PusTmReader::new(&tm.packet, 7).unwrap().0; + panic!("unexpected TM packet {unexpected_tm:?}"); + } + } + + pub fn verify_next_tc_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + panic!("unexpected error {:?}", e); + } + let result = result.unwrap(); + match result { + PusPacketHandlerResult::RequestHandled => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_all_tcs_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + panic!("unexpected error {:?}", e); + } + let result = result.unwrap(); + match result { + PusPacketHandlerResult::Empty => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), HandlingStatus::HandledOne); + } + + pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), HandlingStatus::Empty); + } + + pub fn add_tc(&mut self, tc: &PusTcCreator) { + self.request_id = Some(verification::RequestId::new(tc).into()); + let token = self.service.service_helper.verif_reporter_mut().add_tc(tc); + let accepted_token = self + .service + .service_helper + .verif_reporter() + .acceptance_success(self.service.service_helper.tm_sender(), token, &[0; 7]) + .expect("TC acceptance failed"); + self.service + .service_helper + .verif_reporter() + .check_next_was_added(accepted_token.request_id()); + let id = self.service.service_helper.id(); + self.service + .service_helper + .verif_reporter() + .check_next_is_acceptance_success(id, accepted_token.request_id()); + self.pus_packet_tx + .send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) + .unwrap(); + } + } + + #[test] + fn basic_request() { + let mut testbench = TargetedPusRequestTestbench::new_for_action( + TEST_COMPONENT_ID_0.id(), + TEST_COMPONENT_ID_1.id(), + ); + // Create a basic action request and verify forwarding. + let sp_header = SpHeader::new_from_apid(TEST_APID); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_1.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new(sp_header, sec_header, &app_data, true); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + testbench.verify_next_tc_is_handled_properly(&time_stamp); + testbench.verify_all_tcs_handled(&time_stamp); + + testbench.verify_packet_started(); + + let possible_req = testbench.request_rx.try_recv(); + assert!(possible_req.is_ok()); + let req = possible_req.unwrap(); + if let CompositeRequest::Action(action_req) = req.message { + assert_eq!(action_req.action_id, action_id); + assert_eq!(action_req.variant, ActionRequestVariant::NoData); + let action_reply = PusActionReply::new(action_id, ActionReplyVariant::Completed); + testbench + .reply_tx + .send(GenericMessage::new(req.requestor_info, action_reply)) + .unwrap(); + } else { + panic!("unexpected request type"); + } + testbench.verify_next_reply_is_handled_properly(&time_stamp); + testbench.verify_all_replies_handled(&time_stamp); + + testbench.verify_packet_completed(); + testbench.verify_tm_empty(); + } + + #[test] + fn basic_request_routing_error() { + let mut testbench = TargetedPusRequestTestbench::new_for_action( + TEST_COMPONENT_ID_0.id(), + TEST_COMPONENT_ID_1.id(), + ); + // Create a basic action request and verify forwarding. + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&0_u32.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + + let result = testbench.service.poll_and_handle_next_tc(&time_stamp); + assert!(result.is_err()); + // Verify the correct result and completion failure. + } + + #[test] + fn converter_action_req_no_data() { + let mut testbench = PusConverterTestbench::new( + TEST_COMPONENT_ID_0.raw(), + ActionRequestConverter::default(), + ); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + let token = testbench.add_tc(&pus8_packet); + let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0); + assert!(result.is_ok()); + let (active_req, request) = result.unwrap(); + if let ActionRequestVariant::NoData = request.variant { + assert_eq!(request.action_id, action_id); + assert_eq!(active_req.action_id, action_id); + assert_eq!( + active_req.target_id(), + UniqueApidTargetId::new(TEST_APID, TEST_UNIQUE_ID_0).raw() + ); + assert_eq!( + active_req.token().request_id(), + testbench.request_id().unwrap() + ); + } else { + panic!("unexpected action request variant"); + } + } + + #[test] + fn converter_action_req_with_data() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ActionRequestConverter::default()); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 16] = [0; 16]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + for i in 0..8 { + app_data[i + 8] = i as u8; + } + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + let token = testbench.add_tc(&pus8_packet); + let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0); + assert!(result.is_ok()); + let (active_req, request) = result.unwrap(); + if let ActionRequestVariant::VecData(vec) = request.variant { + assert_eq!(request.action_id, action_id); + assert_eq!(active_req.action_id, action_id); + assert_eq!(vec, app_data[8..].to_vec()); + } else { + panic!("unexpected action request variant"); + } + } + + #[test] + fn reply_handling_completion_success() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let reply = PusActionReply::new(action_id, ActionReplyVariant::Completed); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.assert_full_completion_success( + TEST_COMPONENT_ID_0.id(), + req_id, + None, + ); + } + + #[test] + fn reply_handling_completion_failure() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let error_code = ResultU16::new(2, 3); + let reply = PusActionReply::new( + action_id, + ActionReplyVariant::CompletionFailed { + error_code, + params: None, + }, + ); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.into(), + req_id, + None, + error_code.raw() as u64, + ); + } + + #[test] + fn reply_handling_step_success() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let reply = PusActionReply::new(action_id, ActionReplyVariant::StepSuccess { step: 1 }); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + // Entry should not be removed, completion not done yet. + assert!(!result.unwrap()); + testbench.verif_reporter.check_next_was_added(req_id); + testbench + .verif_reporter + .check_next_is_acceptance_success(TEST_COMPONENT_ID_0.raw(), req_id); + testbench + .verif_reporter + .check_next_is_started_success(TEST_COMPONENT_ID_0.raw(), req_id); + testbench + .verif_reporter + .check_next_is_step_success(TEST_COMPONENT_ID_0.raw(), req_id, 1); + } + + #[test] + fn reply_handling_step_failure() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let error_code = ResultU16::new(2, 3); + let reply = PusActionReply::new( + action_id, + ActionReplyVariant::StepFailed { + error_code, + step: 1, + params: None, + }, + ); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.check_next_was_added(req_id); + testbench + .verif_reporter + .check_next_is_acceptance_success(TEST_COMPONENT_ID_0.id(), req_id); + testbench + .verif_reporter + .check_next_is_started_success(TEST_COMPONENT_ID_0.id(), req_id); + testbench.verif_reporter.check_next_is_step_failure( + TEST_COMPONENT_ID_0.id(), + req_id, + error_code.raw().into(), + ); + } + + #[test] + fn reply_handling_unrequested_reply() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_reply = PusActionReply::new(5_u32, ActionReplyVariant::Completed); + let unrequested_reply = + GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply); + // Right now this function does not do a lot. We simply check that it does not panic or do + // weird stuff. + let result = testbench.handle_unrequested_reply(&unrequested_reply); + assert!(result.is_ok()); + } + + #[test] + fn reply_handling_reply_timeout() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let result = testbench.handle_request_timeout( + &ActivePusActionRequestStd::new_from_common_req(action_id, active_request), + &[], + ); + assert!(result.is_ok()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.raw(), + req_id, + None, + tmtc_err::REQUEST_TIMEOUT.raw() as u64, + ); + } +} diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 62deb27..eda55d4 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,3 +1,4 @@ +pub mod action; pub mod stack; pub mod test; @@ -13,9 +14,10 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, - EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericConversionError, - GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, - PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, + EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, GenericConversionError, + GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, + PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, + PusTcToRequestConverter, TcInMemory, }; use satrs::queue::GenericReceiveError; use satrs::request::{Apid, GenericMessage, MessageMetadata}; @@ -53,7 +55,7 @@ pub struct PusTcMpscRouter { // pub event_tc_sender: Sender, // pub sched_tc_sender: Sender, // pub hk_tc_sender: Sender, - // pub action_tc_sender: Sender, + pub action_tc_sender: Sender, // pub mode_tc_sender: Sender, } @@ -98,12 +100,10 @@ impl PusReceiver { tc_in_memory, token: Some(accepted_token.into()), })?, - // PusServiceId::Housekeeping => { - // self.pus_router.hk_tc_sender.send(EcssTcAndToken { - // tc_in_memory, - // token: Some(accepted_token.into()), - // })? - // } + PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })?, // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { // tc_in_memory, // token: Some(accepted_token.into()), @@ -161,7 +161,7 @@ impl PusReceiver { pub trait TargetedPusService { /// Returns [true] interface the packet handling is finished. - fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool; + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn check_for_request_timeouts(&mut self); } @@ -188,9 +188,6 @@ pub trait TargetedPusService { /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. #[allow(dead_code)] pub struct PusTargetedRequestService< - TcReceiver: EcssTcReceiverCore, - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -199,8 +196,12 @@ pub struct PusTargetedRequestService< RequestType, ReplyType, > { - pub service_helper: - PusServiceHelper, + pub service_helper: PusServiceHelper< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + VerificationReporter, + >, pub request_router: GenericRequestRouter, pub request_converter: RequestConverter, pub active_request_map: ActiveRequestMap, @@ -209,11 +210,7 @@ pub struct PusTargetedRequestService< phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, } -#[allow(dead_code)] impl< - TcReceiver: EcssTcReceiverCore, - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -223,9 +220,6 @@ impl< ReplyType, > PusTargetedRequestService< - TcReceiver, - TmSender, - TcInMemConverter, VerificationReporter, RequestConverter, ReplyHandler, @@ -239,9 +233,9 @@ where { pub fn new( service_helper: PusServiceHelper< - TcReceiver, - TmSender, - TcInMemConverter, + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, VerificationReporter, >, request_converter: RequestConverter, @@ -471,7 +465,7 @@ pub(crate) mod tests { use satrs::{ pus::{ verification::test_util::TestVerificationReporter, ActivePusRequestStd, - ActiveRequestMapProvider, EcssTcInVecConverter, MpscTcReceiver, + ActiveRequestMapProvider, }, request::UniqueApidTargetId, spacepackets::{ @@ -691,9 +685,6 @@ pub(crate) mod tests { ReplyType, > { pub service: PusTargetedRequestService< - MpscTcReceiver, - MpscTmAsVecSender, - EcssTcInVecConverter, TestVerificationReporter, RequestConverter, ReplyHandler, diff --git a/src/pus/stack.rs b/src/pus/stack.rs index f5aa1c1..eb1fb3f 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -1,11 +1,9 @@ -// use crate::pus::mode::ModeServiceWrapper; use crate::pus::test::TestCustomServiceWrapper; use crate::pus::HandlingStatus; use derive_new::new; -use satrs::{ - pus::{EcssTcInMemConverter, EcssTmSenderCore}, - spacepackets::time::{cds, TimeWriter}, -}; +use satrs::spacepackets::time::{cds, TimeWriter}; + +use super::{action::ActionServiceWrapper, TargetedPusService}; // use super::{ // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, @@ -14,18 +12,16 @@ use satrs::{ // }; #[derive(new)] -pub struct PusStack { - test_srv: TestCustomServiceWrapper, +pub struct PusStack { + test_srv: TestCustomServiceWrapper, // hk_srv_wrapper: HkServiceWrapper, // event_srv: EventServiceWrapper, - // action_srv_wrapper: ActionServiceWrapper, + action_srv_wrapper: ActionServiceWrapper, // schedule_srv: SchedulingServiceWrapper, // mode_srv: ModeServiceWrapper, } -impl - PusStack -{ +impl PusStack { pub fn periodic_operation(&mut self) { // Release all telecommands which reached their release time before calling the service // handlers. @@ -37,8 +33,8 @@ impl loop { let mut nothing_to_do = true; let mut is_srv_finished = - |tc_handling_done: bool, reply_handling_done: Option| { - if !tc_handling_done + |tc_handling_done: HandlingStatus, reply_handling_done: Option| { + if tc_handling_done == HandlingStatus::Empty || (reply_handling_done.is_some() && reply_handling_done.unwrap() == HandlingStatus::Empty) { @@ -48,13 +44,13 @@ impl is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None); // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); - // is_srv_finished( - // self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), - // Some( - // self.action_srv_wrapper - // .poll_and_handle_next_reply(&time_stamp), - // ), - // ); + is_srv_finished( + self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), + Some( + self.action_srv_wrapper + .poll_and_handle_next_reply(&time_stamp), + ), + ); // is_srv_finished( // self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), // Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), diff --git a/src/pus/test.rs b/src/pus/test.rs index ec63005..3f46fdd 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -6,8 +6,8 @@ use ops_sat_rs::config::tmtc_err; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ - EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, - MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, + PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; @@ -15,11 +15,13 @@ use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; use std::sync::mpsc; -pub fn create_test_service_dynamic( +use super::HandlingStatus; + +pub fn create_test_service( tm_funnel_tx: mpsc::Sender, // event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper { +) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( PUS_TEST_SERVICE.id(), pus_test_rx, @@ -33,23 +35,22 @@ pub fn create_test_service_dynamic( } } -pub struct TestCustomServiceWrapper< - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, -> { - pub handler: - PusService17TestHandler, +pub struct TestCustomServiceWrapper { + pub handler: PusService17TestHandler< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + VerificationReporter, + >, // pub test_srv_event_sender: mpsc::Sender, } -impl - TestCustomServiceWrapper -{ - pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool { +impl TestCustomServiceWrapper { + pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus { let res = self.handler.poll_and_handle_next_tc(time_stamp); if res.is_err() { warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); - return true; + return HandlingStatus::Empty; } match res.unwrap() { PusPacketHandlerResult::RequestHandled => { @@ -114,10 +115,8 @@ impl .expect("Sending start failure verification failed"); } } - PusPacketHandlerResult::Empty => { - return true; - } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, } - false + HandlingStatus::HandledOne } }