simplified PUS stack #178

Merged
muellerr merged 2 commits from simplify-pus-stack into main 2024-05-02 10:01:17 +02:00
15 changed files with 365 additions and 368 deletions

View File

@ -3,14 +3,13 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc; use std::sync::mpsc;
use log::{info, warn}; use log::{info, warn};
use satrs::pus::HandlingStatus;
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderRaw}; use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderRaw};
use satrs::{ use satrs::{
hal::std::udp_server::{ReceiveResult, UdpTcServer}, hal::std::udp_server::{ReceiveResult, UdpTcServer},
pool::{PoolProviderWithGuards, SharedStaticMemoryPool}, pool::{PoolProviderWithGuards, SharedStaticMemoryPool},
}; };
use crate::pus::HandlingStatus;
pub trait UdpTmHandler { pub trait UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
} }

View File

@ -1,4 +1,4 @@
use log::{error, warn}; use log::warn;
use satrs::action::{ActionRequest, ActionRequestVariant}; use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::pool::SharedStaticMemoryPool; use satrs::pool::SharedStaticMemoryPool;
use satrs::pus::action::{ use satrs::pus::action::{
@ -12,7 +12,7 @@ use satrs::pus::verification::{
use satrs::pus::{ use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, MpscTcReceiver, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, MpscTcReceiver,
MpscTmAsVecSender, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler, PusServiceHelper,
PusTcToRequestConverter, PusTcToRequestConverter,
}; };
use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::request::{GenericMessage, UniqueApidTargetId};
@ -278,43 +278,23 @@ pub struct ActionServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: EcssTc
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> TargetedPusService impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> TargetedPusService
for ActionServiceWrapper<TmSender, TcInMemConverter> for ActionServiceWrapper<TmSender, TcInMemConverter>
{ {
/// Returns [true] if the packet handling is finished. const SERVICE_ID: u8 = 8;
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_STR: &'static str = "action";
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 8 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 8 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 8 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
}
HandlingStatus::HandledOne
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { delegate::delegate! {
// This only fails if all senders disconnected. Treat it like an empty queue. to self.service {
self.service fn poll_and_handle_next_tc(
.poll_and_check_next_reply(time_stamp) &mut self,
.unwrap_or_else(|e| { time_stamp: &[u8],
warn!("PUS 8: Handling reply failed with error {e:?}"); ) -> Result<HandlingStatus, PusPacketHandlingError>;
HandlingStatus::Empty
})
}
fn check_for_request_timeouts(&mut self) { fn poll_and_handle_next_reply(
self.service.check_for_request_timeouts(); &mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
} }
} }
@ -429,7 +409,7 @@ mod tests {
} }
let result = result.unwrap(); let result = result.unwrap();
match result { match result {
PusPacketHandlerResult::RequestHandled => (), HandlingStatus::HandledOne => (),
_ => panic!("unexpected result {result:?}"), _ => panic!("unexpected result {result:?}"),
} }
} }
@ -441,19 +421,19 @@ mod tests {
} }
let result = result.unwrap(); let result = result.unwrap();
match result { match result {
PusPacketHandlerResult::Empty => (), HandlingStatus::Empty => (),
_ => panic!("unexpected result {result:?}"), _ => panic!("unexpected result {result:?}"),
} }
} }
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp); let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok()); assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::HandledOne); assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
} }
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp); let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok()); assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::Empty); assert_eq!(result.unwrap(), HandlingStatus::Empty);
} }

View File

@ -7,8 +7,9 @@ use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusEventServiceHandler; use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter; use satrs::pus::verification::VerificationReporter;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter,
EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver,
MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper,
}; };
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use satrs_example::config::components::PUS_EVENT_MANAGEMENT; use satrs_example::config::components::PUS_EVENT_MANAGEMENT;
@ -65,22 +66,24 @@ impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
EventServiceWrapper<TmSender, TcInMemConverter> EventServiceWrapper<TmSender, TcInMemConverter>
{ {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.handler.poll_and_handle_next_tc(time_stamp) { let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!("PUS 5 partial error: {:?}", partial_error);
};
match self
.handler
.poll_and_handle_next_tc(error_handler, time_stamp)
{
Ok(result) => match result { Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {} DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { DirectPusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 5 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 5 invalid subservice {invalid}"); warn!("PUS 5 invalid subservice {invalid}");
} }
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 5 subservice {subservice} not implemented"); warn!("PUS 5 subservice {subservice} not implemented");
} }
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
}, },
Err(error) => { Err(error) => {
error!("PUS packet handling error: {error:?}") error!("PUS 5 packet handling error: {error:?}")
} }
} }
HandlingStatus::HandledOne HandlingStatus::HandledOne

View File

@ -1,5 +1,4 @@
use derive_new::new; use derive_new::new;
use log::{error, warn};
use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId}; use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId};
use satrs::pool::SharedStaticMemoryPool; use satrs::pool::SharedStaticMemoryPool;
use satrs::pus::verification::{ use satrs::pus::verification::{
@ -10,7 +9,7 @@ use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender,
EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender, EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender,
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
}; };
use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
@ -24,7 +23,7 @@ use std::time::Duration;
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler}; use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
use crate::requests::GenericRequestRouter; use crate::requests::GenericRequestRouter;
use super::{HandlingStatus, PusTargetedRequestService}; use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService};
#[derive(Clone, PartialEq, Debug, new)] #[derive(Clone, PartialEq, Debug, new)]
pub struct HkReply { pub struct HkReply {
@ -297,45 +296,27 @@ pub struct HkServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMe
>, >,
} }
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> TargetedPusService
HkServiceWrapper<TmSender, TcInMemConverter> for HkServiceWrapper<TmSender, TcInMemConverter>
{ {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_ID: u8 = 3;
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 3 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 3 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 3 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
}
HandlingStatus::HandledOne
}
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_STR: &'static str = "housekeeping";
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 3: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
pub fn check_for_request_timeouts(&mut self) { delegate::delegate! {
self.service.check_for_request_timeouts(); to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
} }
} }

View File

@ -8,8 +8,8 @@ use satrs::pus::verification::{
use satrs::pus::{ use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, HandlingStatus, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper,
PusServiceHelper, PusTcToRequestConverter, TcInMemory, PusTcToRequestConverter, TcInMemory,
}; };
use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::request::{Apid, GenericMessage, MessageMetadata};
@ -31,12 +31,6 @@ pub mod scheduler;
pub mod stack; pub mod stack;
pub mod test; pub mod test;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum HandlingStatus {
Empty,
HandledOne,
}
pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> VerificationReporter { pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> VerificationReporter {
let verif_cfg = VerificationReporterCfg::new(apid, 1, 2, 8).unwrap(); let verif_cfg = VerificationReporterCfg::new(apid, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned // Every software component which needs to generate verification telemetry, gets a cloned
@ -79,7 +73,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
pub fn handle_tc_packet_vec( pub fn handle_tc_packet_vec(
&mut self, &mut self,
packet_as_vec: PacketAsVec, packet_as_vec: PacketAsVec,
) -> Result<PusPacketHandlerResult, GenericSendError> { ) -> Result<HandlingStatus, GenericSendError> {
self.handle_tc_generic(packet_as_vec.sender_id, None, &packet_as_vec.packet) self.handle_tc_generic(packet_as_vec.sender_id, None, &packet_as_vec.packet)
} }
@ -87,7 +81,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
&mut self, &mut self,
packet_in_pool: PacketInPool, packet_in_pool: PacketInPool,
pus_tc_copy: &[u8], pus_tc_copy: &[u8],
) -> Result<PusPacketHandlerResult, GenericSendError> { ) -> Result<HandlingStatus, GenericSendError> {
self.handle_tc_generic( self.handle_tc_generic(
packet_in_pool.sender_id, packet_in_pool.sender_id,
Some(packet_in_pool.store_addr), Some(packet_in_pool.store_addr),
@ -100,7 +94,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
sender_id: ComponentId, sender_id: ComponentId,
addr_opt: Option<PoolAddr>, addr_opt: Option<PoolAddr>,
raw_tc: &[u8], raw_tc: &[u8],
) -> Result<PusPacketHandlerResult, GenericSendError> { ) -> Result<HandlingStatus, GenericSendError> {
let pus_tc_result = PusTcReader::new(raw_tc); let pus_tc_result = PusTcReader::new(raw_tc);
if pus_tc_result.is_err() { if pus_tc_result.is_err() {
log::warn!( log::warn!(
@ -109,7 +103,8 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
pus_tc_result.unwrap_err() pus_tc_result.unwrap_err()
); );
log::warn!("raw data: {:x?}", raw_tc); log::warn!("raw data: {:x?}", raw_tc);
return Ok(PusPacketHandlerResult::RequestHandled); // TODO: Shouldn't this be an error?
return Ok(HandlingStatus::HandledOne);
} }
let pus_tc = pus_tc_result.unwrap().0; let pus_tc = pus_tc_result.unwrap().0;
let init_token = self.verif_reporter.add_tc(&pus_tc); let init_token = self.verif_reporter.add_tc(&pus_tc);
@ -189,14 +184,53 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
} }
} }
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(HandlingStatus::HandledOne)
} }
} }
pub trait TargetedPusService { pub trait TargetedPusService {
/// Returns [true] if the packet handling is finished. const SERVICE_ID: u8;
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus; const SERVICE_STR: &'static str;
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn poll_and_handle_next_tc_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let result = self.poll_and_handle_next_tc(time_stamp);
if let Err(e) = result {
log::error!(
"PUS service {}({})packet handling error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
result.unwrap()
}
fn poll_and_handle_next_reply_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.poll_and_handle_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!(
"PUS servce {}({}): Handling reply failed with error {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
HandlingStatus::Empty
})
}
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self); fn check_for_request_timeouts(&mut self);
} }
@ -297,10 +331,10 @@ where
pub fn poll_and_handle_next_tc( pub fn poll_and_handle_next_tc(
&mut self, &mut self,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<HandlingStatus, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() { if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty); return Ok(HandlingStatus::Empty);
} }
let ecss_tc_and_token = possible_packet.unwrap(); let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper self.service_helper
@ -356,7 +390,7 @@ where
return Err(e.into()); return Err(e.into());
} }
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(HandlingStatus::HandledOne)
} }
fn handle_conversion_to_request_error( fn handle_conversion_to_request_error(
@ -409,7 +443,7 @@ where
} }
} }
pub fn poll_and_check_next_reply( pub fn poll_and_handle_next_reply(
&mut self, &mut self,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError> { ) -> Result<HandlingStatus, EcssTmtcError> {

View File

@ -1,5 +1,4 @@
use derive_new::new; use derive_new::new;
use log::{error, warn};
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use std::sync::mpsc; use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
@ -9,7 +8,7 @@ use satrs::pool::SharedStaticMemoryPool;
use satrs::pus::verification::VerificationReporter; use satrs::pus::verification::VerificationReporter;
use satrs::pus::{ use satrs::pus::{
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter,
EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError,
PusServiceHelper, PusServiceHelper,
}; };
use satrs::request::GenericMessage; use satrs::request::GenericMessage;
@ -36,7 +35,7 @@ use satrs::{
ComponentId, ComponentId,
}; };
use satrs_example::config::components::PUS_MODE_SERVICE; use satrs_example::config::components::PUS_MODE_SERVICE;
use satrs_example::config::{mode_err, tmtc_err}; use satrs_example::config::{mode_err, tmtc_err, CustomPusServiceId};
use super::{ use super::{
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus,
@ -272,44 +271,27 @@ pub struct ModeServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: EcssTcIn
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> TargetedPusService impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> TargetedPusService
for ModeServiceWrapper<TmSender, TcInMemConverter> for ModeServiceWrapper<TmSender, TcInMemConverter>
{ {
/// Returns [true] if the packet handling is finished. const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8;
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) { const SERVICE_STR: &'static str = "mode";
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {} delegate::delegate! {
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { to self.service {
warn!("PUS mode service: partial packet handling success: {e:?}") fn poll_and_handle_next_tc(
} &mut self,
PusPacketHandlerResult::CustomSubservice(invalid, _) => { time_stamp: &[u8],
warn!("PUS mode service: invalid subservice {invalid}"); ) -> Result<HandlingStatus, PusPacketHandlingError>;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { fn poll_and_handle_next_reply(
warn!("PUS mode service: {subservice} not implemented"); &mut self,
} time_stamp: &[u8],
PusPacketHandlerResult::Empty => return HandlingStatus::Empty, ) -> Result<HandlingStatus, EcssTmtcError>;
},
Err(error) => { fn check_for_request_timeouts(&mut self);
error!("PUS mode service: packet handling error: {error:?}");
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
} }
} }
HandlingStatus::HandledOne
} }
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS action service: Handling reply failed with error {e:?}");
HandlingStatus::HandledOne
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use satrs::pus::test_util::{TEST_APID, TEST_COMPONENT_ID_0, TEST_UNIQUE_ID_0}; use satrs::pus::test_util::{TEST_APID, TEST_COMPONENT_ID_0, TEST_UNIQUE_ID_0};

View File

@ -8,8 +8,9 @@ use satrs::pus::scheduler::{PusScheduler, TcInfo};
use satrs::pus::scheduler_srv::PusSchedServiceHandler; use satrs::pus::scheduler_srv::PusSchedServiceHandler;
use satrs::pus::verification::VerificationReporter; use satrs::pus::verification::VerificationReporter;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter,
EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver,
MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper,
}; };
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool}; use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
use satrs::ComponentId; use satrs::ComponentId;
@ -105,25 +106,25 @@ impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
} }
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self let error_handler = |patial_error: &PartialPusHandlingError| {
.pus_11_handler log::warn!("PUS 11 partial error: {:?}", patial_error);
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool) };
{ match self.pus_11_handler.poll_and_handle_next_tc(
error_handler,
time_stamp,
&mut self.sched_tc_pool,
) {
Ok(result) => match result { Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {} DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { DirectPusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 11 invalid subservice {invalid}"); warn!("PUS 11 invalid subservice {invalid}");
} }
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented"); warn!("PUS 11 Subservice {subservice} not implemented");
} }
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
}, },
Err(error) => { Err(error) => {
error!("PUS packet handling error: {error:?}") error!("PUS 11 packet handling error: {error:?}")
} }
} }
HandlingStatus::HandledOne HandlingStatus::HandledOne

View File

@ -2,8 +2,12 @@ use crate::pus::mode::ModeServiceWrapper;
use derive_new::new; use derive_new::new;
use satrs::{ use satrs::{
pus::{EcssTcInMemConverter, EcssTmSender}, pus::{EcssTcInMemConverter, EcssTmSender},
spacepackets::time::{cds, TimeWriter}, spacepackets::{
ecss::PusServiceId,
time::{cds, TimeWriter},
},
}; };
use satrs_example::config::CustomPusServiceId;
use super::{ use super::{
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
@ -32,6 +36,7 @@ impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
.expect("time stamp generation error") .expect("time stamp generation error")
.to_vec() .to_vec()
.unwrap(); .unwrap();
// Hot loop which will run continuously until all request and reply handling is done.
loop { loop {
let mut nothing_to_do = true; let mut nothing_to_do = true;
let mut is_srv_finished = let mut is_srv_finished =
@ -46,33 +51,46 @@ impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
} }
}; };
is_srv_finished( is_srv_finished(
17, PusServiceId::Test as u8,
self.test_srv.poll_and_handle_next_packet(&time_stamp), self.test_srv.poll_and_handle_next_tc(&time_stamp),
None, None,
); );
is_srv_finished( is_srv_finished(
11, PusServiceId::Scheduling as u8,
self.schedule_srv.poll_and_handle_next_tc(&time_stamp), self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
None, None,
); );
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
is_srv_finished( is_srv_finished(
8, PusServiceId::Event as u8,
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), self.event_srv.poll_and_handle_next_tc(&time_stamp),
None,
);
is_srv_finished(
PusServiceId::Action as u8,
self.action_srv_wrapper
.poll_and_handle_next_tc_default_handler(&time_stamp),
Some( Some(
self.action_srv_wrapper self.action_srv_wrapper
.poll_and_handle_next_reply(&time_stamp), .poll_and_handle_next_reply_default_handler(&time_stamp),
), ),
); );
is_srv_finished( is_srv_finished(
3, PusServiceId::Housekeeping as u8,
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), self.hk_srv_wrapper
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), .poll_and_handle_next_tc_default_handler(&time_stamp),
Some(
self.hk_srv_wrapper
.poll_and_handle_next_reply_default_handler(&time_stamp),
),
); );
is_srv_finished( is_srv_finished(
200, CustomPusServiceId::Mode as u8,
self.mode_srv.poll_and_handle_next_tc(&time_stamp), self.mode_srv
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)), .poll_and_handle_next_tc_default_handler(&time_stamp),
Some(
self.mode_srv
.poll_and_handle_next_reply_default_handler(&time_stamp),
),
); );
if nothing_to_do { if nothing_to_do {
// Timeout checking is only done once. // Timeout checking is only done once.

View File

@ -4,11 +4,11 @@ use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pool::SharedStaticMemoryPool; use satrs::pool::SharedStaticMemoryPool;
use satrs::pus::test::PusService17TestHandler; use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::EcssTcInSharedStoreConverter;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver, DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter,
MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PusServiceHelper,
}; };
use satrs::pus::{EcssTcInSharedStoreConverter, PartialPusHandlingError};
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::ecss::PusPacket;
use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::cds::CdsTime;
@ -67,27 +67,29 @@ pub struct TestCustomServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: Ec
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter> impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
TestCustomServiceWrapper<TmSender, TcInMemConverter> TestCustomServiceWrapper<TmSender, TcInMemConverter>
{ {
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus { pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let res = self.handler.poll_and_handle_next_tc(time_stamp); let error_handler = |patial_error: &PartialPusHandlingError| {
log::warn!("PUS 17 partial error: {:?}", patial_error);
};
let res = self
.handler
.poll_and_handle_next_tc(error_handler, time_stamp);
if res.is_err() { if res.is_err() {
warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); warn!("PUS 17 handler error: {:?}", res.unwrap_err());
return HandlingStatus::HandledOne; return HandlingStatus::HandledOne;
} }
match res.unwrap() { match res.unwrap() {
PusPacketHandlerResult::RequestHandled => { DirectPusPacketHandlerResult::Handled(handling_status) => {
if handling_status == HandlingStatus::HandledOne {
info!("Received PUS ping command TC[17,1]"); info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]"); info!("Sent ping reply PUS TM[17,2]");
} }
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => { return handling_status;
warn!(
"Handled PUS ping command with partial success: {:?}",
partial_err
);
} }
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS17: Subservice {subservice} not implemented") warn!("PUS17: Subservice {subservice} not implemented")
} }
PusPacketHandlerResult::CustomSubservice(subservice, token) => { DirectPusPacketHandlerResult::CustomSubservice(subservice, token) => {
let (tc, _) = PusTcReader::new( let (tc, _) = PusTcReader::new(
self.handler self.handler
.service_helper .service_helper
@ -135,7 +137,6 @@ impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>
.expect("Sending start failure verification failed"); .expect("Sending start failure verification failed");
} }
} }
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
} }
HandlingStatus::HandledOne HandlingStatus::HandledOne
} }

View File

@ -1,12 +1,13 @@
use satrs::{ use satrs::{
pool::PoolProvider, pool::PoolProvider,
pus::HandlingStatus,
tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool, SharedPacketPool}, tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool, SharedPacketPool},
}; };
use std::sync::mpsc::{self, TryRecvError}; use std::sync::mpsc::{self, TryRecvError};
use satrs::pus::MpscTmAsVecSender; use satrs::pus::MpscTmAsVecSender;
use crate::pus::{HandlingStatus, PusTcDistributor}; use crate::pus::PusTcDistributor;
// TC source components where static pools are the backing memory of the received telecommands. // TC source components where static pools are the backing memory of the received telecommands.
pub struct TcSourceTaskStatic { pub struct TcSourceTaskStatic {

View File

@ -54,11 +54,11 @@ pub type GenericActionReplyPus = GenericMessage<ActionReplyPus>;
impl GenericActionReplyPus { impl GenericActionReplyPus {
pub fn new_action_reply( pub fn new_action_reply(
requestor_info: MessageMetadata, replier_info: MessageMetadata,
action_id: ActionId, action_id: ActionId,
reply: ActionReplyVariant, reply: ActionReplyVariant,
) -> Self { ) -> Self {
Self::new(requestor_info, ActionReplyPus::new(action_id, reply)) Self::new(replier_info, ActionReplyPus::new(action_id, reply))
} }
} }

View File

@ -1,7 +1,7 @@
use crate::events::EventU32; use crate::events::EventU32;
use crate::pus::event_man::{EventRequest, EventRequestWithToken}; use crate::pus::event_man::{EventRequest, EventRequestWithToken};
use crate::pus::verification::TcStateToken; use crate::pus::verification::TcStateToken;
use crate::pus::{PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError}; use crate::pus::{DirectPusPacketHandlerResult, PartialPusHandlingError, PusPacketHandlingError};
use crate::queue::GenericSendError; use crate::queue::GenericSendError;
use spacepackets::ecss::event::Subservice; use spacepackets::ecss::event::Subservice;
use spacepackets::ecss::PusPacket; use spacepackets::ecss::PusPacket;
@ -10,7 +10,7 @@ use std::sync::mpsc::Sender;
use super::verification::VerificationReportingProvider; use super::verification::VerificationReportingProvider;
use super::{ use super::{
EcssTcInMemConverter, EcssTcReceiver, EcssTmSender, GenericConversionError, EcssTcInMemConverter, EcssTcReceiver, EcssTmSender, GenericConversionError,
GenericRoutingError, PusServiceHelper, GenericRoutingError, HandlingStatus, PusServiceHelper,
}; };
pub struct PusEventServiceHandler< pub struct PusEventServiceHandler<
@ -46,13 +46,14 @@ impl<
} }
} }
pub fn poll_and_handle_next_tc( pub fn poll_and_handle_next_tc<ErrorCb: FnMut(&PartialPusHandlingError)>(
&mut self, &mut self,
mut error_callback: ErrorCb,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() { if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty); return Ok(HandlingStatus::Empty.into());
} }
let ecss_tc_and_token = possible_packet.unwrap(); let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper self.service_helper
@ -62,13 +63,13 @@ impl<
let subservice = tc.subservice(); let subservice = tc.subservice();
let srv = Subservice::try_from(subservice); let srv = Subservice::try_from(subservice);
if srv.is_err() { if srv.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(DirectPusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
ecss_tc_and_token.token, ecss_tc_and_token.token,
)); ));
} }
let handle_enable_disable_request = let mut handle_enable_disable_request =
|enable: bool| -> Result<PusPacketHandlerResult, PusPacketHandlingError> { |enable: bool| -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
if tc.user_data().len() < 4 { if tc.user_data().len() < 4 {
return Err(GenericConversionError::NotEnoughAppData { return Err(GenericConversionError::NotEnoughAppData {
expected: 4, expected: 4,
@ -79,21 +80,20 @@ impl<
let user_data = tc.user_data(); let user_data = tc.user_data();
let event_u32 = let event_u32 =
EventU32::from(u32::from_be_bytes(user_data[0..4].try_into().unwrap())); EventU32::from(u32::from_be_bytes(user_data[0..4].try_into().unwrap()));
let start_token = self let mut token: TcStateToken = ecss_tc_and_token.token.into();
.service_helper match self.service_helper.common.verif_reporter.start_success(
.common
.verif_reporter
.start_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
ecss_tc_and_token.token, ecss_tc_and_token.token,
time_stamp, time_stamp,
) ) {
.map_err(|_| PartialPusHandlingError::Verification); Ok(start_token) => {
let partial_error = start_token.clone().err();
let mut token: TcStateToken = ecss_tc_and_token.token.into();
if let Ok(start_token) = start_token {
token = start_token.into(); token = start_token.into();
} }
Err(e) => {
error_callback(&PartialPusHandlingError::Verification(e));
}
}
let event_req_with_token = if enable { let event_req_with_token = if enable {
EventRequestWithToken { EventRequestWithToken {
request: EventRequest::Enable(event_u32), request: EventRequest::Enable(event_u32),
@ -112,12 +112,7 @@ impl<
GenericSendError::RxDisconnected, GenericSendError::RxDisconnected,
)) ))
})?; })?;
if let Some(partial_error) = partial_error { Ok(HandlingStatus::HandledOne.into())
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
}
Ok(PusPacketHandlerResult::RequestHandled)
}; };
match srv.unwrap() { match srv.unwrap() {
@ -136,14 +131,14 @@ impl<
handle_enable_disable_request(false)?; handle_enable_disable_request(false)?;
} }
Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => { Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => {
return Ok(PusPacketHandlerResult::SubserviceNotImplemented( return Ok(DirectPusPacketHandlerResult::SubserviceNotImplemented(
subservice, subservice,
ecss_tc_and_token.token, ecss_tc_and_token.token,
)); ));
} }
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(HandlingStatus::HandledOne.into())
} }
} }
@ -167,7 +162,7 @@ mod tests {
use crate::pus::verification::{ use crate::pus::verification::{
RequestId, VerificationReporter, VerificationReportingProvider, RequestId, VerificationReporter, VerificationReportingProvider,
}; };
use crate::pus::{GenericConversionError, MpscTcReceiver}; use crate::pus::{GenericConversionError, HandlingStatus, MpscTcReceiver};
use crate::tmtc::PacketSenderWithSharedPool; use crate::tmtc::PacketSenderWithSharedPool;
use crate::{ use crate::{
events::EventU32, events::EventU32,
@ -175,7 +170,7 @@ mod tests {
event_man::EventRequestWithToken, event_man::EventRequestWithToken,
tests::PusServiceHandlerWithSharedStoreCommon, tests::PusServiceHandlerWithSharedStoreCommon,
verification::{TcStateAccepted, VerificationToken}, verification::{TcStateAccepted, VerificationToken},
EcssTcInSharedStoreConverter, PusPacketHandlerResult, PusPacketHandlingError, DirectPusPacketHandlerResult, EcssTcInSharedStoreConverter, PusPacketHandlingError,
}, },
}; };
@ -229,9 +224,11 @@ mod tests {
} }
impl SimplePusPacketHandler for Pus5HandlerWithStoreTester { impl SimplePusPacketHandler for Pus5HandlerWithStoreTester {
fn handle_one_tc(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { fn handle_one_tc(
&mut self,
) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap(); let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap();
self.handler.poll_and_handle_next_tc(&time_stamp) self.handler.poll_and_handle_next_tc(|_| {}, &time_stamp)
} }
} }
@ -293,10 +290,13 @@ mod tests {
let result = test_harness.handle_one_tc(); let result = test_harness.handle_one_tc();
assert!(result.is_ok()); assert!(result.is_ok());
let result = result.unwrap(); let result = result.unwrap();
if let PusPacketHandlerResult::Empty = result { assert!(
} else { matches!(
panic!("unexpected result type {result:?}") result,
} DirectPusPacketHandlerResult::Handled(HandlingStatus::Empty)
),
"unexpected result type {result:?}"
)
} }
#[test] #[test]
@ -311,7 +311,7 @@ mod tests {
let result = test_harness.handle_one_tc(); let result = test_harness.handle_one_tc();
assert!(result.is_ok()); assert!(result.is_ok());
let result = result.unwrap(); let result = result.unwrap();
if let PusPacketHandlerResult::CustomSubservice(subservice, _) = result { if let DirectPusPacketHandlerResult::CustomSubservice(subservice, _) = result {
assert_eq!(subservice, 200); assert_eq!(subservice, 200);
} else { } else {
panic!("unexpected result type {result:?}") panic!("unexpected result type {result:?}")

View File

@ -45,6 +45,15 @@ pub use std_mod::*;
use self::verification::VerificationReportingProvider; use self::verification::VerificationReportingProvider;
/// Generic handling status for an object which is able to continuosly handle a queue to handle
/// request or replies until the queue is empty.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum HandlingStatus {
HandledOne,
Empty,
}
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub enum PusTmVariant<'time, 'src_data> { pub enum PusTmVariant<'time, 'src_data> {
InStore(PoolAddr), InStore(PoolAddr),
@ -649,14 +658,11 @@ pub mod alloc_mod {
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub mod std_mod { pub mod std_mod {
use super::*;
use crate::pool::{ use crate::pool::{
PoolAddr, PoolError, PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, PoolAddr, PoolError, PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool,
}; };
use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::verification::{TcStateAccepted, VerificationToken};
use crate::pus::{
EcssTcAndToken, EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericReceiveError,
GenericSendError, PusTmVariant, TryRecvTmtcError,
};
use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use crate::ComponentId; use crate::ComponentId;
use alloc::vec::Vec; use alloc::vec::Vec;
@ -920,26 +926,24 @@ pub mod std_mod {
#[error("generic timestamp generation error")] #[error("generic timestamp generation error")]
Time(#[from] StdTimestampError), Time(#[from] StdTimestampError),
#[error("error sending telemetry: {0}")] #[error("error sending telemetry: {0}")]
TmSend(#[from] EcssTmtcError), TmSend(EcssTmtcError),
#[error("error sending verification message")] #[error("error sending verification message")]
Verification, Verification(EcssTmtcError),
#[error("invalid verification token")] #[error("invalid verification token")]
NoVerificationToken, NoVerificationToken,
} }
/// Generic result type for handlers which can process PUS packets. /// Generic result type for handlers which can process PUS packets.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum PusPacketHandlerResult { pub enum DirectPusPacketHandlerResult {
RequestHandled, Handled(HandlingStatus),
RequestHandledPartialSuccess(PartialPusHandlingError),
SubserviceNotImplemented(u8, VerificationToken<TcStateAccepted>), SubserviceNotImplemented(u8, VerificationToken<TcStateAccepted>),
CustomSubservice(u8, VerificationToken<TcStateAccepted>), CustomSubservice(u8, VerificationToken<TcStateAccepted>),
Empty,
} }
impl From<PartialPusHandlingError> for PusPacketHandlerResult { impl From<HandlingStatus> for DirectPusPacketHandlerResult {
fn from(value: PartialPusHandlingError) -> Self { fn from(value: HandlingStatus) -> Self {
Self::RequestHandledPartialSuccess(value) Self::Handled(value)
} }
} }
@ -1222,7 +1226,7 @@ pub mod test_util {
use super::{ use super::{
verification::{self, TcStateAccepted, VerificationToken}, verification::{self, TcStateAccepted, VerificationToken},
PusPacketHandlerResult, PusPacketHandlingError, DirectPusPacketHandlerResult, PusPacketHandlingError,
}; };
pub const TEST_APID: u16 = 0x101; pub const TEST_APID: u16 = 0x101;
@ -1246,7 +1250,8 @@ pub mod test_util {
} }
pub trait SimplePusPacketHandler { pub trait SimplePusPacketHandler {
fn handle_one_tc(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError>; fn handle_one_tc(&mut self)
-> Result<DirectPusPacketHandlerResult, PusPacketHandlingError>;
} }
} }

View File

@ -1,11 +1,12 @@
use super::scheduler::PusSchedulerProvider; use super::scheduler::PusSchedulerProvider;
use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::verification::{VerificationReporter, VerificationReportingProvider};
use super::{ use super::{
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver, DirectPusPacketHandlerResult, EcssTcInMemConverter, EcssTcInSharedStoreConverter,
EcssTmSender, MpscTcReceiver, PusServiceHelper, EcssTcInVecConverter, EcssTcReceiver, EcssTmSender, HandlingStatus, MpscTcReceiver,
PartialPusHandlingError, PusServiceHelper,
}; };
use crate::pool::PoolProvider; use crate::pool::PoolProvider;
use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError}; use crate::pus::PusPacketHandlingError;
use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use alloc::string::ToString; use alloc::string::ToString;
use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::ecss::{scheduling, PusPacket};
@ -64,14 +65,15 @@ impl<
&self.scheduler &self.scheduler
} }
pub fn poll_and_handle_next_tc( pub fn poll_and_handle_next_tc<ErrorCb: FnMut(&PartialPusHandlingError)>(
&mut self, &mut self,
mut error_callback: ErrorCb,
time_stamp: &[u8], time_stamp: &[u8],
sched_tc_pool: &mut (impl PoolProvider + ?Sized), sched_tc_pool: &mut (impl PoolProvider + ?Sized),
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() { if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty); return Ok(HandlingStatus::Empty.into());
} }
let ecss_tc_and_token = possible_packet.unwrap(); let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper self.service_helper
@ -81,34 +83,34 @@ impl<
let subservice = PusPacket::subservice(&tc); let subservice = PusPacket::subservice(&tc);
let standard_subservice = scheduling::Subservice::try_from(subservice); let standard_subservice = scheduling::Subservice::try_from(subservice);
if standard_subservice.is_err() { if standard_subservice.is_err() {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(DirectPusPacketHandlerResult::CustomSubservice(
subservice, subservice,
ecss_tc_and_token.token, ecss_tc_and_token.token,
)); ));
} }
let partial_error = None;
match standard_subservice.unwrap() { match standard_subservice.unwrap() {
scheduling::Subservice::TcEnableScheduling => { scheduling::Subservice::TcEnableScheduling => {
let start_token = self let opt_started_token = match self.service_helper.verif_reporter().start_success(
.service_helper
.verif_reporter()
.start_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
ecss_tc_and_token.token, ecss_tc_and_token.token,
time_stamp, time_stamp,
) ) {
.expect("Error sending start success"); Ok(started_token) => Some(started_token),
Err(e) => {
error_callback(&PartialPusHandlingError::Verification(e));
None
}
};
self.scheduler.enable(); self.scheduler.enable();
if self.scheduler.is_enabled() {
self.service_helper if self.scheduler.is_enabled() && opt_started_token.is_some() {
.verif_reporter() if let Err(e) = self.service_helper.verif_reporter().completion_success(
.completion_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
start_token, opt_started_token.unwrap(),
time_stamp, time_stamp,
) ) {
.expect("Error sending completion success"); error_callback(&PartialPusHandlingError::Verification(e));
}
} else { } else {
return Err(PusPacketHandlingError::Other( return Err(PusPacketHandlingError::Other(
"failed to enabled scheduler".to_string(), "failed to enabled scheduler".to_string(),
@ -116,26 +118,27 @@ impl<
} }
} }
scheduling::Subservice::TcDisableScheduling => { scheduling::Subservice::TcDisableScheduling => {
let start_token = self let opt_started_token = match self.service_helper.verif_reporter().start_success(
.service_helper
.verif_reporter()
.start_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
ecss_tc_and_token.token, ecss_tc_and_token.token,
time_stamp, time_stamp,
) ) {
.expect("Error sending start success"); Ok(started_token) => Some(started_token),
Err(e) => {
error_callback(&PartialPusHandlingError::Verification(e));
None
}
};
self.scheduler.disable(); self.scheduler.disable();
if !self.scheduler.is_enabled() { if !self.scheduler.is_enabled() && opt_started_token.is_some() {
self.service_helper if let Err(e) = self.service_helper.verif_reporter().completion_success(
.verif_reporter()
.completion_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
start_token, opt_started_token.unwrap(),
time_stamp, time_stamp,
) ) {
.expect("Error sending completion success"); error_callback(&PartialPusHandlingError::Verification(e));
}
} else { } else {
return Err(PusPacketHandlingError::Other( return Err(PusPacketHandlingError::Other(
"failed to disable scheduler".to_string(), "failed to disable scheduler".to_string(),
@ -194,18 +197,13 @@ impl<
} }
_ => { _ => {
// Treat unhandled standard subservices as custom subservices for now. // Treat unhandled standard subservices as custom subservices for now.
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(DirectPusPacketHandlerResult::CustomSubservice(
subservice, subservice,
ecss_tc_and_token.token, ecss_tc_and_token.token,
)); ));
} }
} }
if let Some(partial_error) = partial_error { Ok(HandlingStatus::HandledOne.into())
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
}
Ok(PusPacketHandlerResult::RequestHandled)
} }
} }
/// Helper type definition for a PUS 11 handler with a dynamic TMTC memory backend and regular /// Helper type definition for a PUS 11 handler with a dynamic TMTC memory backend and regular
@ -257,7 +255,7 @@ mod tests {
verification::{RequestId, TcStateAccepted, VerificationToken}, verification::{RequestId, TcStateAccepted, VerificationToken},
EcssTcInSharedStoreConverter, EcssTcInSharedStoreConverter,
}; };
use crate::pus::{MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError}; use crate::pus::{DirectPusPacketHandlerResult, MpscTcReceiver, PusPacketHandlingError};
use crate::tmtc::PacketSenderWithSharedPool; use crate::tmtc::PacketSenderWithSharedPool;
use alloc::collections::VecDeque; use alloc::collections::VecDeque;
use delegate::delegate; use delegate::delegate;
@ -298,10 +296,12 @@ mod tests {
} }
} }
pub fn handle_one_tc(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { pub fn handle_one_tc(
&mut self,
) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap(); let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap();
self.handler self.handler
.poll_and_handle_next_tc(&time_stamp, &mut self.sched_tc_pool) .poll_and_handle_next_tc(|_| {}, &time_stamp, &mut self.sched_tc_pool)
} }
} }
@ -387,7 +387,7 @@ mod tests {
let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap(); let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap();
test_harness test_harness
.handler .handler
.poll_and_handle_next_tc(&time_stamp, &mut test_harness.sched_tc_pool) .poll_and_handle_next_tc(|_| {}, &time_stamp, &mut test_harness.sched_tc_pool)
.unwrap(); .unwrap();
test_harness.check_next_verification_tm(1, request_id); test_harness.check_next_verification_tm(1, request_id);
test_harness.check_next_verification_tm(3, request_id); test_harness.check_next_verification_tm(3, request_id);

View File

@ -1,5 +1,5 @@
use crate::pus::{ use crate::pus::{
PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, PusTmVariant, DirectPusPacketHandlerResult, PartialPusHandlingError, PusPacketHandlingError, PusTmVariant,
}; };
use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; use spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
@ -10,7 +10,7 @@ use std::sync::mpsc;
use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::verification::{VerificationReporter, VerificationReportingProvider};
use super::{ use super::{
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver,
EcssTmSender, GenericConversionError, MpscTcReceiver, PusServiceHelper, EcssTmSender, GenericConversionError, HandlingStatus, MpscTcReceiver, PusServiceHelper,
}; };
/// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets.
@ -43,13 +43,14 @@ impl<
Self { service_helper } Self { service_helper }
} }
pub fn poll_and_handle_next_tc( pub fn poll_and_handle_next_tc<ErrorCb: FnMut(&PartialPusHandlingError)>(
&mut self, &mut self,
mut error_callback: ErrorCb,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() { if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty); return Ok(HandlingStatus::Empty.into());
} }
let ecss_tc_and_token = possible_packet.unwrap(); let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper self.service_helper
@ -60,21 +61,16 @@ impl<
return Err(GenericConversionError::WrongService(tc.service()).into()); return Err(GenericConversionError::WrongService(tc.service()).into());
} }
if tc.subservice() == 1 { if tc.subservice() == 1 {
let mut partial_error = None; let opt_started_token = match self.service_helper.verif_reporter().start_success(
let result = self
.service_helper
.verif_reporter()
.start_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
ecss_tc_and_token.token, ecss_tc_and_token.token,
time_stamp, time_stamp,
) ) {
.map_err(|_| PartialPusHandlingError::Verification); Ok(token) => Some(token),
let start_token = if let Ok(result) = result { Err(e) => {
Some(result) error_callback(&PartialPusHandlingError::Verification(e));
} else {
partial_error = Some(result.unwrap_err());
None None
}
}; };
// Sequence count will be handled centrally in TM funnel. // Sequence count will be handled centrally in TM funnel.
// It is assumed that the verification reporter was built with a valid APID, so we use // It is assumed that the verification reporter was built with a valid APID, so we use
@ -83,42 +79,30 @@ impl<
SpHeader::new_for_unseg_tm(self.service_helper.verif_reporter().apid(), 0, 0); SpHeader::new_for_unseg_tm(self.service_helper.verif_reporter().apid(), 0, 0);
let tc_header = PusTmSecondaryHeader::new_simple(17, 2, time_stamp); let tc_header = PusTmSecondaryHeader::new_simple(17, 2, time_stamp);
let ping_reply = PusTmCreator::new(reply_header, tc_header, &[], true); let ping_reply = PusTmCreator::new(reply_header, tc_header, &[], true);
let result = self if let Err(e) = self
.service_helper .service_helper
.common .common
.tm_sender .tm_sender
.send_tm(self.service_helper.id(), PusTmVariant::Direct(ping_reply)) .send_tm(self.service_helper.id(), PusTmVariant::Direct(ping_reply))
.map_err(PartialPusHandlingError::TmSend); {
if let Err(err) = result { error_callback(&PartialPusHandlingError::TmSend(e));
partial_error = Some(err);
} }
if let Some(start_token) = opt_started_token {
if let Some(start_token) = start_token { if let Err(e) = self.service_helper.verif_reporter().completion_success(
if self
.service_helper
.verif_reporter()
.completion_success(
&self.service_helper.common.tm_sender, &self.service_helper.common.tm_sender,
start_token, start_token,
time_stamp, time_stamp,
) ) {
.is_err() error_callback(&PartialPusHandlingError::Verification(e));
{
partial_error = Some(PartialPusHandlingError::Verification)
} }
} }
if let Some(partial_error) = partial_error {
return Ok(PusPacketHandlerResult::RequestHandledPartialSuccess(
partial_error,
));
};
} else { } else {
return Ok(PusPacketHandlerResult::CustomSubservice( return Ok(DirectPusPacketHandlerResult::CustomSubservice(
tc.subservice(), tc.subservice(),
ecss_tc_and_token.token, ecss_tc_and_token.token,
)); ));
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(HandlingStatus::HandledOne.into())
} }
} }
@ -158,8 +142,9 @@ mod tests {
}; };
use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::verification::{TcStateAccepted, VerificationToken};
use crate::pus::{ use crate::pus::{
EcssTcInSharedStoreConverter, EcssTcInVecConverter, GenericConversionError, MpscTcReceiver, DirectPusPacketHandlerResult, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, GenericConversionError, HandlingStatus, MpscTcReceiver, MpscTmAsVecSender,
PartialPusHandlingError, PusPacketHandlingError,
}; };
use crate::tmtc::PacketSenderWithSharedPool; use crate::tmtc::PacketSenderWithSharedPool;
use crate::ComponentId; use crate::ComponentId;
@ -221,9 +206,12 @@ mod tests {
} }
} }
impl SimplePusPacketHandler for Pus17HandlerWithStoreTester { impl SimplePusPacketHandler for Pus17HandlerWithStoreTester {
fn handle_one_tc(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { fn handle_one_tc(
&mut self,
) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap(); let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap();
self.handler.poll_and_handle_next_tc(&time_stamp) self.handler
.poll_and_handle_next_tc(|_partial_error: &PartialPusHandlingError| {}, &time_stamp)
} }
} }
@ -276,9 +264,12 @@ mod tests {
} }
} }
impl SimplePusPacketHandler for Pus17HandlerWithVecTester { impl SimplePusPacketHandler for Pus17HandlerWithVecTester {
fn handle_one_tc(&mut self) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { fn handle_one_tc(
&mut self,
) -> Result<DirectPusPacketHandlerResult, PusPacketHandlingError> {
let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap(); let time_stamp = cds::CdsTime::new_with_u16_days(0, 0).to_vec().unwrap();
self.handler.poll_and_handle_next_tc(&time_stamp) self.handler
.poll_and_handle_next_tc(|_partial_error: &PartialPusHandlingError| {}, &time_stamp)
} }
} }
@ -328,10 +319,11 @@ mod tests {
let mut test_harness = Pus17HandlerWithStoreTester::new(0); let mut test_harness = Pus17HandlerWithStoreTester::new(0);
let result = test_harness.handle_one_tc(); let result = test_harness.handle_one_tc();
assert!(result.is_ok()); assert!(result.is_ok());
let result = result.unwrap(); match result.unwrap() {
if let PusPacketHandlerResult::Empty = result { DirectPusPacketHandlerResult::Handled(handled) => {
} else { assert_eq!(handled, HandlingStatus::Empty);
panic!("unexpected result type {result:?}") }
_ => panic!("unexpected result"),
} }
} }
@ -367,7 +359,7 @@ mod tests {
let result = test_harness.handle_one_tc(); let result = test_harness.handle_one_tc();
assert!(result.is_ok()); assert!(result.is_ok());
let result = result.unwrap(); let result = result.unwrap();
if let PusPacketHandlerResult::CustomSubservice(subservice, _) = result { if let DirectPusPacketHandlerResult::CustomSubservice(subservice, _) = result {
assert_eq!(subservice, 200); assert_eq!(subservice, 200);
} else { } else {
panic!("unexpected result type {result:?}") panic!("unexpected result type {result:?}")