diff --git a/Cargo.lock b/Cargo.lock index f8d0d52..d9d0993 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,6 +14,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -35,6 +44,54 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "array-init" version = "0.0.4" @@ -111,6 +168,12 @@ name = "cobs" version = "0.2.3" source = "git+https://github.com/robamu/cobs.rs.git?branch=all_features#c70a7f30fd00a7cbdb7666dec12b437977385d40" +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -202,6 +265,29 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -239,6 +325,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -387,6 +479,7 @@ version = "0.0.1" dependencies = [ "chrono", "derive-new", + "env_logger", "fern", "lazy_static", "log", @@ -452,6 +545,35 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" + [[package]] name = "rustversion" version = "1.0.15" @@ -467,7 +589,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" dependencies = [ "bus", "cobs", @@ -492,7 +614,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" dependencies = [ "csv", "satrs-mib-codegen", @@ -504,7 +626,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" dependencies = [ "proc-macro2", "quote", @@ -514,7 +636,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" dependencies = [ "serde", "spacepackets", @@ -679,6 +801,12 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 5a74807..0b0fd40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,3 +25,6 @@ features = ["test_util"] version = "0.1.1" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" branch = "main" + +[dev-dependencies] +env_logger = "0.11" diff --git a/pytmtc/common.py b/pytmtc/common.py index 6f56604..56b469f 100644 --- a/pytmtc/common.py +++ b/pytmtc/common.py @@ -5,11 +5,8 @@ import enum import struct -class Apid(enum.IntEnum): - SCHED = 1 - GENERIC_PUS = 2 - ACS = 3 - CFDP = 4 +EXPERIMENT_ID = 278 +EXPERIMENT_APID = 1024 + EXPERIMENT_ID class EventSeverity(enum.IntEnum): diff --git a/pytmtc/main.py b/pytmtc/main.py index a3e0caf..9fe494c 100755 --- a/pytmtc/main.py +++ b/pytmtc/main.py @@ -47,7 +47,7 @@ from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc -from common import Apid, EventU32 +from common import EXPERIMENT_APID, EventU32 _LOGGER = logging.getLogger() @@ -64,8 +64,7 @@ class SatRsConfigHook(HookBase): assert self.cfg_path is not None packet_id_list = [] - for apid in Apid: - packet_id_list.append(PacketId(PacketType.TM, True, apid)) + packet_id_list.append(PacketId(PacketType.TM, True, EXPERIMENT_APID)) cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, @@ -181,7 +180,7 @@ class TcHandler(TcHandlerBase): tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, pus_verificator=self.verif_wrapper.pus_verificator, - default_pus_apid=None, + default_pus_apid=EXPERIMENT_APID, ) def send_cb(self, send_params: SendCbParams): diff --git a/pytmtc/pus_tc.py b/pytmtc/pus_tc.py index b0febdc..3a8e83d 100644 --- a/pytmtc/pus_tc.py +++ b/pytmtc/pus_tc.py @@ -10,7 +10,6 @@ from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice -from common import AcsId, Apid _LOGGER = logging.getLogger(__name__) @@ -66,14 +65,6 @@ def create_cmd_definition_tree() -> CmdTreeNode: ) root_node.add_child(scheduler_node) - acs_node = CmdTreeNode("acs", "ACS Subsystem Node") - mgm_node = CmdTreeNode("mgms", "MGM devices node") - mgm_node.add_child(mode_node) - mgm_node.add_child(hk_node) - - acs_node.add_child(mgm_node) - root_node.add_child(acs_node) - return root_node @@ -87,14 +78,10 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): assert len(cmd_path_list) >= 2 if cmd_path_list[1] == "ping": q.add_log_cmd("Sending PUS ping telecommand") - return q.add_pus_tc( - PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1) - ) + return q.add_pus_tc(PusTelecommand(service=17, subservice=1)) elif cmd_path_list[1] == "trigger_event": q.add_log_cmd("Triggering test event") - return q.add_pus_tc( - PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128) - ) + return q.add_pus_tc(PusTelecommand(service=17, subservice=128)) if cmd_path_list[0] == "scheduler": assert len(cmd_path_list) >= 2 if cmd_path_list[1] == "schedule_ping_10_secs_ahead": @@ -106,27 +93,10 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): create_time_tagged_cmd( time_stamp, PusTelecommand(service=17, subservice=1), - apid=Apid.SCHED, ) ) if cmd_path_list[0] == "acs": assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "mgms": - assert len(cmd_path_list) >= 3 - if cmd_path_list[2] == "hk": - if cmd_path_list[3] == "one_shot_hk": - q.add_log_cmd("Sending HK one shot request") - # TODO: Fix - # q.add_pus_tc( - # create_request_one_hk_command( - # make_addressable_id(Apid.ACS, AcsId.MGM_SET) - # ) - # ) - if cmd_path_list[2] == "mode": - if cmd_path_list[3] == "set_mode": - handle_set_mode_cmd( - q, "MGM 0", cmd_path_list[4], Apid.ACS, AcsId.MGM_0 - ) def handle_set_mode_cmd( diff --git a/src/config.rs b/src/config.rs index 623f8f1..ef391a7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,32 +1,18 @@ use lazy_static::lazy_static; use num_enum::{IntoPrimitive, TryFromPrimitive}; -use satrs::spacepackets::{PacketId, PacketType}; use satrs_mib::res_code::ResultU16Info; use satrs_mib::resultcode; -use std::{collections::HashSet, net::Ipv4Addr}; -use strum::IntoEnumIterator; +use std::env; +use std::net::Ipv4Addr; +use std::path::{Path, PathBuf}; pub const STOP_FILE_NAME: &str = "stop-experiment"; +pub const HOME_FOLER_EXPERIMENT: &str = "/home/exp278"; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; - -lazy_static! { - pub static ref PACKET_ID_VALIDATOR: HashSet = { - let mut set = HashSet::new(); - for id in components::Apid::iter() { - set.insert(PacketId::new(PacketType::Tc, true, id as u16)); - } - set - }; - pub static ref APID_VALIDATOR: HashSet = { - let mut set = HashSet::new(); - for id in components::Apid::iter() { - set.insert(id as u16); - } - set - }; -} +pub const EXPERIMENT_ID: u32 = 278; +pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] @@ -40,6 +26,20 @@ pub enum GroupId { Tmtc = 0, Hk = 1, Mode = 2, + Action = 3, +} + +lazy_static! { + pub static ref HOME_PATH: PathBuf = { + let home_path_default = env::var("HOME").expect("HOME env variable not set"); + let mut home_path = PathBuf::new(); + home_path.push(if Path::new(HOME_FOLER_EXPERIMENT).exists() { + HOME_FOLER_EXPERIMENT + } else { + &home_path_default + }); + home_path + }; } pub mod tmtc_err { @@ -78,47 +78,47 @@ pub mod tmtc_err { ]; } +pub mod action_err { + use super::*; + use satrs::res_code::ResultU16; + + #[resultcode] + pub const INVALID_ACTION_ID: ResultU16 = ResultU16::new(GroupId::Action as u8, 0); + + pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT]; +} + pub mod components { use satrs::request::UniqueApidTargetId; - use strum::EnumIter; - #[derive(Copy, Clone, PartialEq, Eq, EnumIter)] - pub enum Apid { - Sched = 1, - GenericPus = 2, - Cfdp = 4, - } + use super::EXPERIMENT_APID; // Component IDs for components with the PUS APID. #[derive(Copy, Clone, PartialEq, Eq)] - pub enum PusId { - PusEventManagement = 0, - PusRouting = 1, - PusTest = 2, - PusAction = 3, - PusMode = 4, - PusHk = 5, - } - - #[derive(Copy, Clone, PartialEq, Eq)] - pub enum AcsId { - Mgm0 = 0, + pub enum UniqueId { + Controller = 0, + PusEventManagement = 1, + PusRouting = 2, + PusTest = 3, + PusAction = 4, + PusMode = 5, + PusHk = 6, } + pub const CONTROLLER_ID: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::Controller as u32); pub const PUS_ACTION_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusAction as u32); pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, 0); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusEventManagement as u32); pub const PUS_ROUTING_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusRouting as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusRouting as u32); pub const PUS_TEST_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusTest as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32); pub const PUS_MODE_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusMode as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32); pub const PUS_HK_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusHk as u32); - pub const PUS_SCHED_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::Sched as u16, 0); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32); } pub mod tasks { diff --git a/src/controller.rs b/src/controller.rs new file mode 100644 index 0000000..f69d76b --- /dev/null +++ b/src/controller.rs @@ -0,0 +1,128 @@ +use num_enum::TryFromPrimitive; +use satrs::{ + action::ActionRequest, + pus::action::{ActionReplyVariant, PusActionReply}, + request::{GenericMessage, MessageMetadata}, +}; +use std::{ + env::temp_dir, + path::{Path, PathBuf}, + sync::{atomic::AtomicBool, mpsc, Arc}, +}; + +use ops_sat_rs::config::{action_err::INVALID_ACTION_ID, HOME_PATH, STOP_FILE_NAME}; + +use crate::requests::CompositeRequest; + +#[derive(Debug, Clone, Copy, TryFromPrimitive)] +#[repr(u32)] +pub enum ActionId { + StopExperiment = 1, +} + +pub struct ExperimentController { + pub composite_request_rx: mpsc::Receiver>, + pub action_reply_tx: mpsc::Sender>, + pub stop_signal: Arc, + home_path_stop_file: PathBuf, + tmp_path_stop_file: PathBuf, +} + +impl ExperimentController { + pub fn new( + composite_request_rx: mpsc::Receiver>, + action_reply_tx: mpsc::Sender>, + stop_signal: Arc, + ) -> Self { + let mut home_path_stop_file = PathBuf::new(); + home_path_stop_file.push(HOME_PATH.as_path()); + home_path_stop_file.push(STOP_FILE_NAME); + let mut tmp_path_stop_file = temp_dir(); + tmp_path_stop_file.push(STOP_FILE_NAME); + Self { + composite_request_rx, + action_reply_tx, + stop_signal, + home_path_stop_file, + tmp_path_stop_file, + } + } +} + +impl ExperimentController { + pub fn perform_operation(&mut self) { + match self.composite_request_rx.try_recv() { + Ok(msg) => match msg.message { + CompositeRequest::Hk(_) => { + log::warn!("hk request handling unimplemented") + } + CompositeRequest::Action(action_req) => { + self.handle_action_request(msg.requestor_info, action_req); + } + }, + Err(e) => { + if e != mpsc::TryRecvError::Empty { + log::error!("composite request rx error: {:?}", e); + } + } + } + self.check_stop_file(); + } + + pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) { + let action_id = ActionId::try_from(action_req.action_id); + if action_id.is_err() { + let result = self.action_reply_tx.send(GenericMessage::new_action_reply( + requestor, + action_req.action_id, + ActionReplyVariant::CompletionFailed { + error_code: INVALID_ACTION_ID, + params: None, + }, + )); + if result.is_err() { + log::error!("sending action reply failed"); + } + return; + } + let action_id = action_id.unwrap(); + match action_id { + ActionId::StopExperiment => { + self.stop_signal + .store(true, std::sync::atomic::Ordering::Relaxed); + let result = self.action_reply_tx.send(GenericMessage::new_action_reply( + requestor, + action_req.action_id, + ActionReplyVariant::Completed, + )); + if result.is_err() { + log::error!("sending action reply failed"); + } + } + } + } + + pub fn check_stop_file(&self) { + let check_at_path = |path: &Path| { + if path.exists() { + log::warn!( + "Detected stop file name at {:?}. Initiating experiment shutdown", + path + ); + // By default, clear the stop file. + let result = std::fs::remove_file(path); + if result.is_err() { + log::error!( + "failed to remove stop file at {:?}: {}", + path, + result.unwrap_err() + ); + } + self.stop_signal + .store(true, std::sync::atomic::Ordering::Relaxed); + } + }; + check_at_path(self.tmp_path_stop_file.as_path()); + check_at_path(self.home_path_stop_file.as_path()); + } +} diff --git a/src/interface/tcp.rs b/src/interface/tcp.rs index 3a184e6..f722dfa 100644 --- a/src/interface/tcp.rs +++ b/src/interface/tcp.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashSet, VecDeque}, + collections::VecDeque, sync::{atomic::AtomicBool, Arc, Mutex}, time::Duration, }; @@ -86,7 +86,7 @@ impl HandledConnectionHandler for ConnectionFinishedHandler { pub type TcpServerType = TcpSpacepacketsServer< SyncTcpTmSource, CcsdsDistributor, MpscErrorType>, - HashSet, + Vec, ConnectionFinishedHandler, (), CcsdsError, @@ -116,7 +116,7 @@ impl< cfg: ServerConfig, tm_source: SyncTcpTmSource, tc_receiver: CcsdsDistributor, MpscErrorType>, - packet_id_lookup: HashSet, + packet_id_lookup: Vec, stop_signal: Arc, ) -> Result { Ok(Self { diff --git a/src/main.rs b/src/main.rs index 7d6a1af..db66e01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,22 +7,21 @@ use std::{ use log::info; use ops_sat_rs::config::{ - tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, -}; -use ops_sat_rs::config::{ + components::CONTROLLER_ID, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, - STOP_FILE_NAME, + EXPERIMENT_APID, }; +use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, + spacepackets::PacketId, tmtc::CcsdsDistributor, }; -use crate::pus::stack::PusStack; -use crate::pus::test::create_test_service_dynamic; use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::tmtc::tm_funnel::TmFunnelDynamic; use crate::tmtc::TcSourceTaskDynamic; +use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ interface::tcp::{SyncTcpTmSource, TcpTask}, interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, @@ -30,7 +29,12 @@ use crate::{ tmtc::ccsds::CcsdsReceiver, tmtc::PusTcSourceProviderDynamic, }; +use crate::{ + pus::{action::create_action_service, stack::PusStack}, + requests::GenericRequestRouter, +}; +mod controller; mod interface; mod logger; mod pus; @@ -53,23 +57,31 @@ fn main() { // let (pus_event_tx, pus_event_rx) = mpsc::channel(); // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); - // let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); - // let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); + let (controller_composite_tx, controller_composite_rx) = mpsc::channel(); + // let (controller_action_reply_tx, controller_action_reply_rx) = mpsc::channel(); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = GenericRequestRouter::default(); + request_map + .composite_router_map + .insert(CONTROLLER_ID.id(), controller_composite_tx); let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, // event_tc_sender: pus_event_tx, // sched_tc_sender: pus_sched_tx, // hk_tc_sender: pus_hk_tx, - // action_tc_sender: pus_action_tx, + action_tc_sender: pus_action_tx, // mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service_dynamic( + let pus_test_service = create_test_service( tm_funnel_tx.clone(), // event_handler.clone_event_sender(), pus_test_rx, @@ -83,12 +95,12 @@ fn main() { // // let pus_event_service = // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); - // let pus_action_service = create_action_service_dynamic( - // tm_funnel_tx.clone(), - // pus_action_rx, - // request_map.clone(), - // pus_action_reply_rx, - // ); + let pus_action_service = create_action_service( + tm_funnel_tx.clone(), + pus_action_rx, + request_map.clone(), + pus_action_reply_rx, + ); // let pus_hk_service = create_hk_service_dynamic( // tm_funnel_tx.clone(), // pus_hk_rx, @@ -105,7 +117,7 @@ fn main() { pus_test_service, // pus_hk_service, // pus_event_service, - // pus_action_service, + pus_action_service, // pus_scheduler_service, // pus_mode_service, ); @@ -135,7 +147,7 @@ fn main() { tcp_server_cfg, sync_tm_tcp_source.clone(), tcp_ccsds_distributor, - PACKET_ID_VALIDATOR.clone(), + vec![PacketId::new_for_tc(true, EXPERIMENT_APID)], stop_signal.clone(), ) .expect("tcp server creation failed"); @@ -147,17 +159,19 @@ fn main() { stop_signal.clone(), ); + let mut controller = ExperimentController::new( + controller_composite_rx, + pus_action_reply_tx, + stop_signal.clone(), + ); + info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); let jh_ctrl_thread = thread::Builder::new() .name("ops-sat ctrl".to_string()) .spawn(move || loop { - if std::path::Path::new(STOP_FILE_NAME).exists() { - log::warn!( - "Detected stop file name at {}. Initiating experiment shutdown", - STOP_FILE_NAME - ); - ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + controller.perform_operation(); + if ctrl_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); diff --git a/src/pus/action.rs b/src/pus/action.rs new file mode 100644 index 0000000..409f330 --- /dev/null +++ b/src/pus/action.rs @@ -0,0 +1,715 @@ +use log::{error, warn}; +use ops_sat_rs::config::components::PUS_ACTION_SERVICE; +use ops_sat_rs::config::tmtc_err; +use satrs::action::{ActionRequest, ActionRequestVariant}; +use satrs::params::WritableToBeBytes; +use satrs::pus::action::{ + ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, PusActionReply, +}; +use satrs::pus::verification::{ + FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter, + VerificationReportingProvider, VerificationToken, +}; +use satrs::pus::{ + ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, + GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, + PusTcToRequestConverter, PusTmAsVec, +}; +use satrs::request::{GenericMessage, UniqueApidTargetId}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; +use std::sync::mpsc; +use std::time::Duration; + +use crate::requests::GenericRequestRouter; + +use super::{ + create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, + PusTargetedRequestService, TargetedPusService, +}; + +pub struct ActionReplyHandler { + fail_data_buf: [u8; 128], +} + +impl Default for ActionReplyHandler { + fn default() -> Self { + Self { + fail_data_buf: [0; 128], + } + } +} + +impl PusReplyHandler for ActionReplyHandler { + type Error = EcssTmtcError; + + fn handle_unrequested_reply( + &mut self, + reply: &GenericMessage, + _tm_sender: &impl EcssTmSenderCore, + ) -> Result<(), Self::Error> { + warn!("received unexpected reply for service 8: {reply:?}"); + Ok(()) + } + + fn handle_reply( + &mut self, + reply: &GenericMessage, + active_request: &ActivePusActionRequestStd, + tm_sender: &(impl EcssTmSenderCore + ?Sized), + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result { + let verif_token: VerificationToken = active_request + .token() + .try_into() + .expect("invalid token state"); + let remove_entry = match &reply.message.variant { + ActionReplyVariant::CompletionFailed { error_code, params } => { + let mut fail_data_len = 0; + if let Some(params) = params { + fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; + } + verification_handler.completion_failure( + tm_sender, + verif_token, + FailParams::new(time_stamp, error_code, &self.fail_data_buf[..fail_data_len]), + )?; + true + } + ActionReplyVariant::StepFailed { + error_code, + step, + params, + } => { + let mut fail_data_len = 0; + if let Some(params) = params { + fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; + } + verification_handler.step_failure( + tm_sender, + verif_token, + FailParamsWithStep::new( + time_stamp, + &EcssEnumU16::new(*step), + error_code, + &self.fail_data_buf[..fail_data_len], + ), + )?; + true + } + ActionReplyVariant::Completed => { + verification_handler.completion_success(tm_sender, verif_token, time_stamp)?; + true + } + ActionReplyVariant::StepSuccess { step } => { + verification_handler.step_success( + tm_sender, + &verif_token, + time_stamp, + EcssEnumU16::new(*step), + )?; + false + } + _ => false, + }; + Ok(remove_entry) + } + + fn handle_request_timeout( + &mut self, + active_request: &ActivePusActionRequestStd, + tm_sender: &impl EcssTmSenderCore, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler( + tm_sender, + active_request, + verification_handler, + time_stamp, + "action", + ) + } +} + +#[derive(Default)] +pub struct ActionRequestConverter {} + +impl PusTcToRequestConverter for ActionRequestConverter { + type Error = GenericConversionError; + + fn convert( + &mut self, + token: VerificationToken, + tc: &PusTcReader, + tm_sender: &(impl EcssTmSenderCore + ?Sized), + verif_reporter: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(ActivePusActionRequestStd, ActionRequest), Self::Error> { + let subservice = tc.subservice(); + let user_data = tc.user_data(); + if user_data.len() < 8 { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA), + ) + .expect("Sending start failure failed"); + return Err(GenericConversionError::NotEnoughAppData { + expected: 8, + found: user_data.len(), + }); + } + let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).unwrap(); + let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap()); + if subservice == 128 { + let req_variant = if user_data.len() == 8 { + ActionRequestVariant::NoData + } else { + ActionRequestVariant::VecData(user_data[8..].to_vec()) + }; + Ok(( + ActivePusActionRequestStd::new( + action_id, + target_id_and_apid.into(), + token.into(), + Duration::from_secs(30), + ), + ActionRequest::new(action_id, req_variant), + )) + } else { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE), + ) + .expect("Sending start failure failed"); + Err(GenericConversionError::InvalidSubservice(subservice)) + } + } +} + +pub fn create_action_service( + tm_funnel_tx: mpsc::Sender, + pus_action_rx: mpsc::Receiver, + action_router: GenericRequestRouter, + reply_receiver: mpsc::Receiver>, +) -> ActionServiceWrapper { + let action_request_handler = PusTargetedRequestService::new( + PusServiceHelper::new( + PUS_ACTION_SERVICE.id(), + pus_action_rx, + tm_funnel_tx, + create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + ActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + action_router, + reply_receiver, + ); + ActionServiceWrapper { + service: action_request_handler, + } +} + +pub struct ActionServiceWrapper { + pub(crate) service: PusTargetedRequestService< + VerificationReporter, + ActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + PusActionReply, + >, +} + +impl TargetedPusService for ActionServiceWrapper { + /// Returns [true] if the packet handling is finished. + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self.service.poll_and_handle_next_tc(time_stamp) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS 8 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS 8 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS 8 subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}"); + return HandlingStatus::Empty; + } + } + HandlingStatus::HandledOne + } + + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { + // This only fails if all senders disconnected. Treat it like an empty queue. + self.service + .poll_and_check_next_reply(time_stamp) + .unwrap_or_else(|e| { + warn!("PUS 8: Handling reply failed with error {e:?}"); + HandlingStatus::Empty + }) + } + + fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } +} + +#[cfg(test)] +mod tests { + use satrs::pus::test_util::{ + TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, + }; + use satrs::pus::verification; + use satrs::pus::verification::test_util::TestVerificationReporter; + use satrs::request::MessageMetadata; + use satrs::ComponentId; + use satrs::{ + res_code::ResultU16, + spacepackets::{ + ecss::{ + tc::{PusTcCreator, PusTcSecondaryHeader}, + tm::PusTmReader, + WritablePusPacket, + }, + SpHeader, + }, + }; + + use crate::{ + pus::tests::{PusConverterTestbench, ReplyHandlerTestbench, TargetedPusRequestTestbench}, + requests::CompositeRequest, + }; + + use super::*; + + impl + TargetedPusRequestTestbench< + ActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + PusActionReply, + > + { + pub fn new_for_action(owner_id: ComponentId, target_id: ComponentId) -> Self { + let _ = env_logger::builder().is_test(true).try_init(); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (action_reply_tx, action_reply_rx) = mpsc::channel(); + let (action_req_tx, action_req_rx) = mpsc::channel(); + let verif_reporter = TestVerificationReporter::new(owner_id); + let mut generic_req_router = GenericRequestRouter::default(); + generic_req_router + .composite_router_map + .insert(target_id, action_req_tx); + Self { + service: PusTargetedRequestService::new( + PusServiceHelper::new( + owner_id, + pus_action_rx, + tm_funnel_tx.clone(), + verif_reporter, + EcssTcInVecConverter::default(), + ), + ActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + generic_req_router, + action_reply_rx, + ), + request_id: None, + pus_packet_tx: pus_action_tx, + tm_funnel_rx, + reply_tx: action_reply_tx, + request_rx: action_req_rx, + } + } + + pub fn verify_packet_started(&self) { + self.service + .service_helper + .common + .verif_reporter + .check_next_is_started_success( + self.service.service_helper.id(), + self.request_id.expect("request ID not set").into(), + ); + } + + pub fn verify_packet_completed(&self) { + self.service + .service_helper + .common + .verif_reporter + .check_next_is_completion_success( + self.service.service_helper.id(), + self.request_id.expect("request ID not set").into(), + ); + } + + pub fn verify_tm_empty(&self) { + let packet = self.tm_funnel_rx.try_recv(); + if let Err(mpsc::TryRecvError::Empty) = packet { + } else { + let tm = packet.unwrap(); + let unexpected_tm = PusTmReader::new(&tm.packet, 7).unwrap().0; + panic!("unexpected TM packet {unexpected_tm:?}"); + } + } + + pub fn verify_next_tc_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + panic!("unexpected error {:?}", e); + } + let result = result.unwrap(); + match result { + PusPacketHandlerResult::RequestHandled => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_all_tcs_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + panic!("unexpected error {:?}", e); + } + let result = result.unwrap(); + match result { + PusPacketHandlerResult::Empty => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), HandlingStatus::HandledOne); + } + + pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), HandlingStatus::Empty); + } + + pub fn add_tc(&mut self, tc: &PusTcCreator) { + self.request_id = Some(verification::RequestId::new(tc).into()); + let token = self.service.service_helper.verif_reporter_mut().add_tc(tc); + let accepted_token = self + .service + .service_helper + .verif_reporter() + .acceptance_success(self.service.service_helper.tm_sender(), token, &[0; 7]) + .expect("TC acceptance failed"); + self.service + .service_helper + .verif_reporter() + .check_next_was_added(accepted_token.request_id()); + let id = self.service.service_helper.id(); + self.service + .service_helper + .verif_reporter() + .check_next_is_acceptance_success(id, accepted_token.request_id()); + self.pus_packet_tx + .send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) + .unwrap(); + } + } + + #[test] + fn basic_request() { + let mut testbench = TargetedPusRequestTestbench::new_for_action( + TEST_COMPONENT_ID_0.id(), + TEST_COMPONENT_ID_1.id(), + ); + // Create a basic action request and verify forwarding. + let sp_header = SpHeader::new_from_apid(TEST_APID); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_1.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new(sp_header, sec_header, &app_data, true); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + testbench.verify_next_tc_is_handled_properly(&time_stamp); + testbench.verify_all_tcs_handled(&time_stamp); + + testbench.verify_packet_started(); + + let possible_req = testbench.request_rx.try_recv(); + assert!(possible_req.is_ok()); + let req = possible_req.unwrap(); + if let CompositeRequest::Action(action_req) = req.message { + assert_eq!(action_req.action_id, action_id); + assert_eq!(action_req.variant, ActionRequestVariant::NoData); + let action_reply = PusActionReply::new(action_id, ActionReplyVariant::Completed); + testbench + .reply_tx + .send(GenericMessage::new(req.requestor_info, action_reply)) + .unwrap(); + } else { + panic!("unexpected request type"); + } + testbench.verify_next_reply_is_handled_properly(&time_stamp); + testbench.verify_all_replies_handled(&time_stamp); + + testbench.verify_packet_completed(); + testbench.verify_tm_empty(); + } + + #[test] + fn basic_request_routing_error() { + let mut testbench = TargetedPusRequestTestbench::new_for_action( + TEST_COMPONENT_ID_0.id(), + TEST_COMPONENT_ID_1.id(), + ); + // Create a basic action request and verify forwarding. + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&0_u32.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + + let result = testbench.service.poll_and_handle_next_tc(&time_stamp); + assert!(result.is_err()); + // Verify the correct result and completion failure. + } + + #[test] + fn converter_action_req_no_data() { + let mut testbench = PusConverterTestbench::new( + TEST_COMPONENT_ID_0.raw(), + ActionRequestConverter::default(), + ); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + let token = testbench.add_tc(&pus8_packet); + let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0); + assert!(result.is_ok()); + let (active_req, request) = result.unwrap(); + if let ActionRequestVariant::NoData = request.variant { + assert_eq!(request.action_id, action_id); + assert_eq!(active_req.action_id, action_id); + assert_eq!( + active_req.target_id(), + UniqueApidTargetId::new(TEST_APID, TEST_UNIQUE_ID_0).raw() + ); + assert_eq!( + active_req.token().request_id(), + testbench.request_id().unwrap() + ); + } else { + panic!("unexpected action request variant"); + } + } + + #[test] + fn converter_action_req_with_data() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ActionRequestConverter::default()); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 16] = [0; 16]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + for i in 0..8 { + app_data[i + 8] = i as u8; + } + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + let token = testbench.add_tc(&pus8_packet); + let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0); + assert!(result.is_ok()); + let (active_req, request) = result.unwrap(); + if let ActionRequestVariant::VecData(vec) = request.variant { + assert_eq!(request.action_id, action_id); + assert_eq!(active_req.action_id, action_id); + assert_eq!(vec, app_data[8..].to_vec()); + } else { + panic!("unexpected action request variant"); + } + } + + #[test] + fn reply_handling_completion_success() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let reply = PusActionReply::new(action_id, ActionReplyVariant::Completed); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.assert_full_completion_success( + TEST_COMPONENT_ID_0.id(), + req_id, + None, + ); + } + + #[test] + fn reply_handling_completion_failure() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let error_code = ResultU16::new(2, 3); + let reply = PusActionReply::new( + action_id, + ActionReplyVariant::CompletionFailed { + error_code, + params: None, + }, + ); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.into(), + req_id, + None, + error_code.raw() as u64, + ); + } + + #[test] + fn reply_handling_step_success() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let reply = PusActionReply::new(action_id, ActionReplyVariant::StepSuccess { step: 1 }); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + // Entry should not be removed, completion not done yet. + assert!(!result.unwrap()); + testbench.verif_reporter.check_next_was_added(req_id); + testbench + .verif_reporter + .check_next_is_acceptance_success(TEST_COMPONENT_ID_0.raw(), req_id); + testbench + .verif_reporter + .check_next_is_started_success(TEST_COMPONENT_ID_0.raw(), req_id); + testbench + .verif_reporter + .check_next_is_step_success(TEST_COMPONENT_ID_0.raw(), req_id, 1); + } + + #[test] + fn reply_handling_step_failure() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let error_code = ResultU16::new(2, 3); + let reply = PusActionReply::new( + action_id, + ActionReplyVariant::StepFailed { + error_code, + step: 1, + params: None, + }, + ); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.check_next_was_added(req_id); + testbench + .verif_reporter + .check_next_is_acceptance_success(TEST_COMPONENT_ID_0.id(), req_id); + testbench + .verif_reporter + .check_next_is_started_success(TEST_COMPONENT_ID_0.id(), req_id); + testbench.verif_reporter.check_next_is_step_failure( + TEST_COMPONENT_ID_0.id(), + req_id, + error_code.raw().into(), + ); + } + + #[test] + fn reply_handling_unrequested_reply() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_reply = PusActionReply::new(5_u32, ActionReplyVariant::Completed); + let unrequested_reply = + GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply); + // Right now this function does not do a lot. We simply check that it does not panic or do + // weird stuff. + let result = testbench.handle_unrequested_reply(&unrequested_reply); + assert!(result.is_ok()); + } + + #[test] + fn reply_handling_reply_timeout() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let result = testbench.handle_request_timeout( + &ActivePusActionRequestStd::new_from_common_req(action_id, active_request), + &[], + ); + assert!(result.is_ok()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.raw(), + req_id, + None, + tmtc_err::REQUEST_TIMEOUT.raw() as u64, + ); + } +} diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 62deb27..eda55d4 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,3 +1,4 @@ +pub mod action; pub mod stack; pub mod test; @@ -13,9 +14,10 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, - EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericConversionError, - GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, - PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, + EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, GenericConversionError, + GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, + PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, + PusTcToRequestConverter, TcInMemory, }; use satrs::queue::GenericReceiveError; use satrs::request::{Apid, GenericMessage, MessageMetadata}; @@ -53,7 +55,7 @@ pub struct PusTcMpscRouter { // pub event_tc_sender: Sender, // pub sched_tc_sender: Sender, // pub hk_tc_sender: Sender, - // pub action_tc_sender: Sender, + pub action_tc_sender: Sender, // pub mode_tc_sender: Sender, } @@ -98,12 +100,10 @@ impl PusReceiver { tc_in_memory, token: Some(accepted_token.into()), })?, - // PusServiceId::Housekeeping => { - // self.pus_router.hk_tc_sender.send(EcssTcAndToken { - // tc_in_memory, - // token: Some(accepted_token.into()), - // })? - // } + PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })?, // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { // tc_in_memory, // token: Some(accepted_token.into()), @@ -161,7 +161,7 @@ impl PusReceiver { pub trait TargetedPusService { /// Returns [true] interface the packet handling is finished. - fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool; + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn check_for_request_timeouts(&mut self); } @@ -188,9 +188,6 @@ pub trait TargetedPusService { /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. #[allow(dead_code)] pub struct PusTargetedRequestService< - TcReceiver: EcssTcReceiverCore, - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -199,8 +196,12 @@ pub struct PusTargetedRequestService< RequestType, ReplyType, > { - pub service_helper: - PusServiceHelper, + pub service_helper: PusServiceHelper< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + VerificationReporter, + >, pub request_router: GenericRequestRouter, pub request_converter: RequestConverter, pub active_request_map: ActiveRequestMap, @@ -209,11 +210,7 @@ pub struct PusTargetedRequestService< phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, } -#[allow(dead_code)] impl< - TcReceiver: EcssTcReceiverCore, - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -223,9 +220,6 @@ impl< ReplyType, > PusTargetedRequestService< - TcReceiver, - TmSender, - TcInMemConverter, VerificationReporter, RequestConverter, ReplyHandler, @@ -239,9 +233,9 @@ where { pub fn new( service_helper: PusServiceHelper< - TcReceiver, - TmSender, - TcInMemConverter, + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, VerificationReporter, >, request_converter: RequestConverter, @@ -471,7 +465,7 @@ pub(crate) mod tests { use satrs::{ pus::{ verification::test_util::TestVerificationReporter, ActivePusRequestStd, - ActiveRequestMapProvider, EcssTcInVecConverter, MpscTcReceiver, + ActiveRequestMapProvider, }, request::UniqueApidTargetId, spacepackets::{ @@ -691,9 +685,6 @@ pub(crate) mod tests { ReplyType, > { pub service: PusTargetedRequestService< - MpscTcReceiver, - MpscTmAsVecSender, - EcssTcInVecConverter, TestVerificationReporter, RequestConverter, ReplyHandler, diff --git a/src/pus/stack.rs b/src/pus/stack.rs index f5aa1c1..0699046 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -1,11 +1,9 @@ -// use crate::pus::mode::ModeServiceWrapper; use crate::pus::test::TestCustomServiceWrapper; use crate::pus::HandlingStatus; use derive_new::new; -use satrs::{ - pus::{EcssTcInMemConverter, EcssTmSenderCore}, - spacepackets::time::{cds, TimeWriter}, -}; +use satrs::spacepackets::time::{cds, TimeWriter}; + +use super::{action::ActionServiceWrapper, TargetedPusService}; // use super::{ // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, @@ -14,18 +12,16 @@ use satrs::{ // }; #[derive(new)] -pub struct PusStack { - test_srv: TestCustomServiceWrapper, +pub struct PusStack { + test_srv: TestCustomServiceWrapper, // hk_srv_wrapper: HkServiceWrapper, // event_srv: EventServiceWrapper, - // action_srv_wrapper: ActionServiceWrapper, + action_srv_wrapper: ActionServiceWrapper, // schedule_srv: SchedulingServiceWrapper, // mode_srv: ModeServiceWrapper, } -impl - PusStack -{ +impl PusStack { pub fn periodic_operation(&mut self) { // Release all telecommands which reached their release time before calling the service // handlers. @@ -37,24 +33,31 @@ impl loop { let mut nothing_to_do = true; let mut is_srv_finished = - |tc_handling_done: bool, reply_handling_done: Option| { - if !tc_handling_done - || (reply_handling_done.is_some() - && reply_handling_done.unwrap() == HandlingStatus::Empty) + |_srv_id: u8, + tc_handling_status: HandlingStatus, + reply_handling_status: Option| { + if tc_handling_status == HandlingStatus::HandledOne + || (reply_handling_status.is_some() + && reply_handling_status.unwrap() == HandlingStatus::HandledOne) { nothing_to_do = false; } }; - is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None); + is_srv_finished( + 17, + self.test_srv.poll_and_handle_next_packet(&time_stamp), + None, + ); // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); - // is_srv_finished( - // self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), - // Some( - // self.action_srv_wrapper - // .poll_and_handle_next_reply(&time_stamp), - // ), - // ); + is_srv_finished( + 8, + self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), + Some( + self.action_srv_wrapper + .poll_and_handle_next_reply(&time_stamp), + ), + ); // is_srv_finished( // self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), // Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), @@ -65,7 +68,7 @@ impl // ); if nothing_to_do { // Timeout checking is only done once. - // self.action_srv_wrapper.check_for_request_timeouts(); + self.action_srv_wrapper.check_for_request_timeouts(); // self.hk_srv_wrapper.check_for_request_timeouts(); // self.mode_srv.check_for_request_timeouts(); break; diff --git a/src/pus/test.rs b/src/pus/test.rs index ec63005..3f46fdd 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -6,8 +6,8 @@ use ops_sat_rs::config::tmtc_err; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ - EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, - MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, + PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; @@ -15,11 +15,13 @@ use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; use std::sync::mpsc; -pub fn create_test_service_dynamic( +use super::HandlingStatus; + +pub fn create_test_service( tm_funnel_tx: mpsc::Sender, // event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper { +) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( PUS_TEST_SERVICE.id(), pus_test_rx, @@ -33,23 +35,22 @@ pub fn create_test_service_dynamic( } } -pub struct TestCustomServiceWrapper< - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, -> { - pub handler: - PusService17TestHandler, +pub struct TestCustomServiceWrapper { + pub handler: PusService17TestHandler< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + VerificationReporter, + >, // pub test_srv_event_sender: mpsc::Sender, } -impl - TestCustomServiceWrapper -{ - pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool { +impl TestCustomServiceWrapper { + pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus { let res = self.handler.poll_and_handle_next_tc(time_stamp); if res.is_err() { warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); - return true; + return HandlingStatus::Empty; } match res.unwrap() { PusPacketHandlerResult::RequestHandled => { @@ -114,10 +115,8 @@ impl .expect("Sending start failure verification failed"); } } - PusPacketHandlerResult::Empty => { - return true; - } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, } - false + HandlingStatus::HandledOne } } diff --git a/src/tmtc/ccsds.rs b/src/tmtc/ccsds.rs index 0ba4776..97edf26 100644 --- a/src/tmtc/ccsds.rs +++ b/src/tmtc/ccsds.rs @@ -1,5 +1,4 @@ -use ops_sat_rs::config::components::Apid; -use ops_sat_rs::config::APID_VALIDATOR; +use ops_sat_rs::config::EXPERIMENT_APID; use satrs::pus::ReceivesEcssPusTc; use satrs::spacepackets::{CcsdsPacket, SpHeader}; use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; @@ -19,7 +18,7 @@ impl< > ValidatorU16Id for CcsdsReceiver { fn validate(&self, apid: u16) -> bool { - APID_VALIDATOR.contains(&apid) + apid == EXPERIMENT_APID } } @@ -35,11 +34,7 @@ impl< sp_header: &SpHeader, tc_raw: &[u8], ) -> Result<(), Self::Error> { - if sp_header.apid() == Apid::Cfdp as u16 { - } else { - return self.tc_source.pass_ccsds(sp_header, tc_raw); - } - Ok(()) + self.tc_source.pass_ccsds(sp_header, tc_raw) } fn handle_packet_with_unknown_apid( @@ -47,6 +42,7 @@ impl< sp_header: &SpHeader, _tc_raw: &[u8], ) -> Result<(), Self::Error> { + // TODO: Log event as telemetry or something similar? log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); Ok(()) }