reduce some boilerplate
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
2024-03-24 12:22:37 +01:00
parent b7ab1d5ea7
commit cacff05b3c
11 changed files with 141 additions and 73 deletions

View File

@ -7,7 +7,7 @@ use satrs::request::GenericMessage;
use satrs::ComponentId; use satrs::ComponentId;
use crate::pus::hk::HkReply; use crate::pus::hk::HkReply;
use crate::requests::CompositeRequestWithToken; use crate::requests::CompositeRequest;
pub trait SpiInterface { pub trait SpiInterface {
type Error; type Error;
@ -27,7 +27,7 @@ pub struct MgmHandler<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> {
mode_request_receiver: mpsc::Receiver<GenericMessage<ModeRequest>>, mode_request_receiver: mpsc::Receiver<GenericMessage<ModeRequest>>,
mode_reply_sender_to_pus: mpsc::Sender<GenericMessage<ModeReply>>, mode_reply_sender_to_pus: mpsc::Sender<GenericMessage<ModeReply>>,
mode_reply_sender_to_parent: mpsc::Sender<GenericMessage<ModeReply>>, mode_reply_sender_to_parent: mpsc::Sender<GenericMessage<ModeReply>>,
composite_request_receiver: mpsc::Receiver<CompositeRequestWithToken>, composite_request_receiver: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_sender: mpsc::Sender<GenericMessage<HkReply>>, hk_reply_sender: mpsc::Sender<GenericMessage<HkReply>>,
hk_tm_sender: TmSender, hk_tm_sender: TmSender,
mode: ModeAndSubmode, mode: ModeAndSubmode,

View File

@ -17,7 +17,7 @@ use log::info;
use pus::test::create_test_service_dynamic; use pus::test::create_test_service_dynamic;
use satrs::hal::std::tcp_server::ServerConfig; use satrs::hal::std::tcp_server::ServerConfig;
use satrs::hal::std::udp_server::UdpTcServer; use satrs::hal::std::udp_server::UdpTcServer;
use satrs::request::TargetAndApidId; use satrs::request::{GenericMessage, TargetAndApidId};
use satrs::tmtc::tm_helper::SharedTmPool; use satrs::tmtc::tm_helper::SharedTmPool;
use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools};
use satrs_example::config::tasks::{ use satrs_example::config::tasks::{
@ -38,7 +38,7 @@ use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static};
use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static}; use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static};
use crate::pus::test::create_test_service_static; use crate::pus::test::create_test_service_static;
use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::requests::{CompositeRequestWithToken, GenericRequestRouter}; use crate::requests::{CompositeRequest, GenericRequestRouter};
use crate::tcp::{SyncTcpTmSource, TcpTask}; use crate::tcp::{SyncTcpTmSource, TcpTask};
use crate::tmtc::{ use crate::tmtc::{
PusTcSourceProviderSharedPool, SharedTcPool, TcSourceTaskDynamic, TcSourceTaskStatic, PusTcSourceProviderSharedPool, SharedTcPool, TcSourceTaskDynamic, TcSourceTaskStatic,
@ -86,7 +86,7 @@ fn static_tmtc_pool_main() {
)); ));
let acs_target_id = TargetAndApidId::new(PUS_APID, RequestTargetId::AcsSubsystem as u32); let acs_target_id = TargetAndApidId::new(PUS_APID, RequestTargetId::AcsSubsystem as u32);
let (acs_thread_tx, acs_thread_rx) = channel::<CompositeRequestWithToken>(); let (acs_thread_tx, acs_thread_rx) = channel::<GenericMessage<CompositeRequest>>();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID. // Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default(); let mut request_map = GenericRequestRouter::default();
request_map.0.insert(acs_target_id.into(), acs_thread_tx); request_map.0.insert(acs_target_id.into(), acs_thread_tx);
@ -322,7 +322,7 @@ fn dyn_tmtc_pool_main() {
)); ));
let acs_target_id = TargetAndApidId::new(PUS_APID, RequestTargetId::AcsSubsystem as u32); let acs_target_id = TargetAndApidId::new(PUS_APID, RequestTargetId::AcsSubsystem as u32);
let (acs_thread_tx, acs_thread_rx) = channel::<CompositeRequestWithToken>(); let (acs_thread_tx, acs_thread_rx) = channel::<GenericMessage<CompositeRequest>>();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID. // Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default(); let mut request_map = GenericRequestRouter::default();
request_map.0.insert(acs_target_id.into(), acs_thread_tx); request_map.0.insert(acs_target_id.into(), acs_thread_tx);

View File

@ -7,7 +7,7 @@ use satrs::pus::action::{
DefaultActiveActionRequestMap, DefaultActiveActionRequestMap,
}; };
use satrs::pus::verification::{ use satrs::pus::verification::{
FailParams, FailParamsWithStep, TcStateAccepted, FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted,
VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
VerificationReportingProvider, VerificationToken, VerificationReportingProvider, VerificationToken,
}; };
@ -63,12 +63,16 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPusWithActionId> for
time_stamp: &[u8], time_stamp: &[u8],
_tm_sender: &impl EcssTmSenderCore, _tm_sender: &impl EcssTmSenderCore,
) -> Result<bool, Self::Error> { ) -> Result<bool, Self::Error> {
let verif_token: VerificationToken<TcStateStarted> = active_request
.token()
.try_into()
.expect("invalid token state");
let remove_entry = match &reply.message.variant { let remove_entry = match &reply.message.variant {
ActionReplyPus::CompletionFailed { error_code, params } => { ActionReplyPus::CompletionFailed { error_code, params } => {
let fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; let fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
verification_handler verification_handler
.completion_failure( .completion_failure(
active_request.token(), verif_token,
FailParams::new( FailParams::new(
time_stamp, time_stamp,
error_code, error_code,
@ -86,7 +90,7 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPusWithActionId> for
let fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; let fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
verification_handler verification_handler
.step_failure( .step_failure(
active_request.token(), verif_token,
FailParamsWithStep::new( FailParamsWithStep::new(
time_stamp, time_stamp,
&EcssEnumU16::new(*step), &EcssEnumU16::new(*step),
@ -99,13 +103,13 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPusWithActionId> for
} }
ActionReplyPus::Completed => { ActionReplyPus::Completed => {
verification_handler verification_handler
.completion_success(active_request.token(), time_stamp) .completion_success(verif_token, time_stamp)
.map_err(|e| e.0)?; .map_err(|e| e.0)?;
true true
} }
ActionReplyPus::StepSuccess { step } => { ActionReplyPus::StepSuccess { step } => {
verification_handler.step_success( verification_handler.step_success(
&active_request.token(), &verif_token,
time_stamp, time_stamp,
EcssEnumU16::new(*step), EcssEnumU16::new(*step),
)?; )?;
@ -164,9 +168,6 @@ impl PusTcToRequestConverter<ActivePusActionRequestStd, ActionRequest>
let target_id_and_apid = TargetAndApidId::from_pus_tc(tc).unwrap(); let target_id_and_apid = TargetAndApidId::from_pus_tc(tc).unwrap();
let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap()); let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap());
if subservice == 128 { if subservice == 128 {
let token = verif_reporter
.start_success(token, time_stamp)
.expect("sending start success verification failed");
let req_variant = if user_data.len() == 8 { let req_variant = if user_data.len() == 8 {
ActionRequestVariant::NoData ActionRequestVariant::NoData
} else { } else {
@ -176,7 +177,7 @@ impl PusTcToRequestConverter<ActivePusActionRequestStd, ActionRequest>
ActivePusActionRequestStd::new( ActivePusActionRequestStd::new(
action_id, action_id,
target_id_and_apid.into(), target_id_and_apid.into(),
token, token.into(),
Duration::from_secs(30), Duration::from_secs(30),
), ),
ActionRequest::new(action_id, req_variant), ActionRequest::new(action_id, req_variant),
@ -219,6 +220,7 @@ pub fn create_action_service_static(
pus_action_rx, pus_action_rx,
); );
let action_request_handler = PusTargetedRequestService::new( let action_request_handler = PusTargetedRequestService::new(
ComponentIdList::PusAction as ComponentId,
PusServiceHelper::new( PusServiceHelper::new(
action_srv_receiver, action_srv_receiver,
action_srv_tm_sender.clone(), action_srv_tm_sender.clone(),
@ -262,6 +264,7 @@ pub fn create_action_service_dynamic(
pus_action_rx, pus_action_rx,
); );
let action_request_handler = PusTargetedRequestService::new( let action_request_handler = PusTargetedRequestService::new(
ComponentIdList::PusAction as ComponentId,
PusServiceHelper::new( PusServiceHelper::new(
action_srv_receiver, action_srv_receiver,
action_srv_tm_sender.clone(), action_srv_tm_sender.clone(),
@ -400,6 +403,7 @@ mod tests {
let action_srv_receiver = MpscTcReceiver::new(0, "TESTBENCH", pus_action_rx); let action_srv_receiver = MpscTcReceiver::new(0, "TESTBENCH", pus_action_rx);
Self { Self {
service: PusTargetedRequestService::new( service: PusTargetedRequestService::new(
0,
PusServiceHelper::new( PusServiceHelper::new(
action_srv_receiver, action_srv_receiver,
action_srv_tm_sender.clone(), action_srv_tm_sender.clone(),
@ -514,7 +518,7 @@ mod tests {
let possible_req = testbench.request_rx.try_recv(); let possible_req = testbench.request_rx.try_recv();
assert!(possible_req.is_ok()); assert!(possible_req.is_ok());
let req = possible_req.unwrap(); let req = possible_req.unwrap();
if let CompositeRequest::Action(action_req) = req.targeted_request.message { if let CompositeRequest::Action(action_req) = req.message {
assert_eq!(action_req.action_id, action_id); assert_eq!(action_req.action_id, action_id);
assert_eq!(action_req.variant, ActionRequestVariant::NoData); assert_eq!(action_req.variant, ActionRequestVariant::NoData);
let action_reply = let action_reply =
@ -522,7 +526,7 @@ mod tests {
testbench testbench
.reply_tx .reply_tx
.send(GenericMessage::new( .send(GenericMessage::new(
req.targeted_request.request_id, req.request_id,
TARGET_ID.into(), TARGET_ID.into(),
action_reply, action_reply,
)) ))
@ -536,4 +540,24 @@ mod tests {
testbench.verify_packet_verification(7); testbench.verify_packet_verification(7);
testbench.verify_tm_empty(); testbench.verify_tm_empty();
} }
#[test]
fn test_basic_request_routing_error() {
let mut testbench = TargetedPusRequestTestbench::new_for_action();
// Create a basic action request and verify forwarding.
let mut sp_header = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap();
let sec_header = PusTcSecondaryHeader::new_simple(8, 128);
let action_id = 5_u32;
let mut app_data: [u8; 8] = [0; 8];
// Invalid ID, routing should fail.
app_data[0..4].copy_from_slice(&(TEST_APID_TARGET_ID + 1).to_be_bytes());
app_data[4..8].copy_from_slice(&action_id.to_be_bytes());
let pus8_packet = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true);
testbench.add_tc(&pus8_packet);
let time_stamp: [u8; 7] = [0; 7];
let result = testbench.service.poll_and_handle_next_tc(&time_stamp);
assert!(result.is_err());
// Verify the correct result and completion failure.
}
} }

View File

@ -2,8 +2,9 @@ use log::{error, warn};
use satrs::hk::{CollectionIntervalFactor, HkRequest}; use satrs::hk::{CollectionIntervalFactor, HkRequest};
use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::verification::{ use satrs::pus::verification::{
FailParams, TcStateAccepted, VerificationReporterWithSharedPoolMpscBoundedSender, FailParams, TcStateAccepted, TcStateStarted,
VerificationReporterWithVecMpscSender, VerificationReportingProvider, VerificationToken, VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender,
VerificationReportingProvider, VerificationToken,
}; };
use satrs::pus::{ use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
@ -55,10 +56,14 @@ impl PusReplyHandler<ActivePusRequestStd, HkReply> for HkReplyHandler {
time_stamp: &[u8], time_stamp: &[u8],
_tm_sender: &impl EcssTmSenderCore, _tm_sender: &impl EcssTmSenderCore,
) -> Result<bool, Self::Error> { ) -> Result<bool, Self::Error> {
let started_token: VerificationToken<TcStateStarted> = active_request
.token()
.try_into()
.expect("invalid token state");
match reply.message { match reply.message {
HkReply::Ack => { HkReply::Ack => {
verification_handler verification_handler
.completion_success(active_request.token(), time_stamp) .completion_success(started_token, time_stamp)
.expect("sending completio success verification failed"); .expect("sending completio success verification failed");
} }
}; };
@ -201,10 +206,6 @@ impl PusTcToRequestConverter<ActivePusRequestStd, HkRequest> for ExampleHkReques
return Err(GenericConversionError::InvalidSubservice(subservice)); return Err(GenericConversionError::InvalidSubservice(subservice));
} }
}; };
let token = verif_reporter
.start_success(token, time_stamp)
.expect("Sending start success verification failed");
Ok(( Ok((
ActivePusRequestStd::new(target_id_and_apid.into(), token, self.timeout), ActivePusRequestStd::new(target_id_and_apid.into(), token, self.timeout),
request, request,
@ -238,6 +239,7 @@ pub fn create_hk_service_static(
pus_hk_rx, pus_hk_rx,
); );
let pus_3_handler = PusTargetedRequestService::new( let pus_3_handler = PusTargetedRequestService::new(
ComponentIdList::PusHk as ComponentId,
PusServiceHelper::new( PusServiceHelper::new(
hk_srv_receiver, hk_srv_receiver,
hk_srv_tm_sender, hk_srv_tm_sender,
@ -279,6 +281,7 @@ pub fn create_hk_service_dynamic(
pus_hk_rx, pus_hk_rx,
); );
let pus_3_handler = PusTargetedRequestService::new( let pus_3_handler = PusTargetedRequestService::new(
ComponentIdList::PusHk as ComponentId,
PusServiceHelper::new( PusServiceHelper::new(
hk_srv_receiver, hk_srv_receiver,
hk_srv_tm_sender, hk_srv_tm_sender,
@ -362,3 +365,8 @@ impl<
self.service.check_for_request_timeouts(); self.service.check_for_request_timeouts();
} }
} }
#[cfg(test)]
pub mod tests {
// TODO: Add unittests for HK converter.
}

View File

@ -2,7 +2,8 @@ use crate::requests::GenericRequestRouter;
use crate::tmtc::MpscStoreAndSendError; use crate::tmtc::MpscStoreAndSendError;
use log::warn; use log::warn;
use satrs::pus::verification::{ use satrs::pus::verification::{
self, FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken, self, FailParams, TcStateAccepted, TcStateStarted, VerificationReportingProvider,
VerificationToken,
}; };
use satrs::pus::{ use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
@ -16,6 +17,7 @@ use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusServiceId; use satrs::spacepackets::ecss::PusServiceId;
use satrs::spacepackets::time::cds::TimeProvider; use satrs::spacepackets::time::cds::TimeProvider;
use satrs::spacepackets::time::TimeWriter; use satrs::spacepackets::time::TimeWriter;
use satrs::ComponentId;
use satrs_example::config::{tmtc_err, CustomPusServiceId}; use satrs_example::config::{tmtc_err, CustomPusServiceId};
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::mpsc::{self, Sender}; use std::sync::mpsc::{self, Sender};
@ -120,6 +122,7 @@ pub struct PusTargetedRequestService<
RequestType, RequestType,
ReplyType, ReplyType,
> { > {
pub id: ComponentId,
pub service_helper: pub service_helper:
PusServiceHelper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>, PusServiceHelper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
pub request_router: GenericRequestRouter, pub request_router: GenericRequestRouter,
@ -158,6 +161,7 @@ where
GenericRequestRouter: PusRequestRouter<RequestType, Error = GenericRoutingError>, GenericRequestRouter: PusRequestRouter<RequestType, Error = GenericRoutingError>,
{ {
pub fn new( pub fn new(
id: ComponentId,
service_helper: PusServiceHelper< service_helper: PusServiceHelper<
TcReceiver, TcReceiver,
TmSender, TmSender,
@ -171,6 +175,7 @@ where
reply_receiver: mpsc::Receiver<GenericMessage<ReplyType>>, reply_receiver: mpsc::Receiver<GenericMessage<ReplyType>>,
) -> Self { ) -> Self {
Self { Self {
id,
service_helper, service_helper,
request_converter, request_converter,
active_request_map, active_request_map,
@ -194,7 +199,7 @@ where
.tc_in_mem_converter_mut() .tc_in_mem_converter_mut()
.cache(&ecss_tc_and_token.tc_in_memory)?; .cache(&ecss_tc_and_token.tc_in_memory)?;
let tc = self.service_helper.tc_in_mem_converter().convert()?; let tc = self.service_helper.tc_in_mem_converter().convert()?;
let (request_info, request) = match self.request_converter.convert( let (mut request_info, request) = match self.request_converter.convert(
ecss_tc_and_token.token, ecss_tc_and_token.token,
&tc, &tc,
time_stamp, time_stamp,
@ -206,14 +211,24 @@ where
return Err(e.into()); return Err(e.into());
} }
}; };
let accepted_token: VerificationToken<TcStateAccepted> = request_info
.token()
.try_into()
.expect("token not in expected accepted state");
let verif_request_id = verification::RequestId::new(&tc).raw(); let verif_request_id = verification::RequestId::new(&tc).raw();
match self.request_router.route( match self.request_router.route(
request_info.target_id(),
verif_request_id, verif_request_id,
self.id,
request_info.target_id(),
request, request,
request_info.token(),
) { ) {
Ok(()) => { Ok(()) => {
let started_token = self
.service_helper
.verif_reporter()
.start_success(accepted_token, time_stamp)
.expect("Start success failure");
request_info.set_token(started_token.into());
self.active_request_map self.active_request_map
.insert(&verif_request_id, request_info); .insert(&verif_request_id, request_info);
} }
@ -221,10 +236,11 @@ where
self.request_router.handle_error_generic( self.request_router.handle_error_generic(
&request_info, &request_info,
&tc, &tc,
e, e.clone(),
time_stamp, time_stamp,
self.service_helper.verif_reporter(), self.service_helper.verif_reporter(),
); );
return Err(e.into());
} }
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(PusPacketHandlerResult::RequestHandled)
@ -336,6 +352,8 @@ where
} }
} }
/// Generic timeout handling: Handle the verification failure with a dedicated return code
/// and also log the error.
pub fn generic_pus_request_timeout_handler( pub fn generic_pus_request_timeout_handler(
active_request: &(impl ActiveRequestProvider + Debug), active_request: &(impl ActiveRequestProvider + Debug),
verification_handler: &impl VerificationReportingProvider, verification_handler: &impl VerificationReportingProvider,
@ -343,9 +361,13 @@ pub fn generic_pus_request_timeout_handler(
service_str: &'static str, service_str: &'static str,
) -> Result<(), EcssTmtcError> { ) -> Result<(), EcssTmtcError> {
log::warn!("timeout for active request {active_request:?} on {service_str} service"); log::warn!("timeout for active request {active_request:?} on {service_str} service");
let started_token: VerificationToken<TcStateStarted> = active_request
.token()
.try_into()
.expect("token not in expected started state");
verification_handler verification_handler
.completion_failure( .completion_failure(
active_request.token(), started_token,
FailParams::new( FailParams::new(
time_stamp, time_stamp,
&satrs_example::config::tmtc_err::REQUEST_TIMEOUT, &satrs_example::config::tmtc_err::REQUEST_TIMEOUT,
@ -452,7 +474,7 @@ pub(crate) mod tests {
request::TargetAndApidId, request::TargetAndApidId,
}; };
use crate::requests::CompositeRequestWithToken; use crate::requests::CompositeRequest;
use super::*; use super::*;
@ -484,6 +506,6 @@ pub(crate) mod tests {
pub tm_funnel_rx: mpsc::Receiver<Vec<u8>>, pub tm_funnel_rx: mpsc::Receiver<Vec<u8>>,
pub pus_packet_tx: mpsc::Sender<EcssTcAndToken>, pub pus_packet_tx: mpsc::Sender<EcssTcAndToken>,
pub reply_tx: mpsc::Sender<GenericMessage<ReplyType>>, pub reply_tx: mpsc::Sender<GenericMessage<ReplyType>>,
pub request_rx: mpsc::Receiver<CompositeRequestWithToken>, pub request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
} }
} }

View File

@ -0,0 +1 @@
// TODO: Add PUS mode service, similar to action and HK service.

View File

@ -5,7 +5,7 @@ use log::warn;
use satrs::action::ActionRequest; use satrs::action::ActionRequest;
use satrs::hk::HkRequest; use satrs::hk::HkRequest;
use satrs::pus::verification::{ use satrs::pus::verification::{
FailParams, TcStateStarted, VerificationReportingProvider, VerificationToken, FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken,
}; };
use satrs::pus::{ActiveRequestProvider, GenericRoutingError, PusRequestRouter}; use satrs::pus::{ActiveRequestProvider, GenericRoutingError, PusRequestRouter};
use satrs::queue::GenericSendError; use satrs::queue::GenericSendError;
@ -22,12 +22,12 @@ pub enum CompositeRequest {
Action(ActionRequest), Action(ActionRequest),
} }
#[derive(Clone, Debug)] //#[derive(Clone, Debug)]
pub struct CompositeRequestWithToken { // pub struct CompositeRequestWithToken {
pub(crate) targeted_request: GenericMessage<CompositeRequest>, //pub(crate) targeted_request: GenericMessage<CompositeRequest>,
pub(crate) token: VerificationToken<TcStateStarted>, //}
}
/*
impl CompositeRequestWithToken { impl CompositeRequestWithToken {
pub fn new( pub fn new(
target_id: ComponentId, target_id: ComponentId,
@ -41,9 +41,12 @@ impl CompositeRequestWithToken {
} }
} }
} }
*/
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct GenericRequestRouter(pub HashMap<ComponentId, mpsc::Sender<CompositeRequestWithToken>>); pub struct GenericRequestRouter(
pub HashMap<ComponentId, mpsc::Sender<GenericMessage<CompositeRequest>>>,
);
impl GenericRequestRouter { impl GenericRequestRouter {
pub(crate) fn handle_error_generic( pub(crate) fn handle_error_generic(
@ -58,13 +61,17 @@ impl GenericRequestRouter {
"Routing request for service {} failed: {error:?}", "Routing request for service {} failed: {error:?}",
tc.service() tc.service()
); );
let accepted_token: VerificationToken<TcStateAccepted> = active_request
.token()
.try_into()
.expect("token is not in accepted state");
match error { match error {
GenericRoutingError::UnknownTargetId(id) => { GenericRoutingError::UnknownTargetId(id) => {
let mut fail_data: [u8; 8] = [0; 8]; let mut fail_data: [u8; 8] = [0; 8];
fail_data.copy_from_slice(&id.to_be_bytes()); fail_data.copy_from_slice(&id.to_be_bytes());
verif_reporter verif_reporter
.completion_failure( .completion_failure(
active_request.token(), accepted_token,
FailParams::new(time_stamp, &tmtc_err::UNKNOWN_TARGET_ID, &fail_data), FailParams::new(time_stamp, &tmtc_err::UNKNOWN_TARGET_ID, &fail_data),
) )
.expect("Sending start failure failed"); .expect("Sending start failure failed");
@ -74,7 +81,7 @@ impl GenericRequestRouter {
fail_data.copy_from_slice(&active_request.target_id().to_be_bytes()); fail_data.copy_from_slice(&active_request.target_id().to_be_bytes());
verif_reporter verif_reporter
.completion_failure( .completion_failure(
active_request.token(), accepted_token,
FailParams::new(time_stamp, &tmtc_err::ROUTING_ERROR, &fail_data), FailParams::new(time_stamp, &tmtc_err::ROUTING_ERROR, &fail_data),
) )
.expect("Sending start failure failed"); .expect("Sending start failure failed");
@ -87,22 +94,22 @@ impl PusRequestRouter<HkRequest> for GenericRequestRouter {
fn route( fn route(
&self, &self,
target_id: ComponentId,
request_id: RequestId, request_id: RequestId,
source_id: ComponentId,
target_id: ComponentId,
hk_request: HkRequest, hk_request: HkRequest,
token: VerificationToken<TcStateStarted>,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
if let Some(sender) = self.0.get(&target_id) { if let Some(sender) = self.0.get(&target_id) {
sender sender
.send(CompositeRequestWithToken::new( .send(GenericMessage::new(
target_id,
request_id, request_id,
source_id,
CompositeRequest::Hk(hk_request), CompositeRequest::Hk(hk_request),
token,
)) ))
.map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?; .map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?;
return Ok(());
} }
Ok(()) Err(GenericRoutingError::UnknownTargetId(target_id))
} }
} }
@ -111,22 +118,21 @@ impl PusRequestRouter<ActionRequest> for GenericRequestRouter {
fn route( fn route(
&self, &self,
target_id: ComponentId,
request_id: RequestId, request_id: RequestId,
source_id: ComponentId,
target_id: ComponentId,
action_request: ActionRequest, action_request: ActionRequest,
token: VerificationToken<TcStateStarted>,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
if let Some(sender) = self.0.get(&target_id) { if let Some(sender) = self.0.get(&target_id) {
println!("routed action request");
sender sender
.send(CompositeRequestWithToken::new( .send(GenericMessage::new(
target_id,
request_id, request_id,
source_id,
CompositeRequest::Action(action_request), CompositeRequest::Action(action_request),
token,
)) ))
.map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?; .map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?;
return Ok(());
} }
Ok(()) Err(GenericRoutingError::UnknownTargetId(target_id))
} }
} }

View File

@ -142,7 +142,10 @@ pub mod std_mod {
use std::sync::mpsc; use std::sync::mpsc;
use crate::{ use crate::{
pus::{verification, DefaultActiveRequestMap}, pus::{
verification::{self, TcStateToken},
DefaultActiveRequestMap,
},
ComponentId, ComponentId,
}; };
@ -158,7 +161,8 @@ pub mod std_mod {
delegate! { delegate! {
to self.common { to self.common {
fn target_id(&self) -> ComponentId; fn target_id(&self) -> ComponentId;
fn token(&self) -> VerificationToken<verification::TcStateStarted>; fn token(&self) -> verification::TcStateToken;
fn set_token(&mut self, token: verification::TcStateToken);
fn has_timed_out(&self) -> bool; fn has_timed_out(&self) -> bool;
fn timeout(&self) -> core::time::Duration; fn timeout(&self) -> core::time::Duration;
} }
@ -169,7 +173,7 @@ pub mod std_mod {
pub fn new( pub fn new(
action_id: ActionId, action_id: ActionId,
target_id: ComponentId, target_id: ComponentId,
token: VerificationToken<verification::TcStateStarted>, token: TcStateToken,
timeout: core::time::Duration, timeout: core::time::Duration,
) -> Self { ) -> Self {
Self { Self {

View File

@ -269,7 +269,7 @@ mod tests {
} }
impl EcssChannel for TestSender { impl EcssChannel for TestSender {
fn channel_id(&self) -> ComponentId { fn id(&self) -> ComponentId {
0 0
} }
} }

View File

@ -144,7 +144,7 @@ impl Error for EcssTmtcError {
} }
pub trait EcssChannel: Send { pub trait EcssChannel: Send {
/// Each sender can have an ID associated with it /// Each sender can have an ID associated with it
fn channel_id(&self) -> ComponentId; fn id(&self) -> ComponentId;
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
"unset" "unset"
} }
@ -288,7 +288,8 @@ pub trait ActiveRequestMapProvider<V>: Sized {
pub trait ActiveRequestProvider { pub trait ActiveRequestProvider {
fn target_id(&self) -> ComponentId; fn target_id(&self) -> ComponentId;
fn token(&self) -> VerificationToken<TcStateStarted>; fn token(&self) -> TcStateToken;
fn set_token(&mut self, token: TcStateToken);
fn has_timed_out(&self) -> bool; fn has_timed_out(&self) -> bool;
fn timeout(&self) -> Duration; fn timeout(&self) -> Duration;
} }
@ -300,10 +301,10 @@ pub trait PusRequestRouter<Request> {
fn route( fn route(
&self, &self,
target_id: ComponentId,
request_id: RequestId, request_id: RequestId,
source_id: ComponentId,
target_id: ComponentId,
request: Request, request: Request,
token: VerificationToken<TcStateStarted>,
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
} }
@ -658,7 +659,6 @@ pub mod std_mod {
use spacepackets::ecss::WritablePusPacket; use spacepackets::ecss::WritablePusPacket;
use spacepackets::time::StdTimestampError; use spacepackets::time::StdTimestampError;
use spacepackets::ByteConversionError; use spacepackets::ByteConversionError;
use std::println;
use std::string::String; use std::string::String;
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::mpsc::TryRecvError; use std::sync::mpsc::TryRecvError;
@ -667,7 +667,7 @@ pub mod std_mod {
#[cfg(feature = "crossbeam")] #[cfg(feature = "crossbeam")]
pub use cb_mod::*; pub use cb_mod::*;
use super::verification::{TcStateStarted, VerificationReportingProvider}; use super::verification::{TcStateStarted, TcStateToken, VerificationReportingProvider};
use super::{AcceptedEcssTcAndToken, ActiveRequestProvider, TcInMemory}; use super::{AcceptedEcssTcAndToken, ActiveRequestProvider, TcInMemory};
impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError { impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError {
@ -733,7 +733,7 @@ pub mod std_mod {
} }
impl<Sender: EcssTmSenderCore> EcssChannel for TmInSharedPoolSenderWithId<Sender> { impl<Sender: EcssTmSenderCore> EcssChannel for TmInSharedPoolSenderWithId<Sender> {
fn channel_id(&self) -> ComponentId { fn id(&self) -> ComponentId {
self.channel_id self.channel_id
} }
@ -802,7 +802,7 @@ pub mod std_mod {
} }
impl<Sender: EcssTmSenderCore> EcssChannel for TmAsVecSenderWithId<Sender> { impl<Sender: EcssTmSenderCore> EcssChannel for TmAsVecSenderWithId<Sender> {
fn channel_id(&self) -> ComponentId { fn id(&self) -> ComponentId {
self.id self.id
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
@ -826,7 +826,7 @@ pub mod std_mod {
} }
impl EcssChannel for MpscTcReceiver { impl EcssChannel for MpscTcReceiver {
fn channel_id(&self) -> ComponentId { fn id(&self) -> ComponentId {
self.id self.id
} }
@ -840,7 +840,7 @@ pub mod std_mod {
self.receiver.try_recv().map_err(|e| match e { self.receiver.try_recv().map_err(|e| match e {
TryRecvError::Empty => TryRecvTmtcError::Empty, TryRecvError::Empty => TryRecvTmtcError::Empty,
TryRecvError::Disconnected => TryRecvTmtcError::Tmtc(EcssTmtcError::from( TryRecvError::Disconnected => TryRecvTmtcError::Tmtc(EcssTmtcError::from(
GenericReceiveError::TxDisconnected(Some(self.channel_id())), GenericReceiveError::TxDisconnected(Some(self.id())),
)), )),
}) })
} }
@ -1056,7 +1056,7 @@ pub mod std_mod {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActivePusRequestStd { pub struct ActivePusRequestStd {
target_id: ComponentId, target_id: ComponentId,
token: VerificationToken<TcStateStarted>, token: TcStateToken,
start_time: std::time::Instant, start_time: std::time::Instant,
timeout: Duration, timeout: Duration,
} }
@ -1064,12 +1064,12 @@ pub mod std_mod {
impl ActivePusRequestStd { impl ActivePusRequestStd {
pub fn new( pub fn new(
target_id: ComponentId, target_id: ComponentId,
token: VerificationToken<TcStateStarted>, token: impl Into<TcStateToken>,
timeout: Duration, timeout: Duration,
) -> Self { ) -> Self {
Self { Self {
target_id, target_id,
token, token: token.into(),
start_time: std::time::Instant::now(), start_time: std::time::Instant::now(),
timeout, timeout,
} }
@ -1080,13 +1080,16 @@ pub mod std_mod {
fn target_id(&self) -> ComponentId { fn target_id(&self) -> ComponentId {
self.target_id self.target_id
} }
fn token(&self) -> VerificationToken<TcStateStarted> { fn token(&self) -> TcStateToken {
self.token self.token
} }
fn timeout(&self) -> Duration { fn timeout(&self) -> Duration {
self.timeout self.timeout
} }
fn set_token(&mut self, token: TcStateToken) {
self.token = token;
}
fn has_timed_out(&self) -> bool { fn has_timed_out(&self) -> bool {
std::time::Instant::now() - self.start_time > self.timeout std::time::Instant::now() - self.start_time > self.timeout

View File

@ -1708,7 +1708,7 @@ pub mod tests {
} }
impl EcssChannel for TestSender { impl EcssChannel for TestSender {
fn channel_id(&self) -> ComponentId { fn id(&self) -> ComponentId {
0 0
} }
fn name(&self) -> &'static str { fn name(&self) -> &'static str {