Add action service and controller component #5

Merged
muellerr merged 11 commits from add-action-service-controller-obj into main 2024-04-13 11:19:13 +02:00
10 changed files with 950 additions and 137 deletions
Showing only changes of commit 59a06b5c50 - Show all commits

128
Cargo.lock generated
View File

@ -14,6 +14,15 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "allocator-api2" name = "allocator-api2"
version = "0.2.16" version = "0.2.16"
@ -35,6 +44,54 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anstream"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
[[package]]
name = "anstyle-parse"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
dependencies = [
"windows-sys",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
dependencies = [
"anstyle",
"windows-sys",
]
[[package]] [[package]]
name = "array-init" name = "array-init"
version = "0.0.4" version = "0.0.4"
@ -111,6 +168,12 @@ name = "cobs"
version = "0.2.3" version = "0.2.3"
source = "git+https://github.com/robamu/cobs.rs.git?branch=all_features#c70a7f30fd00a7cbdb7666dec12b437977385d40" source = "git+https://github.com/robamu/cobs.rs.git?branch=all_features#c70a7f30fd00a7cbdb7666dec12b437977385d40"
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]] [[package]]
name = "core-foundation-sys" name = "core-foundation-sys"
version = "0.8.6" version = "0.8.6"
@ -202,6 +265,29 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
[[package]]
name = "env_filter"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"humantime",
"log",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.1" version = "1.0.1"
@ -239,6 +325,12 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]] [[package]]
name = "iana-time-zone" name = "iana-time-zone"
version = "0.1.60" version = "0.1.60"
@ -375,6 +467,7 @@ version = "0.0.1"
dependencies = [ dependencies = [
"chrono", "chrono",
"derive-new", "derive-new",
"env_logger",
"fern", "fern",
"lazy_static", "lazy_static",
"log", "log",
@ -440,6 +533,35 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "regex"
version = "1.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.15" version = "1.0.15"
@ -666,6 +788,12 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.4" version = "0.9.4"

View File

@ -25,3 +25,6 @@ features = ["test_util"]
version = "0.1.1" version = "0.1.1"
git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
branch = "main" branch = "main"
[dev-dependencies]
env_logger = "0.11"

View File

@ -1,30 +1,12 @@
use lazy_static::lazy_static;
use num_enum::{IntoPrimitive, TryFromPrimitive}; use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs::spacepackets::{PacketId, PacketType};
use satrs_mib::res_code::ResultU16Info; use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode; use satrs_mib::resultcode;
use std::{collections::HashSet, net::Ipv4Addr}; use std::net::Ipv4Addr;
use strum::IntoEnumIterator;
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301; pub const SERVER_PORT: u16 = 7301;
pub const EXPERIMENT_ID: u32 = 278;
lazy_static! { pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16;
pub static ref PACKET_ID_VALIDATOR: HashSet<PacketId> = {
let mut set = HashSet::new();
for id in components::Apid::iter() {
set.insert(PacketId::new(PacketType::Tc, true, id as u16));
}
set
};
pub static ref APID_VALIDATOR: HashSet<u16> = {
let mut set = HashSet::new();
for id in components::Apid::iter() {
set.insert(id as u16);
}
set
};
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)] #[repr(u8)]
@ -78,45 +60,35 @@ pub mod tmtc_err {
pub mod components { pub mod components {
use satrs::request::UniqueApidTargetId; use satrs::request::UniqueApidTargetId;
use strum::EnumIter;
#[derive(Copy, Clone, PartialEq, Eq, EnumIter)] use super::EXPERIMENT_APID;
pub enum Apid {
Sched = 1,
GenericPus = 2,
Cfdp = 4,
}
// Component IDs for components with the PUS APID. // Component IDs for components with the PUS APID.
#[derive(Copy, Clone, PartialEq, Eq)] #[derive(Copy, Clone, PartialEq, Eq)]
pub enum PusId { pub enum UniqueId {
PusEventManagement = 0, Controller = 0,
PusRouting = 1, PusEventManagement = 1,
PusTest = 2, PusRouting = 2,
PusAction = 3, PusTest = 3,
PusMode = 4, PusAction = 4,
PusHk = 5, PusMode = 5,
} PusHk = 6,
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum AcsId {
Mgm0 = 0,
} }
pub const CONTROLLER_ID: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::Controller as u32);
pub const PUS_ACTION_SERVICE: UniqueApidTargetId = pub const PUS_ACTION_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32); UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusAction as u32);
pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId = pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::GenericPus as u16, 0); UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusEventManagement as u32);
pub const PUS_ROUTING_SERVICE: UniqueApidTargetId = pub const PUS_ROUTING_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusRouting as u32); UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusRouting as u32);
pub const PUS_TEST_SERVICE: UniqueApidTargetId = pub const PUS_TEST_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusTest as u32); UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32);
pub const PUS_MODE_SERVICE: UniqueApidTargetId = pub const PUS_MODE_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusMode as u32); UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32);
pub const PUS_HK_SERVICE: UniqueApidTargetId = pub const PUS_HK_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusHk as u32); UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32);
pub const PUS_SCHED_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::Sched as u16, 0);
} }
pub mod tasks { pub mod tasks {

View File

@ -101,7 +101,7 @@ impl<
cfg: ServerConfig, cfg: ServerConfig,
tm_source: SyncTcpTmSource, tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>, tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
packet_id_lookup: HashSet<PacketId>, packet_id_lookup: Vec<PacketId>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?,

View File

@ -82,6 +82,7 @@ mod tests {
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use ops_sat_rs::config::{components, OBSW_SERVER_ADDR};
use satrs::{ use satrs::{
spacepackets::{ spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket}, ecss::{tc::PusTcCreator, WritablePusPacket},
@ -89,7 +90,6 @@ mod tests {
}, },
tmtc::ReceivesTcCore, tmtc::ReceivesTcCore,
}; };
use ops_sat_rs::config::{components, OBSW_SERVER_ADDR};
use super::*; use super::*;

View File

@ -6,17 +6,15 @@ use std::{
}; };
use log::info; use log::info;
use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; use ops_sat_rs::config::{components::CONTROLLER_ID, tasks::FREQ_MS_PUS_STACK, EXPERIMENT_APID};
use ops_sat_rs::config::{ use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
};
use satrs::{ use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
spacepackets::PacketId,
tmtc::CcsdsDistributor, tmtc::CcsdsDistributor,
}; };
use crate::pus::stack::PusStack; use crate::pus::test::create_test_service;
use crate::pus::test::create_test_service_dynamic;
use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::tm_funnel::TmFunnelDynamic; use crate::tm_funnel::TmFunnelDynamic;
use crate::tmtc::TcSourceTaskDynamic; use crate::tmtc::TcSourceTaskDynamic;
@ -27,6 +25,10 @@ use crate::{
logger::setup_logger, logger::setup_logger,
tmtc::PusTcSourceProviderDynamic, tmtc::PusTcSourceProviderDynamic,
}; };
use crate::{
pus::{action::create_action_service, stack::PusStack},
requests::GenericRequestRouter,
};
mod ccsds; mod ccsds;
mod interface; mod interface;
@ -51,23 +53,30 @@ fn main() {
// let (pus_event_tx, pus_event_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel();
// let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); // let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
// let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); // let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
// let (pus_action_tx, pus_action_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel();
// let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
// let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
// let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
// let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
let (controller_composite_tx, controller_composite_rx) = mpsc::channel();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default();
request_map
.composite_router_map
.insert(CONTROLLER_ID.id(), controller_composite_tx);
let pus_router = PusTcMpscRouter { let pus_router = PusTcMpscRouter {
test_tc_sender: pus_test_tx, test_tc_sender: pus_test_tx,
// event_tc_sender: pus_event_tx, // event_tc_sender: pus_event_tx,
// sched_tc_sender: pus_sched_tx, // sched_tc_sender: pus_sched_tx,
// hk_tc_sender: pus_hk_tx, // hk_tc_sender: pus_hk_tx,
// action_tc_sender: pus_action_tx, action_tc_sender: pus_action_tx,
// mode_tc_sender: pus_mode_tx, // mode_tc_sender: pus_mode_tx,
}; };
let pus_test_service = create_test_service_dynamic( let pus_test_service = create_test_service(
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
// event_handler.clone_event_sender(), // event_handler.clone_event_sender(),
pus_test_rx, pus_test_rx,
@ -81,12 +90,12 @@ fn main() {
// //
// let pus_event_service = // let pus_event_service =
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
// let pus_action_service = create_action_service_dynamic( let pus_action_service = create_action_service(
// tm_funnel_tx.clone(), tm_funnel_tx.clone(),
// pus_action_rx, pus_action_rx,
// request_map.clone(), request_map.clone(),
// pus_action_reply_rx, pus_action_reply_rx,
// ); );
// let pus_hk_service = create_hk_service_dynamic( // let pus_hk_service = create_hk_service_dynamic(
// tm_funnel_tx.clone(), // tm_funnel_tx.clone(),
// pus_hk_rx, // pus_hk_rx,
@ -103,7 +112,7 @@ fn main() {
pus_test_service, pus_test_service,
// pus_hk_service, // pus_hk_service,
// pus_event_service, // pus_event_service,
// pus_action_service, pus_action_service,
// pus_scheduler_service, // pus_scheduler_service,
// pus_mode_service, // pus_mode_service,
); );
@ -133,7 +142,8 @@ fn main() {
tcp_server_cfg, tcp_server_cfg,
sync_tm_tcp_source.clone(), sync_tm_tcp_source.clone(),
tcp_ccsds_distributor, tcp_ccsds_distributor,
PACKET_ID_VALIDATOR.clone(), vec![PacketId::new_for_tc(true, EXPERIMENT_APID)],
// PACKET_ID_VALIDATOR.clone(),
) )
.expect("tcp server creation failed"); .expect("tcp server creation failed");

714
src/pus/action.rs Normal file
View File

@ -0,0 +1,714 @@
use log::{error, warn};
use ops_sat_rs::config::components::PUS_ACTION_SERVICE;
use ops_sat_rs::config::tmtc_err;
use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::params::WritableToBeBytes;
use satrs::pus::action::{
ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, PusActionReply,
};
use satrs::pus::verification::{
FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReportingProvider, VerificationToken,
};
use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter,
EcssTmSenderCore, EcssTmtcError, GenericConversionError, MpscTmAsVecSender,
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, PusTmAsVec,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket};
use std::sync::mpsc;
use std::time::Duration;
use crate::requests::GenericRequestRouter;
use super::{
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus,
PusTargetedRequestService, TargetedPusService,
};
pub struct ActionReplyHandler {
fail_data_buf: [u8; 128],
}
impl Default for ActionReplyHandler {
fn default() -> Self {
Self {
fail_data_buf: [0; 128],
}
}
}
impl PusReplyHandler<ActivePusActionRequestStd, PusActionReply> for ActionReplyHandler {
type Error = EcssTmtcError;
fn handle_unrequested_reply(
&mut self,
reply: &GenericMessage<PusActionReply>,
_tm_sender: &impl EcssTmSenderCore,
) -> Result<(), Self::Error> {
warn!("received unexpected reply for service 8: {reply:?}");
Ok(())
}
fn handle_reply(
&mut self,
reply: &GenericMessage<PusActionReply>,
active_request: &ActivePusActionRequestStd,
tm_sender: &(impl EcssTmSenderCore + ?Sized),
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
) -> Result<bool, Self::Error> {
let verif_token: VerificationToken<TcStateStarted> = active_request
.token()
.try_into()
.expect("invalid token state");
let remove_entry = match &reply.message.variant {
ActionReplyVariant::CompletionFailed { error_code, params } => {
let mut fail_data_len = 0;
if let Some(params) = params {
fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
}
verification_handler.completion_failure(
tm_sender,
verif_token,
FailParams::new(time_stamp, error_code, &self.fail_data_buf[..fail_data_len]),
)?;
true
}
ActionReplyVariant::StepFailed {
error_code,
step,
params,
} => {
let mut fail_data_len = 0;
if let Some(params) = params {
fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
}
verification_handler.step_failure(
tm_sender,
verif_token,
FailParamsWithStep::new(
time_stamp,
&EcssEnumU16::new(*step),
error_code,
&self.fail_data_buf[..fail_data_len],
),
)?;
true
}
ActionReplyVariant::Completed => {
verification_handler.completion_success(tm_sender, verif_token, time_stamp)?;
true
}
ActionReplyVariant::StepSuccess { step } => {
verification_handler.step_success(
tm_sender,
&verif_token,
time_stamp,
EcssEnumU16::new(*step),
)?;
false
}
_ => false,
};
Ok(remove_entry)
}
fn handle_request_timeout(
&mut self,
active_request: &ActivePusActionRequestStd,
tm_sender: &impl EcssTmSenderCore,
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
) -> Result<(), Self::Error> {
generic_pus_request_timeout_handler(
tm_sender,
active_request,
verification_handler,
time_stamp,
"action",
)
}
}
#[derive(Default)]
pub struct ActionRequestConverter {}
impl PusTcToRequestConverter<ActivePusActionRequestStd, ActionRequest> for ActionRequestConverter {
type Error = GenericConversionError;
fn convert(
&mut self,
token: VerificationToken<TcStateAccepted>,
tc: &PusTcReader,
tm_sender: &(impl EcssTmSenderCore + ?Sized),
verif_reporter: &impl VerificationReportingProvider,
time_stamp: &[u8],
) -> Result<(ActivePusActionRequestStd, ActionRequest), Self::Error> {
let subservice = tc.subservice();
let user_data = tc.user_data();
if user_data.len() < 8 {
verif_reporter
.start_failure(
tm_sender,
token,
FailParams::new_no_fail_data(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA),
)
.expect("Sending start failure failed");
return Err(GenericConversionError::NotEnoughAppData {
expected: 8,
found: user_data.len(),
});
}
let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).unwrap();
let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap());
if subservice == 128 {
let req_variant = if user_data.len() == 8 {
ActionRequestVariant::NoData
} else {
ActionRequestVariant::VecData(user_data[8..].to_vec())
};
Ok((
ActivePusActionRequestStd::new(
action_id,
target_id_and_apid.into(),
token.into(),
Duration::from_secs(30),
),
ActionRequest::new(action_id, req_variant),
))
} else {
verif_reporter
.start_failure(
tm_sender,
token,
FailParams::new_no_fail_data(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE),
)
.expect("Sending start failure failed");
Err(GenericConversionError::InvalidSubservice(subservice))
}
}
}
pub fn create_action_service(
tm_funnel_tx: mpsc::Sender<PusTmAsVec>,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter,
reply_receiver: mpsc::Receiver<GenericMessage<PusActionReply>>,
) -> ActionServiceWrapper {
let action_request_handler = PusTargetedRequestService::new(
PusServiceHelper::new(
PUS_ACTION_SERVICE.id(),
pus_action_rx,
tm_funnel_tx,
create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid),
EcssTcInVecConverter::default(),
),
ActionRequestConverter::default(),
DefaultActiveActionRequestMap::default(),
ActionReplyHandler::default(),
action_router,
reply_receiver,
);
ActionServiceWrapper {
service: action_request_handler,
}
}
pub struct ActionServiceWrapper {
pub(crate) service: PusTargetedRequestService<
VerificationReporter,
ActionRequestConverter,
ActionReplyHandler,
DefaultActiveActionRequestMap,
ActivePusActionRequestStd,
ActionRequest,
PusActionReply,
>,
}
impl TargetedPusService for ActionServiceWrapper {
/// Returns [true] if the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 8 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 8 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 8 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
HandlingStatus::HandledOne
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 8: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
#[cfg(test)]
mod tests {
use satrs::pus::test_util::{
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
};
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::request::MessageMetadata;
use satrs::ComponentId;
use satrs::{
res_code::ResultU16,
spacepackets::{
ecss::{
tc::{PusTcCreator, PusTcSecondaryHeader},
tm::PusTmReader,
WritablePusPacket,
},
SpHeader,
},
};
use crate::{
pus::tests::{PusConverterTestbench, ReplyHandlerTestbench, TargetedPusRequestTestbench},
requests::CompositeRequest,
};
use super::*;
impl
TargetedPusRequestTestbench<
ActionRequestConverter,
ActionReplyHandler,
DefaultActiveActionRequestMap,
ActivePusActionRequestStd,
ActionRequest,
PusActionReply,
>
{
pub fn new_for_action(owner_id: ComponentId, target_id: ComponentId) -> Self {
let _ = env_logger::builder().is_test(true).try_init();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (pus_action_tx, pus_action_rx) = mpsc::channel();
let (action_reply_tx, action_reply_rx) = mpsc::channel();
let (action_req_tx, action_req_rx) = mpsc::channel();
let verif_reporter = TestVerificationReporter::new(owner_id);
let mut generic_req_router = GenericRequestRouter::default();
generic_req_router
.composite_router_map
.insert(target_id, action_req_tx);
Self {
service: PusTargetedRequestService::new(
PusServiceHelper::new(
owner_id,
pus_action_rx,
tm_funnel_tx.clone(),
verif_reporter,
EcssTcInVecConverter::default(),
),
ActionRequestConverter::default(),
DefaultActiveActionRequestMap::default(),
ActionReplyHandler::default(),
generic_req_router,
action_reply_rx,
),
request_id: None,
pus_packet_tx: pus_action_tx,
tm_funnel_rx,
reply_tx: action_reply_tx,
request_rx: action_req_rx,
}
}
pub fn verify_packet_started(&self) {
self.service
.service_helper
.common
.verif_reporter
.check_next_is_started_success(
self.service.service_helper.id(),
self.request_id.expect("request ID not set").into(),
);
}
pub fn verify_packet_completed(&self) {
self.service
.service_helper
.common
.verif_reporter
.check_next_is_completion_success(
self.service.service_helper.id(),
self.request_id.expect("request ID not set").into(),
);
}
pub fn verify_tm_empty(&self) {
let packet = self.tm_funnel_rx.try_recv();
if let Err(mpsc::TryRecvError::Empty) = packet {
} else {
let tm = packet.unwrap();
let unexpected_tm = PusTmReader::new(&tm.packet, 7).unwrap().0;
panic!("unexpected TM packet {unexpected_tm:?}");
}
}
pub fn verify_next_tc_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_handle_next_tc(time_stamp);
if let Err(e) = result {
panic!("unexpected error {:?}", e);
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::RequestHandled => (),
_ => panic!("unexpected result {result:?}"),
}
}
pub fn verify_all_tcs_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_handle_next_tc(time_stamp);
if let Err(e) = result {
panic!("unexpected error {:?}", e);
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::Empty => (),
_ => panic!("unexpected result {result:?}"),
}
}
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp);
assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
}
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp);
assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::Empty);
}
pub fn add_tc(&mut self, tc: &PusTcCreator) {
self.request_id = Some(verification::RequestId::new(tc).into());
let token = self.service.service_helper.verif_reporter_mut().add_tc(tc);
let accepted_token = self
.service
.service_helper
.verif_reporter()
.acceptance_success(self.service.service_helper.tm_sender(), token, &[0; 7])
.expect("TC acceptance failed");
self.service
.service_helper
.verif_reporter()
.check_next_was_added(accepted_token.request_id());
let id = self.service.service_helper.id();
self.service
.service_helper
.verif_reporter()
.check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx
.send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token))
.unwrap();
}
}
#[test]
fn basic_request() {
let mut testbench = TargetedPusRequestTestbench::new_for_action(
TEST_COMPONENT_ID_0.id(),
TEST_COMPONENT_ID_1.id(),
);
// Create a basic action request and verify forwarding.
let sp_header = SpHeader::new_from_apid(TEST_APID);
let sec_header = PusTcSecondaryHeader::new_simple(8, 128);
let action_id = 5_u32;
let mut app_data: [u8; 8] = [0; 8];
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_1.to_be_bytes());
app_data[4..8].copy_from_slice(&action_id.to_be_bytes());
let pus8_packet = PusTcCreator::new(sp_header, sec_header, &app_data, true);
testbench.add_tc(&pus8_packet);
let time_stamp: [u8; 7] = [0; 7];
testbench.verify_next_tc_is_handled_properly(&time_stamp);
testbench.verify_all_tcs_handled(&time_stamp);
testbench.verify_packet_started();
let possible_req = testbench.request_rx.try_recv();
assert!(possible_req.is_ok());
let req = possible_req.unwrap();
if let CompositeRequest::Action(action_req) = req.message {
assert_eq!(action_req.action_id, action_id);
assert_eq!(action_req.variant, ActionRequestVariant::NoData);
let action_reply = PusActionReply::new(action_id, ActionReplyVariant::Completed);
testbench
.reply_tx
.send(GenericMessage::new(req.requestor_info, action_reply))
.unwrap();
} else {
panic!("unexpected request type");
}
testbench.verify_next_reply_is_handled_properly(&time_stamp);
testbench.verify_all_replies_handled(&time_stamp);
testbench.verify_packet_completed();
testbench.verify_tm_empty();
}
#[test]
fn basic_request_routing_error() {
let mut testbench = TargetedPusRequestTestbench::new_for_action(
TEST_COMPONENT_ID_0.id(),
TEST_COMPONENT_ID_1.id(),
);
// Create a basic action request and verify forwarding.
let sec_header = PusTcSecondaryHeader::new_simple(8, 128);
let action_id = 5_u32;
let mut app_data: [u8; 8] = [0; 8];
// Invalid ID, routing should fail.
app_data[0..4].copy_from_slice(&0_u32.to_be_bytes());
app_data[4..8].copy_from_slice(&action_id.to_be_bytes());
let pus8_packet = PusTcCreator::new(
SpHeader::new_from_apid(TEST_APID),
sec_header,
&app_data,
true,
);
testbench.add_tc(&pus8_packet);
let time_stamp: [u8; 7] = [0; 7];
let result = testbench.service.poll_and_handle_next_tc(&time_stamp);
assert!(result.is_err());
// Verify the correct result and completion failure.
}
#[test]
fn converter_action_req_no_data() {
let mut testbench = PusConverterTestbench::new(
TEST_COMPONENT_ID_0.raw(),
ActionRequestConverter::default(),
);
let sec_header = PusTcSecondaryHeader::new_simple(8, 128);
let action_id = 5_u32;
let mut app_data: [u8; 8] = [0; 8];
// Invalid ID, routing should fail.
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes());
app_data[4..8].copy_from_slice(&action_id.to_be_bytes());
let pus8_packet = PusTcCreator::new(
SpHeader::new_from_apid(TEST_APID),
sec_header,
&app_data,
true,
);
let token = testbench.add_tc(&pus8_packet);
let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0);
assert!(result.is_ok());
let (active_req, request) = result.unwrap();
if let ActionRequestVariant::NoData = request.variant {
assert_eq!(request.action_id, action_id);
assert_eq!(active_req.action_id, action_id);
assert_eq!(
active_req.target_id(),
UniqueApidTargetId::new(TEST_APID, TEST_UNIQUE_ID_0).raw()
);
assert_eq!(
active_req.token().request_id(),
testbench.request_id().unwrap()
);
} else {
panic!("unexpected action request variant");
}
}
#[test]
fn converter_action_req_with_data() {
let mut testbench =
PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ActionRequestConverter::default());
let sec_header = PusTcSecondaryHeader::new_simple(8, 128);
let action_id = 5_u32;
let mut app_data: [u8; 16] = [0; 16];
// Invalid ID, routing should fail.
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes());
app_data[4..8].copy_from_slice(&action_id.to_be_bytes());
for i in 0..8 {
app_data[i + 8] = i as u8;
}
let pus8_packet = PusTcCreator::new(
SpHeader::new_from_apid(TEST_APID),
sec_header,
&app_data,
true,
);
let token = testbench.add_tc(&pus8_packet);
let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0);
assert!(result.is_ok());
let (active_req, request) = result.unwrap();
if let ActionRequestVariant::VecData(vec) = request.variant {
assert_eq!(request.action_id, action_id);
assert_eq!(active_req.action_id, action_id);
assert_eq!(vec, app_data[8..].to_vec());
} else {
panic!("unexpected action request variant");
}
}
#[test]
fn reply_handling_completion_success() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_id = 5_u32;
let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let reply = PusActionReply::new(action_id, ActionReplyVariant::Completed);
let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply);
let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]);
assert!(result.is_ok());
assert!(result.unwrap());
testbench.verif_reporter.assert_full_completion_success(
TEST_COMPONENT_ID_0.id(),
req_id,
None,
);
}
#[test]
fn reply_handling_completion_failure() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_id = 5_u32;
let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let error_code = ResultU16::new(2, 3);
let reply = PusActionReply::new(
action_id,
ActionReplyVariant::CompletionFailed {
error_code,
params: None,
},
);
let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply);
let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]);
assert!(result.is_ok());
assert!(result.unwrap());
testbench.verif_reporter.assert_completion_failure(
TEST_COMPONENT_ID_0.into(),
req_id,
None,
error_code.raw() as u64,
);
}
#[test]
fn reply_handling_step_success() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_id = 5_u32;
let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let reply = PusActionReply::new(action_id, ActionReplyVariant::StepSuccess { step: 1 });
let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply);
let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]);
assert!(result.is_ok());
// Entry should not be removed, completion not done yet.
assert!(!result.unwrap());
testbench.verif_reporter.check_next_was_added(req_id);
testbench
.verif_reporter
.check_next_is_acceptance_success(TEST_COMPONENT_ID_0.raw(), req_id);
testbench
.verif_reporter
.check_next_is_started_success(TEST_COMPONENT_ID_0.raw(), req_id);
testbench
.verif_reporter
.check_next_is_step_success(TEST_COMPONENT_ID_0.raw(), req_id, 1);
}
#[test]
fn reply_handling_step_failure() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_id = 5_u32;
let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let error_code = ResultU16::new(2, 3);
let reply = PusActionReply::new(
action_id,
ActionReplyVariant::StepFailed {
error_code,
step: 1,
params: None,
},
);
let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply);
let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]);
assert!(result.is_ok());
assert!(result.unwrap());
testbench.verif_reporter.check_next_was_added(req_id);
testbench
.verif_reporter
.check_next_is_acceptance_success(TEST_COMPONENT_ID_0.id(), req_id);
testbench
.verif_reporter
.check_next_is_started_success(TEST_COMPONENT_ID_0.id(), req_id);
testbench.verif_reporter.check_next_is_step_failure(
TEST_COMPONENT_ID_0.id(),
req_id,
error_code.raw().into(),
);
}
#[test]
fn reply_handling_unrequested_reply() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_reply = PusActionReply::new(5_u32, ActionReplyVariant::Completed);
let unrequested_reply =
GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply);
// Right now this function does not do a lot. We simply check that it does not panic or do
// weird stuff.
let result = testbench.handle_unrequested_reply(&unrequested_reply);
assert!(result.is_ok());
}
#[test]
fn reply_handling_reply_timeout() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_id = 5_u32;
let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let result = testbench.handle_request_timeout(
&ActivePusActionRequestStd::new_from_common_req(action_id, active_request),
&[],
);
assert!(result.is_ok());
testbench.verif_reporter.assert_completion_failure(
TEST_COMPONENT_ID_0.raw(),
req_id,
None,
tmtc_err::REQUEST_TIMEOUT.raw() as u64,
);
}
}

View File

@ -1,3 +1,4 @@
pub mod action;
pub mod stack; pub mod stack;
pub mod test; pub mod test;
@ -13,9 +14,10 @@ use satrs::pus::verification::{
}; };
use satrs::pus::{ use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericConversionError, EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, GenericConversionError,
GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult,
PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper,
PusTcToRequestConverter, TcInMemory,
}; };
use satrs::queue::GenericReceiveError; use satrs::queue::GenericReceiveError;
use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::request::{Apid, GenericMessage, MessageMetadata};
@ -53,7 +55,7 @@ pub struct PusTcMpscRouter {
// pub event_tc_sender: Sender<EcssTcAndToken>, // pub event_tc_sender: Sender<EcssTcAndToken>,
// pub sched_tc_sender: Sender<EcssTcAndToken>, // pub sched_tc_sender: Sender<EcssTcAndToken>,
// pub hk_tc_sender: Sender<EcssTcAndToken>, // pub hk_tc_sender: Sender<EcssTcAndToken>,
// pub action_tc_sender: Sender<EcssTcAndToken>, pub action_tc_sender: Sender<EcssTcAndToken>,
// pub mode_tc_sender: Sender<EcssTcAndToken>, // pub mode_tc_sender: Sender<EcssTcAndToken>,
} }
@ -98,12 +100,10 @@ impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
tc_in_memory, tc_in_memory,
token: Some(accepted_token.into()), token: Some(accepted_token.into()),
})?, })?,
// PusServiceId::Housekeeping => { PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken {
// self.pus_router.hk_tc_sender.send(EcssTcAndToken { tc_in_memory,
// tc_in_memory, token: Some(accepted_token.into()),
// token: Some(accepted_token.into()), })?,
// })?
// }
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
// tc_in_memory, // tc_in_memory,
// token: Some(accepted_token.into()), // token: Some(accepted_token.into()),
@ -161,7 +161,7 @@ impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
pub trait TargetedPusService { pub trait TargetedPusService {
/// Returns [true] interface the packet handling is finished. /// Returns [true] interface the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool; fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn check_for_request_timeouts(&mut self); fn check_for_request_timeouts(&mut self);
} }
@ -188,9 +188,6 @@ pub trait TargetedPusService {
/// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7.
#[allow(dead_code)] #[allow(dead_code)]
pub struct PusTargetedRequestService< pub struct PusTargetedRequestService<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider, VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>, RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>, ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
@ -199,8 +196,12 @@ pub struct PusTargetedRequestService<
RequestType, RequestType,
ReplyType, ReplyType,
> { > {
pub service_helper: pub service_helper: PusServiceHelper<
PusServiceHelper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>, MpscTcReceiver,
MpscTmAsVecSender,
EcssTcInVecConverter,
VerificationReporter,
>,
pub request_router: GenericRequestRouter, pub request_router: GenericRequestRouter,
pub request_converter: RequestConverter, pub request_converter: RequestConverter,
pub active_request_map: ActiveRequestMap, pub active_request_map: ActiveRequestMap,
@ -209,11 +210,7 @@ pub struct PusTargetedRequestService<
phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>,
} }
#[allow(dead_code)]
impl< impl<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider, VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>, RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>, ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
@ -223,9 +220,6 @@ impl<
ReplyType, ReplyType,
> >
PusTargetedRequestService< PusTargetedRequestService<
TcReceiver,
TmSender,
TcInMemConverter,
VerificationReporter, VerificationReporter,
RequestConverter, RequestConverter,
ReplyHandler, ReplyHandler,
@ -239,9 +233,9 @@ where
{ {
pub fn new( pub fn new(
service_helper: PusServiceHelper< service_helper: PusServiceHelper<
TcReceiver, MpscTcReceiver,
TmSender, MpscTmAsVecSender,
TcInMemConverter, EcssTcInVecConverter,
VerificationReporter, VerificationReporter,
>, >,
request_converter: RequestConverter, request_converter: RequestConverter,
@ -471,7 +465,7 @@ pub(crate) mod tests {
use satrs::{ use satrs::{
pus::{ pus::{
verification::test_util::TestVerificationReporter, ActivePusRequestStd, verification::test_util::TestVerificationReporter, ActivePusRequestStd,
ActiveRequestMapProvider, EcssTcInVecConverter, MpscTcReceiver, ActiveRequestMapProvider,
}, },
request::UniqueApidTargetId, request::UniqueApidTargetId,
spacepackets::{ spacepackets::{
@ -691,9 +685,6 @@ pub(crate) mod tests {
ReplyType, ReplyType,
> { > {
pub service: PusTargetedRequestService< pub service: PusTargetedRequestService<
MpscTcReceiver,
MpscTmAsVecSender,
EcssTcInVecConverter,
TestVerificationReporter, TestVerificationReporter,
RequestConverter, RequestConverter,
ReplyHandler, ReplyHandler,

View File

@ -1,11 +1,9 @@
// use crate::pus::mode::ModeServiceWrapper;
use crate::pus::test::TestCustomServiceWrapper; use crate::pus::test::TestCustomServiceWrapper;
use crate::pus::HandlingStatus; use crate::pus::HandlingStatus;
use derive_new::new; use derive_new::new;
use satrs::{ use satrs::spacepackets::time::{cds, TimeWriter};
pus::{EcssTcInMemConverter, EcssTmSenderCore},
spacepackets::time::{cds, TimeWriter}, use super::{action::ActionServiceWrapper, TargetedPusService};
};
// use super::{ // use super::{
// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
@ -14,18 +12,16 @@ use satrs::{
// }; // };
#[derive(new)] #[derive(new)]
pub struct PusStack<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> { pub struct PusStack {
test_srv: TestCustomServiceWrapper<TmSender, TcInMemConverter>, test_srv: TestCustomServiceWrapper,
// hk_srv_wrapper: HkServiceWrapper<TmSender, TcInMemConverter>, // hk_srv_wrapper: HkServiceWrapper<TmSender, TcInMemConverter>,
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>, // event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
// action_srv_wrapper: ActionServiceWrapper<TmSender, TcInMemConverter>, action_srv_wrapper: ActionServiceWrapper,
// schedule_srv: SchedulingServiceWrapper<TmSender, TcInMemConverter>, // schedule_srv: SchedulingServiceWrapper<TmSender, TcInMemConverter>,
// mode_srv: ModeServiceWrapper<TmSender, TcInMemConverter>, // mode_srv: ModeServiceWrapper<TmSender, TcInMemConverter>,
} }
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> impl PusStack {
PusStack<TmSender, TcInMemConverter>
{
pub fn periodic_operation(&mut self) { pub fn periodic_operation(&mut self) {
// Release all telecommands which reached their release time before calling the service // Release all telecommands which reached their release time before calling the service
// handlers. // handlers.
@ -37,8 +33,8 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
loop { loop {
let mut nothing_to_do = true; let mut nothing_to_do = true;
let mut is_srv_finished = let mut is_srv_finished =
|tc_handling_done: bool, reply_handling_done: Option<HandlingStatus>| { |tc_handling_done: HandlingStatus, reply_handling_done: Option<HandlingStatus>| {
if !tc_handling_done if tc_handling_done == HandlingStatus::Empty
|| (reply_handling_done.is_some() || (reply_handling_done.is_some()
&& reply_handling_done.unwrap() == HandlingStatus::Empty) && reply_handling_done.unwrap() == HandlingStatus::Empty)
{ {
@ -48,13 +44,13 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None); is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None);
// is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None);
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
// is_srv_finished( is_srv_finished(
// self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
// Some( Some(
// self.action_srv_wrapper self.action_srv_wrapper
// .poll_and_handle_next_reply(&time_stamp), .poll_and_handle_next_reply(&time_stamp),
// ), ),
// ); );
// is_srv_finished( // is_srv_finished(
// self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), // self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
// Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), // Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),

View File

@ -6,8 +6,8 @@ use ops_sat_rs::config::tmtc_err;
use satrs::pus::test::PusService17TestHandler; use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec,
}; };
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::ecss::PusPacket;
@ -15,11 +15,13 @@ use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter; use satrs::spacepackets::time::TimeWriter;
use std::sync::mpsc; use std::sync::mpsc;
pub fn create_test_service_dynamic( use super::HandlingStatus;
pub fn create_test_service(
tm_funnel_tx: mpsc::Sender<PusTmAsVec>, tm_funnel_tx: mpsc::Sender<PusTmAsVec>,
// event_sender: mpsc::Sender<EventMessageU32>, // event_sender: mpsc::Sender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> { ) -> TestCustomServiceWrapper {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.id(),
pus_test_rx, pus_test_rx,
@ -33,23 +35,22 @@ pub fn create_test_service_dynamic(
} }
} }
pub struct TestCustomServiceWrapper< pub struct TestCustomServiceWrapper {
TmSender: EcssTmSenderCore, pub handler: PusService17TestHandler<
TcInMemConverter: EcssTcInMemConverter, MpscTcReceiver,
> { MpscTmAsVecSender,
pub handler: EcssTcInVecConverter,
PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>, VerificationReporter,
>,
// pub test_srv_event_sender: mpsc::Sender<EventMessageU32>, // pub test_srv_event_sender: mpsc::Sender<EventMessageU32>,
} }
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> impl TestCustomServiceWrapper {
TestCustomServiceWrapper<TmSender, TcInMemConverter> pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus {
{
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
let res = self.handler.poll_and_handle_next_tc(time_stamp); let res = self.handler.poll_and_handle_next_tc(time_stamp);
if res.is_err() { if res.is_err() {
warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); warn!("PUS17 handler failed with error {:?}", res.unwrap_err());
return true; return HandlingStatus::Empty;
} }
match res.unwrap() { match res.unwrap() {
PusPacketHandlerResult::RequestHandled => { PusPacketHandlerResult::RequestHandled => {
@ -114,10 +115,8 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
.expect("Sending start failure verification failed"); .expect("Sending start failure verification failed");
} }
} }
PusPacketHandlerResult::Empty => { PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
return true;
} }
} HandlingStatus::HandledOne
false
} }
} }