its not getting easier..
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
Robin Müller 2024-03-20 16:35:55 +01:00
parent 83e0a50e2a
commit d5e61090fe
7 changed files with 246 additions and 71 deletions

View File

@ -3,8 +3,8 @@ use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::params::WritableToBeBytes;
use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::action::{
ActionReplyPus, ActionReplyPusWithActionId, ActionRequestWithId, ActivePusActionRequestStd,
DefaultActiveActionRequestMap,
ActionReplyPus, ActionReplyPusWithActionId, ActionRequestWithId, ActionRequestorBoundedMpsc,
ActionRequestorMpsc, ActivePusActionRequestStd, DefaultActiveActionRequestMap,
};
use satrs::pus::verification::{
self, FailParams, FailParamsWithStep, TcStateAccepted,
@ -18,7 +18,7 @@ use satrs::pus::{
PusTcToRequestConverter, TmAsVecSenderWithId, TmAsVecSenderWithMpsc,
TmInSharedPoolSenderWithBoundedMpsc, TmInSharedPoolSenderWithId,
};
use satrs::request::{GenericMessage, TargetAndApidId};
use satrs::request::{GenericMessage, MessageReceiver, MessageSender, TargetAndApidId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket};
use satrs::tmtc::tm_helper::SharedTmPool;
@ -29,7 +29,7 @@ use std::time::Duration;
use crate::requests::GenericRequestRouter;
use super::PusTargetedRequestService;
use super::{generic_pus_request_timeout_handler, PusTargetedRequestService};
pub struct ActionReplyHandler {
fail_data_buf: [u8; 128],
@ -50,8 +50,9 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPusWithActionId> for
&mut self,
reply: &GenericMessage<ActionReplyPusWithActionId>,
_tm_sender: &impl EcssTmSenderCore,
) {
) -> Result<(), Self::Error> {
log::warn!("received unexpected reply for service 8: {reply:?}");
Ok(())
}
fn handle_reply(
@ -115,15 +116,20 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPusWithActionId> for
Ok(remove_entry)
}
/*
fn timeout_callback(&self, active_request: &ActiveRequestType) {
log::warn!("timeout for active request {active_request} on service {SERVICE}");
fn handle_request_timeout(
&mut self,
active_request: &ActivePusActionRequestStd,
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
_tm_sender: &impl EcssTmSenderCore,
) -> Result<(), Self::Error> {
generic_pus_request_timeout_handler(
active_request,
verification_handler,
time_stamp,
"action",
)
}
fn timeout_error_code(&self) -> satrs::res_code::ResultU16 {
REQUEST_TIMEOUT
}
*/
}
#[derive(Default)]
@ -194,12 +200,13 @@ pub fn create_action_service_static(
verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender,
tc_pool: SharedStaticMemoryPool,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter,
action_router: ActionRequestorBoundedMpsc,
) -> Pus8Wrapper<
MpscTcReceiver,
TmInSharedPoolSenderWithBoundedMpsc,
EcssTcInSharedStoreConverter,
VerificationReporterWithSharedPoolMpscBoundedSender,
mpsc::SyncSender<GenericMessage<ActionRequest>>,
> {
let action_srv_tm_sender = TmInSharedPoolSenderWithId::new(
TmSenderId::PusAction as ChannelId,
@ -221,11 +228,11 @@ pub fn create_action_service_static(
EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048),
),
ExampleActionRequestConverter::default(),
action_router,
// TODO: Implementation which does not use run-time allocation? Maybe something like
// a bounded wrapper which pre-allocates using [HashMap::with_capacity]..
DefaultActiveActionRequestMap::default(),
ActionReplyHandler::default(),
action_router,
);
Pus8Wrapper {
action_request_handler,
@ -236,12 +243,13 @@ pub fn create_action_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithVecMpscSender,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter,
action_router: ActionRequestorMpsc,
) -> Pus8Wrapper<
MpscTcReceiver,
TmAsVecSenderWithMpsc,
EcssTcInVecConverter,
VerificationReporterWithVecMpscSender,
mpsc::Sender<GenericMessage<ActionRequest>>,
> {
let action_srv_tm_sender = TmAsVecSenderWithId::new(
TmSenderId::PusAction as ChannelId,
@ -262,42 +270,21 @@ pub fn create_action_service_dynamic(
EcssTcInVecConverter::default(),
),
ExampleActionRequestConverter::default(),
action_router,
DefaultActiveActionRequestMap::default(),
ActionReplyHandler::default(),
action_router,
);
Pus8Wrapper {
action_request_handler,
}
}
/*
#[derive(Default)]
pub struct PusActionReplyHook {}
impl ReplyHandlerHook<ActivePusActionRequest, ActionReplyPusWithActionId> for PusActionReplyHook {
fn handle_unexpected_reply(
&mut self,
reply: &satrs::request::GenericMessage<ActionReplyPusWithActionId>,
) {
println!("received unexpected action reply {:?}", reply);
}
fn timeout_callback(&self, active_request: &ActivePusActionRequest) {
println!("active request {active_request:?} timed out");
}
fn timeout_error_code(&self) -> satrs::res_code::ResultU16 {
todo!()
}
}
*/
pub struct Pus8Wrapper<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestSender: MessageSender<ActionRequest>,
> {
pub(crate) action_request_handler: PusTargetedRequestService<
TcReceiver,
@ -308,7 +295,8 @@ pub struct Pus8Wrapper<
ActionReplyHandler,
DefaultActiveActionRequestMap,
ActivePusActionRequestStd,
ActionRequestWithId,
RequestSender,
ActionRequest,
ActionReplyPusWithActionId,
>,
}
@ -318,7 +306,8 @@ impl<
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> Pus8Wrapper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>
RequestSender: MessageSender<ActionRequest>,
> Pus8Wrapper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter, RequestSender>
{
pub fn handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
match self.action_request_handler.handle_one_tc(time_stamp) {

View File

@ -8,9 +8,10 @@ use satrs::pus::verification::{
use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiverCore,
EcssTmSenderCore, MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError,
PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, TmAsVecSenderWithId,
TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc, TmInSharedPoolSenderWithId,
EcssTmSenderCore, EcssTmtcError, MpscTcReceiver, PusPacketHandlerResult,
PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
TmAsVecSenderWithId, TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc,
TmInSharedPoolSenderWithId,
};
use satrs::request::TargetAndApidId;
use satrs::spacepackets::ecss::tc::PusTcReader;
@ -21,6 +22,7 @@ use satrs_example::config::{hk_err, tmtc_err, TcReceiverId, TmSenderId, PUS_APID
use std::sync::mpsc::{self};
use std::time::Duration;
use crate::pus::generic_pus_request_timeout_handler;
use crate::requests::GenericRequestRouter;
use super::PusTargetedRequestService;
@ -34,14 +36,15 @@ pub enum HkReply {
pub struct HkReplyHandler {}
impl PusReplyHandler<ActivePusRequestStd, HkReply> for HkReplyHandler {
type Error = ();
type Error = EcssTmtcError;
fn handle_unexpected_reply(
&mut self,
reply: &satrs::request::GenericMessage<HkReply>,
_tm_sender: &impl EcssTmSenderCore,
) {
) -> Result<(), Self::Error> {
log::warn!("received unexpected reply for service 3: {reply:?}");
Ok(())
}
fn handle_reply(
@ -61,6 +64,16 @@ impl PusReplyHandler<ActivePusRequestStd, HkReply> for HkReplyHandler {
};
Ok(true)
}
fn handle_request_timeout(
&mut self,
active_request: &ActivePusRequestStd,
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
_tm_sender: &impl EcssTmSenderCore,
) -> Result<(), Self::Error> {
generic_pus_request_timeout_handler(active_request, verification_handler, time_stamp, "HK")
}
}
pub struct ExampleHkRequestConverter {

View File

@ -1,20 +1,22 @@
use crate::requests::GenericRequestRouter;
use crate::tmtc::MpscStoreAndSendError;
use log::warn;
use satrs::pus::action::ActivePusActionRequestStd;
use satrs::pus::verification::{self, FailParams, VerificationReportingProvider};
use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcReceiverCore, EcssTmSenderCore, GenericRoutingError, PusPacketHandlerResult,
PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper,
PusTcToRequestConverter, TcInMemory,
EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericRoutingError,
PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, PusRequestRouter,
PusServiceHelper, PusTcToRequestConverter, TcInMemory,
};
use satrs::request::GenericMessage;
use satrs::request::{GenericMessage, MessageReceiver, MessageSender, MessageSenderAndReceiver};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::spacepackets::time::cds::TimeProvider;
use satrs::spacepackets::time::TimeWriter;
use satrs_example::config::{tmtc_err, CustomPusServiceId};
use std::sync::mpsc::Sender;
use std::fmt::Debug;
use std::sync::mpsc::{self, Sender};
pub mod action;
pub mod event;
@ -82,18 +84,21 @@ pub struct PusTargetedRequestService<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = PusPacketHandlingError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
ActiveRequestMap: ActiveRequestMapProvider<ActiveRequestInfo>,
ActiveRequestInfo: ActiveRequestProvider,
RequestType,
RequestSender: MessageSender<RequestType>,
RequestType: Send,
ReplyType,
> {
pub service_helper:
PusServiceHelper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
pub request_router: GenericRequestRouter,
//pub request_router: GenericRequestRouter,
pub request_converter: RequestConverter,
pub active_request_map: ActiveRequestMap,
pub reply_hook: ReplyHandler,
pub request_router_reply_receiver:
MessageSenderAndReceiver<RequestType, ReplyType, RequestSender, mpsc::Receiver<ReplyType>>,
pub reply_handler: ReplyHandler,
phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>,
}
@ -103,10 +108,11 @@ impl<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = PusPacketHandlingError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
ActiveRequestMap: ActiveRequestMapProvider<ActiveRequestInfo>,
ActiveRequestInfo: ActiveRequestProvider,
RequestType,
RequestSender: MessageSender<RequestType>,
RequestType: Send,
ReplyType,
>
PusTargetedRequestService<
@ -118,6 +124,7 @@ impl<
ReplyHandler,
ActiveRequestMap,
ActiveRequestInfo,
RequestSender,
RequestType,
ReplyType,
>
@ -132,16 +139,21 @@ where
VerificationReporter,
>,
request_converter: RequestConverter,
request_router: GenericRequestRouter,
active_request_map: ActiveRequestMap,
reply_hook: ReplyHandler,
request_router_reply_receiver: MessageSenderAndReceiver<
RequestType,
ReplyType,
RequestSender,
mpsc::Receiver<GenericMessage<ReplyType>>,
>,
) -> Self {
Self {
service_helper,
request_router,
request_converter,
active_request_map,
reply_hook,
reply_handler: reply_hook,
request_router_reply_receiver,
phantom: std::marker::PhantomData,
}
}
@ -166,9 +178,11 @@ where
&self.service_helper.common.verification_handler,
)?;
let verif_request_id = verification::RequestId::new(&tc);
if let Err(e) =
self.request_router
.route(request_info.target_id(), request, ecss_tc_and_token.token)
if let Err(e) = self
.request_router_reply_receiver
.message_sender_map
.send_message(request_id, local_channel_id, target_channel_id, request)
// .(request_info.target_id(), request, ecss_tc_and_token.token)
{
let target_id = request_info.target_id();
self.active_request_map
@ -186,7 +200,66 @@ where
Ok(PusPacketHandlerResult::RequestHandled)
}
pub fn insert_reply(&mut self, reply: &GenericMessage<ReplyType>) {}
pub fn insert_reply(
&mut self,
reply: &GenericMessage<ReplyType>,
time_stamp: &[u8],
) -> Result<(), EcssTmtcError> {
let active_req_opt = self.active_request_map.get(reply.request_id);
if active_req_opt.is_none() {
self.reply_handler
.handle_unexpected_reply(reply, &self.service_helper.common.tm_sender)?;
}
let active_request = active_req_opt.unwrap();
let request_finished = self
.reply_handler
.handle_reply(
reply,
active_request,
&self.service_helper.common.verification_handler,
time_stamp,
&self.service_helper.common.tm_sender,
)
.unwrap_or(false);
if request_finished {
self.active_request_map.remove(reply.request_id);
}
Ok(())
}
pub fn check_for_request_timeouts(&mut self) {
let mut requests_to_delete = Vec::new();
self.active_request_map
.for_each(|request_id, request_info| {
if request_info.has_timed_out() {
requests_to_delete.push(*request_id);
}
});
if !requests_to_delete.is_empty() {
for request_id in requests_to_delete {
self.active_request_map.remove(request_id);
}
}
}
}
pub fn generic_pus_request_timeout_handler(
active_request: &(impl ActiveRequestProvider + Debug),
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
service_str: &'static str,
) -> Result<(), EcssTmtcError> {
log::warn!("timeout for active request {active_request:?} on {service_str} service");
verification_handler
.completion_failure(
active_request.token(),
FailParams::new(
time_stamp,
&satrs_example::config::tmtc_err::REQUEST_TIMEOUT,
&[],
),
)
.map_err(|e| e.0)
}
impl<VerificationReporter: VerificationReportingProvider> PusReceiver<VerificationReporter> {

View File

@ -375,7 +375,8 @@ pub mod alloc_mod {
}
}
/// Helper type defintion for a mode handler object which can send mode requests.
/// Helper type defintion for a mode handler object which can send mode requests and receive
/// mode replies.
pub type ModeRequestorInterface<S, R> = MessageSenderAndReceiver<ModeRequest, ModeReply, S, R>;
impl<S: MessageSender<ModeRequest>, R: MessageReceiver<ModeReply>> ModeRequestorInterface<S, R> {

View File

@ -75,11 +75,72 @@ impl GenericActionReplyPus {
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod alloc_mod {}
pub mod alloc_mod {
use crate::{
action::ActionRequest,
queue::GenericTargetedMessagingError,
request::{
GenericMessage, MessageReceiver, MessageSender, MessageSenderAndReceiver, RequestId,
},
ChannelId,
};
use super::ActionReplyPusWithActionId;
/// Helper type definition for a mode handler which can handle mode requests.
pub type ActionRequestHandlerInterface<S, R> =
MessageSenderAndReceiver<ActionReplyPusWithActionId, ActionRequest, S, R>;
impl<S: MessageSender<ActionReplyPusWithActionId>, R: MessageReceiver<ActionRequest>>
ActionRequestHandlerInterface<S, R>
{
pub fn try_recv_action_request(
&self,
) -> Result<Option<GenericMessage<ActionRequest>>, GenericTargetedMessagingError> {
self.try_recv_message()
}
pub fn send_action_reply(
&self,
request_id: RequestId,
target_id: ChannelId,
reply: ActionReplyPusWithActionId,
) -> Result<(), GenericTargetedMessagingError> {
self.send_message(request_id, target_id, reply)
}
}
/// Helper type defintion for a mode handler object which can send mode requests and receive
/// mode replies.
pub type ActionRequestorInterface<S, R> =
MessageSenderAndReceiver<ActionRequest, ActionReplyPusWithActionId, S, R>;
impl<S: MessageSender<ActionRequest>, R: MessageReceiver<ActionReplyPusWithActionId>>
ActionRequestorInterface<S, R>
{
pub fn try_recv_action_reply(
&self,
) -> Result<Option<GenericMessage<ActionReplyPusWithActionId>>, GenericTargetedMessagingError>
{
self.try_recv_message()
}
pub fn send_action_request(
&self,
request_id: RequestId,
target_id: ChannelId,
request: ActionRequest,
) -> Result<(), GenericTargetedMessagingError> {
self.send_message(request_id, target_id, request)
}
}
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use std::sync::mpsc;
use crate::pus::{verification, DefaultActiveRequestMap};
use super::*;
@ -115,6 +176,39 @@ pub mod std_mod {
}
}
pub type DefaultActiveActionRequestMap = DefaultActiveRequestMap<ActivePusActionRequestStd>;
pub type ActionRequestHandlerMpsc = ActionRequestHandlerInterface<
mpsc::Sender<GenericMessage<ActionReplyPusWithActionId>>,
mpsc::Receiver<GenericMessage<ActionRequest>>,
>;
pub type ActionRequestHandlerMpscBounded = ActionRequestHandlerInterface<
mpsc::SyncSender<GenericMessage<ActionReplyPusWithActionId>>,
mpsc::Receiver<GenericMessage<ActionRequest>>,
>;
pub type ActionRequestorMpsc = ActionRequestorInterface<
mpsc::Sender<GenericMessage<ActionRequest>>,
mpsc::Receiver<GenericMessage<ActionReplyPusWithActionId>>,
>;
pub type ActionRequestorBoundedMpsc = ActionRequestorInterface<
mpsc::SyncSender<GenericMessage<ActionRequest>>,
mpsc::Receiver<GenericMessage<ActionReplyPusWithActionId>>,
>;
/*
pub type ModeRequestorAndHandlerMpsc = ModeInterface<
mpsc::Sender<GenericMessage<ModeRequest>>,
mpsc::Receiver<GenericMessage<ModeReply>>,
mpsc::Sender<GenericMessage<ModeReply>>,
mpsc::Receiver<GenericMessage<ModeRequest>>,
>;
pub type ModeRequestorAndHandlerMpscBounded = ModeInterface<
mpsc::SyncSender<GenericMessage<ModeRequest>>,
mpsc::Receiver<GenericMessage<ModeReply>>,
mpsc::SyncSender<GenericMessage<ModeReply>>,
mpsc::Receiver<GenericMessage<ModeRequest>>,
>;
*/
}
#[cfg(test)]

View File

@ -332,7 +332,15 @@ pub trait PusReplyHandler<ActiveRequestInfo: ActiveRequestProvider, ReplyType> {
&mut self,
reply: &GenericMessage<ReplyType>,
tm_sender: &impl EcssTmSenderCore,
);
) -> Result<(), Self::Error>;
fn handle_request_timeout(
&mut self,
active_request: &ActiveRequestInfo,
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
tm_sender: &impl EcssTmSenderCore,
) -> Result<(), Self::Error>;
}
#[cfg(feature = "alloc")]

View File

@ -44,8 +44,6 @@ pub mod std_mod {
fn can_not_reach_mode_result_code(&self) -> ResultU16;
}
use super::{ModeReply, MODE_SERVICE_ID};
/// Type definition for a PUS mode servicd reply handler which constrains the
/// [PusServiceReplyHandler] active request and reply generics to the [ActiveActionRequest] and
/// [ActionReplyPusWithIds] type.
@ -174,10 +172,9 @@ mod tests {
use crate::{
mode::{
ModeAndSubmode, ModeReplySender, ModeRequest, ModeRequestSender,
ModeAndSubmode, ModeReply, ModeReplySender, ModeRequest, ModeRequestSender,
ModeRequestorAndHandlerMpsc, ModeRequestorMpsc,
},
pus::mode::ModeReply,
request::GenericMessage,
};