need to re-work all of this..
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
2024-03-18 19:57:37 +01:00
parent d2c4cd7428
commit 0f2a700ef1
19 changed files with 561 additions and 618 deletions

View File

@ -53,6 +53,8 @@ pub mod tmtc_err {
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 4);
#[resultcode]
pub const ROUTING_ERROR: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 5);
#[resultcode(info = "Request timeout for targeted PUS request. P1: Request ID. P2: Target ID")]
pub const REQUEST_TIMEOUT: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 6);
#[resultcode(
info = "Not enough data inside the TC application data field. Optionally includes: \

View File

@ -2,8 +2,8 @@ use log::{error, warn};
use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs::pus::action::{
ActionReplyPusWithActionId, ActiveActionRequest, DefaultActiveActionRequestMap,
PusService8ActionRequestHandler, PusService8ReplyHandler,
ActionReplyPus, ActionReplyPusWithActionId, ActiveActionRequest, DefaultActiveActionRequestMap,
PusService8ReplyHandler,
};
use satrs::pus::verification::{
FailParams, TcStateAccepted, VerificationReporterWithSharedPoolMpscBoundedSender,
@ -12,20 +12,56 @@ use satrs::pus::verification::{
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTcReceiverCore, EcssTmSenderCore, MpscTcReceiver, PusPacketHandlerResult,
PusPacketHandlingError, PusServiceHelper, PusTcToRequestConverter, ReplyHandlerHook,
TmAsVecSenderWithId, TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc,
TmInSharedPoolSenderWithId,
PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
ReplyHandlerHook, TmAsVecSenderWithId, TmAsVecSenderWithMpsc,
TmInSharedPoolSenderWithBoundedMpsc, TmInSharedPoolSenderWithId,
};
use satrs::request::TargetAndApidId;
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::tmtc::tm_helper::SharedTmPool;
use satrs::{ChannelId, TargetId};
use satrs_example::config::tmtc_err::REQUEST_TIMEOUT;
use satrs_example::config::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID};
use std::sync::mpsc::{self};
use crate::requests::GenericRequestRouter;
use super::PusTargetedRequestService;
#[derive(Default)]
pub struct ActionReplyHandler {}
impl PusReplyHandler<ActiveActionRequest, ActionReplyPusWithActionId> for ActionReplyHandler {
type Error = ();
fn handle_unexpected_reply(
&mut self,
reply: &satrs::request::GenericMessage<ActionReplyPusWithActionId>,
) {
log::warn!("received unexpected reply for service {SERVICE}: {reply}");
}
fn handle_reply(
&mut self,
reply: &satrs::request::GenericMessage<ActionReplyPusWithActionId>,
verification_handler: &impl VerificationReportingProvider,
tm_sender: &impl EcssTmSenderCore,
) -> Result<bool, Self::Error> {
todo!()
}
/*
fn timeout_callback(&self, active_request: &ActiveRequestType) {
log::warn!("timeout for active request {active_request} on service {SERVICE}");
}
fn timeout_error_code(&self) -> satrs::res_code::ResultU16 {
REQUEST_TIMEOUT
}
*/
}
#[derive(Default)]
pub struct ExampleActionRequestConverter {}
@ -99,7 +135,7 @@ pub fn create_action_service_static(
"PUS_8_TC_RECV",
pus_action_rx,
);
let action_request_handler = PusService8ActionRequestHandler::new(
let action_request_handler = PusTargetedRequestService::new(
PusServiceHelper::new(
action_srv_receiver,
action_srv_tm_sender.clone(),
@ -109,18 +145,13 @@ pub fn create_action_service_static(
),
ExampleActionRequestConverter::default(),
action_router,
);
let action_reply_handler = PusService8ReplyHandler::new_from_now(
verif_reporter.clone(),
// TODO: Implementation which does not use run-time allocation? Maybe something like
// a bounded wrapper which pre-allocates using [HashMap::with_capacity]..
DefaultActiveActionRequestMap::default(),
1024,
PusActionReplyHook::default(),
action_srv_tm_sender,
)
.expect("Failed to create PUS 8 reply handler");
ActionReplyHandler::<8>::default(),
);
Pus8Wrapper {
action_request_handler,
action_reply_handler,
}
}
@ -145,7 +176,7 @@ pub fn create_action_service_dynamic(
"PUS_8_TC_RECV",
pus_action_rx,
);
let action_request_handler = PusService8ActionRequestHandler::new(
let action_request_handler = PusTargetedRequestService::new(
PusServiceHelper::new(
action_srv_receiver,
action_srv_tm_sender.clone(),
@ -155,18 +186,11 @@ pub fn create_action_service_dynamic(
),
ExampleActionRequestConverter::default(),
action_router,
);
let action_reply_handler = PusService8ReplyHandler::new_from_now(
verif_reporter.clone(),
DefaultActiveActionRequestMap::default(),
1024,
PusActionReplyHook::default(),
action_srv_tm_sender,
)
.expect("Failed to create PUS 8 reply handler");
ActionReplyHandler::<8>::default(),
);
Pus8Wrapper {
action_request_handler,
action_reply_handler,
}
}
@ -195,19 +219,13 @@ pub struct Pus8Wrapper<
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
> {
pub(crate) action_request_handler: PusService8ActionRequestHandler<
pub(crate) action_request_handler: PusTargetedRequestService<
TcReceiver,
TmSender,
TcInMemConverter,
VerificationReporter,
ExampleActionRequestConverter,
GenericRequestRouter,
>,
pub(crate) action_reply_handler: PusService8ReplyHandler<
VerificationReporter,
DefaultActiveActionRequestMap,
PusActionReplyHook,
TmSender,
ActionRequest,
>,
}
@ -239,7 +257,6 @@ impl<
error!("PUS packet handling error: {error:?}")
}
}
// self.action_reply_handler.handle_replies();
false
}
}

View File

@ -106,8 +106,8 @@ impl<
VerificationReporter: VerificationReportingProvider,
> Pus5Wrapper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>
{
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_5_handler.handle_one_tc() {
pub fn handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
match self.pus_5_handler.handle_one_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {

View File

@ -1,7 +1,18 @@
use crate::requests::GenericRequestRouter;
use crate::tmtc::MpscStoreAndSendError;
use log::warn;
use satrs::pus::verification::{FailParams, VerificationReportingProvider};
use satrs::pus::{EcssTcAndToken, PusPacketHandlerResult, TcInMemory};
use satrs::pus::verification::{
self, FailParams, VerificationReporter, VerificationReporterWithSharedPoolMpscBoundedSender,
VerificationReporterWithVecMpscSender, VerificationReportingProvider,
};
use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiverCore,
EcssTmSenderCore, GenericRoutingError, MpscTcReceiver, PusPacketHandlerResult, PusReplyHandler,
PusServiceHelper, PusServiceReplyHandler, PusTcToRequestConverter, ReplyHandlerHook,
TcInMemory, TmAsVecSenderWithMpsc, TmInSharedPoolSenderWithBoundedMpsc,
};
use satrs::request::GenericMessage;
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::spacepackets::time::cds::TimeProvider;
@ -36,13 +47,6 @@ struct TimeStampHelper {
}
impl TimeStampHelper {
pub fn new() -> Self {
Self {
stamper: TimeProvider::new_with_u16_days(0, 0),
time_stamp: [0; 7],
}
}
pub fn stamp(&self) -> &[u8] {
&self.time_stamp
}
@ -57,16 +61,137 @@ impl TimeStampHelper {
}
}
impl Default for TimeStampHelper {
fn default() -> Self {
Self {
stamper: TimeProvider::from_now_with_u16_days().expect("creating time stamper failed"),
time_stamp: Default::default(),
}
}
}
impl<VerificationReporter: VerificationReportingProvider> PusReceiver<VerificationReporter> {
pub fn new(verif_reporter: VerificationReporter, pus_router: PusTcMpscRouter) -> Self {
Self {
verif_reporter,
pus_router,
stamp_helper: TimeStampHelper::new(),
stamp_helper: TimeStampHelper::default(),
}
}
}
pub struct PusTargetedRequestService<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<RequestType>,
ReplyHook: ReplyHandlerHook<RequestType, ReplyType>,
ActiveRequestMap: ActiveRequestMapProvider<RequestType>,
ActiveRequestType: ActiveRequestProvider,
RequestType,
ReplyType,
> {
pub service_helper:
PusServiceHelper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
pub request_router: GenericRequestRouter,
pub request_converter: RequestConverter,
pub active_request_map: ActiveRequestMap,
pub reply_hook: ReplyHook,
phantom: std::marker::PhantomData<RequestType>,
}
impl<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<RequestType>,
ReplyHandler: PusReplyHandler<ActiveRequestType, ReplyType>,
ActiveRequestMap: ActiveRequestMapProvider<ActiveRequestType>,
ActiveRequestType: ActiveRequestProvider,
RequestType,
ReplyType,
>
PusTargetedRequestService<
TcReceiver,
TmSender,
TcInMemConverter,
VerificationReporter,
RequestConverter,
ReplyHandler,
ActiveRequestMap,
ActiveRequestType,
RequestType,
ReplyType,
>
{
pub fn new(
service_helper: PusServiceHelper<
TcReceiver,
TmSender,
TcInMemConverter,
VerificationReporter,
>,
request_converter: RequestConverter,
request_router: GenericRequestRouter,
active_request_map: ActiveRequestMap,
reply_hook: ReplyHandler,
) -> Self {
Self {
service_helper,
request_router,
request_converter,
active_request_map,
reply_hook,
phantom: std::marker::PhantomData,
}
}
pub fn handle_one_tc(&mut self) {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty);
}
let ecss_tc_and_token = possible_packet.unwrap();
let tc = self
.service_helper
.tc_in_mem_converter
.convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token.tc_in_memory)?;
let mut partial_error = None;
let time_stamp = get_current_cds_short_timestamp(&mut partial_error);
let (active_request, action_request) = self.request_converter.convert(
ecss_tc_and_token.token,
&tc,
&time_stamp,
&self.service_helper.common.verification_handler,
)?;
if let Err(e) = self.request_router.route(
active_request.target_id(),
action_request,
ecss_tc_and_token.token,
) {
let verif_request_id = verification::RequestId::new(&tc);
self.active_request_map
.insert(verif_request_id.into(), active_request);
self.request_router.handle_error(
target_id,
ecss_tc_and_token.token,
&tc,
e.clone(),
&time_stamp,
&self.service_helper.common.verification_handler,
);
return Err(e.into());
}
Ok(PusPacketHandlerResult::RequestHandled)
}
pub fn insert_reply(&mut self, reply: &GenericMessage<ReplyType>) {
// self.reply_hook.insert_reply(reply, &self.active_request_map);
}
}
impl<VerificationReporter: VerificationReportingProvider> PusReceiver<VerificationReporter> {
pub fn handle_tc_packet(
&mut self,

View File

@ -103,8 +103,11 @@ impl<
}
}
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_11_handler.handle_one_tc(&mut self.sched_tc_pool) {
pub fn handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
match self
.pus_11_handler
.handle_one_tc(time_stamp, &mut self.sched_tc_pool)
{
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {

View File

@ -1,6 +1,9 @@
use satrs::pus::{
verification::VerificationReportingProvider, EcssTcInMemConverter, EcssTcReceiverCore,
EcssTmSenderCore,
use satrs::{
pus::{
verification::VerificationReportingProvider, EcssTcInMemConverter, EcssTcReceiverCore,
EcssTmSenderCore,
},
spacepackets::time::{cds, TimeWriter},
};
use super::{
@ -51,6 +54,10 @@ impl<
pub fn periodic_operation(&mut self) {
self.schedule_srv.release_tcs();
let time_stamp = cds::TimeProvider::from_now_with_u16_days()
.expect("time stamp generation error")
.to_vec()
.unwrap();
loop {
let mut all_queues_empty = true;
let mut is_srv_finished = |srv_handler_finished: bool| {
@ -58,9 +65,9 @@ impl<
all_queues_empty = false;
}
};
is_srv_finished(self.test_srv.handle_next_packet());
is_srv_finished(self.schedule_srv.handle_next_packet());
is_srv_finished(self.event_srv.handle_next_packet());
is_srv_finished(self.test_srv.handle_next_packet(&time_stamp));
is_srv_finished(self.schedule_srv.handle_next_packet(&time_stamp));
is_srv_finished(self.event_srv.handle_next_packet(&time_stamp));
is_srv_finished(self.action_srv.handle_next_packet());
is_srv_finished(self.hk_srv.handle_next_packet());
if all_queues_empty {

View File

@ -111,8 +111,8 @@ impl<
VerificationReporter: VerificationReportingProvider,
> Service17CustomWrapper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>
{
pub fn handle_next_packet(&mut self) -> bool {
let res = self.pus17_handler.handle_one_tc();
pub fn handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
let res = self.pus17_handler.handle_one_tc(time_stamp);
if res.is_err() {
warn!("PUS17 handler failed with error {:?}", res.unwrap_err());
return true;