diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 8f056d9..c2bfc1f 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -3,8 +3,8 @@ use satrs::action::{ActionRequest, ActionRequestVariant}; use satrs::params::WritableToBeBytes; use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pus::action::{ - ActionReplyPus, ActionReplyPusWithActionId, ActionRequestWithId, ActivePusActionRequestStd, - DefaultActiveActionRequestMap, + ActionReplyPus, ActionReplyPusWithActionId, ActionRequestWithId, ActionRequestorBoundedMpsc, + ActionRequestorMpsc, ActivePusActionRequestStd, DefaultActiveActionRequestMap, }; use satrs::pus::verification::{ self, FailParams, FailParamsWithStep, TcStateAccepted, @@ -18,7 +18,7 @@ use satrs::pus::{ PusTcToRequestConverter, TmAsVecSenderWithId, TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc, TmInSharedPoolSenderWithId, }; -use satrs::request::{GenericMessage, TargetAndApidId}; +use satrs::request::{GenericMessage, MessageReceiver, MessageSender, TargetAndApidId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; use satrs::tmtc::tm_helper::SharedTmPool; @@ -29,7 +29,7 @@ use std::time::Duration; use crate::requests::GenericRequestRouter; -use super::PusTargetedRequestService; +use super::{generic_pus_request_timeout_handler, PusTargetedRequestService}; pub struct ActionReplyHandler { fail_data_buf: [u8; 128], @@ -50,8 +50,9 @@ impl PusReplyHandler for &mut self, reply: &GenericMessage, _tm_sender: &impl EcssTmSenderCore, - ) { + ) -> Result<(), Self::Error> { log::warn!("received unexpected reply for service 8: {reply:?}"); + Ok(()) } fn handle_reply( @@ -115,15 +116,20 @@ impl PusReplyHandler for Ok(remove_entry) } - /* - fn timeout_callback(&self, active_request: &ActiveRequestType) { - log::warn!("timeout for active request {active_request} on service {SERVICE}"); + fn handle_request_timeout( + &mut self, + active_request: &ActivePusActionRequestStd, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + _tm_sender: &impl EcssTmSenderCore, + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler( + active_request, + verification_handler, + time_stamp, + "action", + ) } - - fn timeout_error_code(&self) -> satrs::res_code::ResultU16 { - REQUEST_TIMEOUT - } - */ } #[derive(Default)] @@ -194,12 +200,13 @@ pub fn create_action_service_static( verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender, tc_pool: SharedStaticMemoryPool, pus_action_rx: mpsc::Receiver, - action_router: GenericRequestRouter, + action_router: ActionRequestorBoundedMpsc, ) -> Pus8Wrapper< MpscTcReceiver, TmInSharedPoolSenderWithBoundedMpsc, EcssTcInSharedStoreConverter, VerificationReporterWithSharedPoolMpscBoundedSender, + mpsc::SyncSender>, > { let action_srv_tm_sender = TmInSharedPoolSenderWithId::new( TmSenderId::PusAction as ChannelId, @@ -221,11 +228,11 @@ pub fn create_action_service_static( EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048), ), ExampleActionRequestConverter::default(), - action_router, // TODO: Implementation which does not use run-time allocation? Maybe something like // a bounded wrapper which pre-allocates using [HashMap::with_capacity].. DefaultActiveActionRequestMap::default(), ActionReplyHandler::default(), + action_router, ); Pus8Wrapper { action_request_handler, @@ -236,12 +243,13 @@ pub fn create_action_service_dynamic( tm_funnel_tx: mpsc::Sender>, verif_reporter: VerificationReporterWithVecMpscSender, pus_action_rx: mpsc::Receiver, - action_router: GenericRequestRouter, + action_router: ActionRequestorMpsc, ) -> Pus8Wrapper< MpscTcReceiver, TmAsVecSenderWithMpsc, EcssTcInVecConverter, VerificationReporterWithVecMpscSender, + mpsc::Sender>, > { let action_srv_tm_sender = TmAsVecSenderWithId::new( TmSenderId::PusAction as ChannelId, @@ -262,42 +270,21 @@ pub fn create_action_service_dynamic( EcssTcInVecConverter::default(), ), ExampleActionRequestConverter::default(), - action_router, DefaultActiveActionRequestMap::default(), ActionReplyHandler::default(), + action_router, ); Pus8Wrapper { action_request_handler, } } -/* -#[derive(Default)] -pub struct PusActionReplyHook {} - -impl ReplyHandlerHook for PusActionReplyHook { - fn handle_unexpected_reply( - &mut self, - reply: &satrs::request::GenericMessage, - ) { - println!("received unexpected action reply {:?}", reply); - } - - fn timeout_callback(&self, active_request: &ActivePusActionRequest) { - println!("active request {active_request:?} timed out"); - } - - fn timeout_error_code(&self) -> satrs::res_code::ResultU16 { - todo!() - } -} -*/ - pub struct Pus8Wrapper< TcReceiver: EcssTcReceiverCore, TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, + RequestSender: MessageSender, > { pub(crate) action_request_handler: PusTargetedRequestService< TcReceiver, @@ -308,7 +295,8 @@ pub struct Pus8Wrapper< ActionReplyHandler, DefaultActiveActionRequestMap, ActivePusActionRequestStd, - ActionRequestWithId, + RequestSender, + ActionRequest, ActionReplyPusWithActionId, >, } @@ -318,7 +306,8 @@ impl< TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, - > Pus8Wrapper + RequestSender: MessageSender, + > Pus8Wrapper { pub fn handle_next_packet(&mut self, time_stamp: &[u8]) -> bool { match self.action_request_handler.handle_one_tc(time_stamp) { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index ed151be..30eed3c 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -8,9 +8,10 @@ use satrs::pus::verification::{ use satrs::pus::{ ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiverCore, - EcssTmSenderCore, MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, - PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, TmAsVecSenderWithId, - TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc, TmInSharedPoolSenderWithId, + EcssTmSenderCore, EcssTmtcError, MpscTcReceiver, PusPacketHandlerResult, + PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, + TmAsVecSenderWithId, TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc, + TmInSharedPoolSenderWithId, }; use satrs::request::TargetAndApidId; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -21,6 +22,7 @@ use satrs_example::config::{hk_err, tmtc_err, TcReceiverId, TmSenderId, PUS_APID use std::sync::mpsc::{self}; use std::time::Duration; +use crate::pus::generic_pus_request_timeout_handler; use crate::requests::GenericRequestRouter; use super::PusTargetedRequestService; @@ -34,14 +36,15 @@ pub enum HkReply { pub struct HkReplyHandler {} impl PusReplyHandler for HkReplyHandler { - type Error = (); + type Error = EcssTmtcError; fn handle_unexpected_reply( &mut self, reply: &satrs::request::GenericMessage, _tm_sender: &impl EcssTmSenderCore, - ) { + ) -> Result<(), Self::Error> { log::warn!("received unexpected reply for service 3: {reply:?}"); + Ok(()) } fn handle_reply( @@ -61,6 +64,16 @@ impl PusReplyHandler for HkReplyHandler { }; Ok(true) } + + fn handle_request_timeout( + &mut self, + active_request: &ActivePusRequestStd, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + _tm_sender: &impl EcssTmSenderCore, + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler(active_request, verification_handler, time_stamp, "HK") + } } pub struct ExampleHkRequestConverter { diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index fd66d4f..38d6454 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,20 +1,22 @@ use crate::requests::GenericRequestRouter; use crate::tmtc::MpscStoreAndSendError; use log::warn; +use satrs::pus::action::ActivePusActionRequestStd; use satrs::pus::verification::{self, FailParams, VerificationReportingProvider}; use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, - EcssTcReceiverCore, EcssTmSenderCore, GenericRoutingError, PusPacketHandlerResult, - PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, - PusTcToRequestConverter, TcInMemory, + EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericRoutingError, + PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, + PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; -use satrs::request::GenericMessage; +use satrs::request::{GenericMessage, MessageReceiver, MessageSender, MessageSenderAndReceiver}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusServiceId; use satrs::spacepackets::time::cds::TimeProvider; use satrs::spacepackets::time::TimeWriter; use satrs_example::config::{tmtc_err, CustomPusServiceId}; -use std::sync::mpsc::Sender; +use std::fmt::Debug; +use std::sync::mpsc::{self, Sender}; pub mod action; pub mod event; @@ -82,18 +84,21 @@ pub struct PusTargetedRequestService< TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, - ReplyHandler: PusReplyHandler, + ReplyHandler: PusReplyHandler, ActiveRequestMap: ActiveRequestMapProvider, ActiveRequestInfo: ActiveRequestProvider, - RequestType, + RequestSender: MessageSender, + RequestType: Send, ReplyType, > { pub service_helper: PusServiceHelper, - pub request_router: GenericRequestRouter, + //pub request_router: GenericRequestRouter, pub request_converter: RequestConverter, pub active_request_map: ActiveRequestMap, - pub reply_hook: ReplyHandler, + pub request_router_reply_receiver: + MessageSenderAndReceiver>, + pub reply_handler: ReplyHandler, phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, } @@ -103,10 +108,11 @@ impl< TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, - ReplyHandler: PusReplyHandler, + ReplyHandler: PusReplyHandler, ActiveRequestMap: ActiveRequestMapProvider, ActiveRequestInfo: ActiveRequestProvider, - RequestType, + RequestSender: MessageSender, + RequestType: Send, ReplyType, > PusTargetedRequestService< @@ -118,6 +124,7 @@ impl< ReplyHandler, ActiveRequestMap, ActiveRequestInfo, + RequestSender, RequestType, ReplyType, > @@ -132,16 +139,21 @@ where VerificationReporter, >, request_converter: RequestConverter, - request_router: GenericRequestRouter, active_request_map: ActiveRequestMap, reply_hook: ReplyHandler, + request_router_reply_receiver: MessageSenderAndReceiver< + RequestType, + ReplyType, + RequestSender, + mpsc::Receiver>, + >, ) -> Self { Self { service_helper, - request_router, request_converter, active_request_map, - reply_hook, + reply_handler: reply_hook, + request_router_reply_receiver, phantom: std::marker::PhantomData, } } @@ -166,9 +178,11 @@ where &self.service_helper.common.verification_handler, )?; let verif_request_id = verification::RequestId::new(&tc); - if let Err(e) = - self.request_router - .route(request_info.target_id(), request, ecss_tc_and_token.token) + if let Err(e) = self + .request_router_reply_receiver + .message_sender_map + .send_message(request_id, local_channel_id, target_channel_id, request) + // .(request_info.target_id(), request, ecss_tc_and_token.token) { let target_id = request_info.target_id(); self.active_request_map @@ -186,7 +200,66 @@ where Ok(PusPacketHandlerResult::RequestHandled) } - pub fn insert_reply(&mut self, reply: &GenericMessage) {} + pub fn insert_reply( + &mut self, + reply: &GenericMessage, + time_stamp: &[u8], + ) -> Result<(), EcssTmtcError> { + let active_req_opt = self.active_request_map.get(reply.request_id); + if active_req_opt.is_none() { + self.reply_handler + .handle_unexpected_reply(reply, &self.service_helper.common.tm_sender)?; + } + let active_request = active_req_opt.unwrap(); + let request_finished = self + .reply_handler + .handle_reply( + reply, + active_request, + &self.service_helper.common.verification_handler, + time_stamp, + &self.service_helper.common.tm_sender, + ) + .unwrap_or(false); + if request_finished { + self.active_request_map.remove(reply.request_id); + } + Ok(()) + } + + pub fn check_for_request_timeouts(&mut self) { + let mut requests_to_delete = Vec::new(); + self.active_request_map + .for_each(|request_id, request_info| { + if request_info.has_timed_out() { + requests_to_delete.push(*request_id); + } + }); + if !requests_to_delete.is_empty() { + for request_id in requests_to_delete { + self.active_request_map.remove(request_id); + } + } + } +} + +pub fn generic_pus_request_timeout_handler( + active_request: &(impl ActiveRequestProvider + Debug), + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + service_str: &'static str, +) -> Result<(), EcssTmtcError> { + log::warn!("timeout for active request {active_request:?} on {service_str} service"); + verification_handler + .completion_failure( + active_request.token(), + FailParams::new( + time_stamp, + &satrs_example::config::tmtc_err::REQUEST_TIMEOUT, + &[], + ), + ) + .map_err(|e| e.0) } impl PusReceiver { diff --git a/satrs/src/mode.rs b/satrs/src/mode.rs index 5c83788..09b0920 100644 --- a/satrs/src/mode.rs +++ b/satrs/src/mode.rs @@ -375,7 +375,8 @@ pub mod alloc_mod { } } - /// Helper type defintion for a mode handler object which can send mode requests. + /// Helper type defintion for a mode handler object which can send mode requests and receive + /// mode replies. pub type ModeRequestorInterface = MessageSenderAndReceiver; impl, R: MessageReceiver> ModeRequestorInterface { diff --git a/satrs/src/pus/action.rs b/satrs/src/pus/action.rs index 4bcb1eb..0a9d88e 100644 --- a/satrs/src/pus/action.rs +++ b/satrs/src/pus/action.rs @@ -75,11 +75,72 @@ impl GenericActionReplyPus { #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] -pub mod alloc_mod {} +pub mod alloc_mod { + use crate::{ + action::ActionRequest, + queue::GenericTargetedMessagingError, + request::{ + GenericMessage, MessageReceiver, MessageSender, MessageSenderAndReceiver, RequestId, + }, + ChannelId, + }; + + use super::ActionReplyPusWithActionId; + + /// Helper type definition for a mode handler which can handle mode requests. + pub type ActionRequestHandlerInterface = + MessageSenderAndReceiver; + + impl, R: MessageReceiver> + ActionRequestHandlerInterface + { + pub fn try_recv_action_request( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.try_recv_message() + } + + pub fn send_action_reply( + &self, + request_id: RequestId, + target_id: ChannelId, + reply: ActionReplyPusWithActionId, + ) -> Result<(), GenericTargetedMessagingError> { + self.send_message(request_id, target_id, reply) + } + } + + /// Helper type defintion for a mode handler object which can send mode requests and receive + /// mode replies. + pub type ActionRequestorInterface = + MessageSenderAndReceiver; + + impl, R: MessageReceiver> + ActionRequestorInterface + { + pub fn try_recv_action_reply( + &self, + ) -> Result>, GenericTargetedMessagingError> + { + self.try_recv_message() + } + + pub fn send_action_request( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ActionRequest, + ) -> Result<(), GenericTargetedMessagingError> { + self.send_message(request_id, target_id, request) + } + } +} #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod std_mod { + use std::sync::mpsc; + use crate::pus::{verification, DefaultActiveRequestMap}; use super::*; @@ -115,6 +176,39 @@ pub mod std_mod { } } pub type DefaultActiveActionRequestMap = DefaultActiveRequestMap; + + pub type ActionRequestHandlerMpsc = ActionRequestHandlerInterface< + mpsc::Sender>, + mpsc::Receiver>, + >; + pub type ActionRequestHandlerMpscBounded = ActionRequestHandlerInterface< + mpsc::SyncSender>, + mpsc::Receiver>, + >; + + pub type ActionRequestorMpsc = ActionRequestorInterface< + mpsc::Sender>, + mpsc::Receiver>, + >; + pub type ActionRequestorBoundedMpsc = ActionRequestorInterface< + mpsc::SyncSender>, + mpsc::Receiver>, + >; + + /* + pub type ModeRequestorAndHandlerMpsc = ModeInterface< + mpsc::Sender>, + mpsc::Receiver>, + mpsc::Sender>, + mpsc::Receiver>, + >; + pub type ModeRequestorAndHandlerMpscBounded = ModeInterface< + mpsc::SyncSender>, + mpsc::Receiver>, + mpsc::SyncSender>, + mpsc::Receiver>, + >; + */ } #[cfg(test)] diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index c0b11ef..93bbe6a 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -332,7 +332,15 @@ pub trait PusReplyHandler { &mut self, reply: &GenericMessage, tm_sender: &impl EcssTmSenderCore, - ); + ) -> Result<(), Self::Error>; + + fn handle_request_timeout( + &mut self, + active_request: &ActiveRequestInfo, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + tm_sender: &impl EcssTmSenderCore, + ) -> Result<(), Self::Error>; } #[cfg(feature = "alloc")] diff --git a/satrs/src/pus/mode.rs b/satrs/src/pus/mode.rs index 9e42124..cea2087 100644 --- a/satrs/src/pus/mode.rs +++ b/satrs/src/pus/mode.rs @@ -44,8 +44,6 @@ pub mod std_mod { fn can_not_reach_mode_result_code(&self) -> ResultU16; } - use super::{ModeReply, MODE_SERVICE_ID}; - /// Type definition for a PUS mode servicd reply handler which constrains the /// [PusServiceReplyHandler] active request and reply generics to the [ActiveActionRequest] and /// [ActionReplyPusWithIds] type. @@ -174,10 +172,9 @@ mod tests { use crate::{ mode::{ - ModeAndSubmode, ModeReplySender, ModeRequest, ModeRequestSender, + ModeAndSubmode, ModeReply, ModeReplySender, ModeRequest, ModeRequestSender, ModeRequestorAndHandlerMpsc, ModeRequestorMpsc, }, - pus::mode::ModeReply, request::GenericMessage, };