bump sat-rs and improve PUS stack #29

Merged
muellerr merged 1 commits from bump-satrs-improve-pus-stack into main 2024-05-02 12:35:37 +02:00
13 changed files with 397 additions and 396 deletions
Showing only changes of commit 66a18e08e5 - Show all commits

26
Cargo.lock generated
View File

@ -248,6 +248,17 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "delegate"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]] [[package]]
name = "derive-new" name = "derive-new"
version = "0.6.0" version = "0.6.0"
@ -631,6 +642,7 @@ name = "ops-sat-rs"
version = "0.1.1" version = "0.1.1"
dependencies = [ dependencies = [
"chrono", "chrono",
"delegate 0.12.0",
"derive-new", "derive-new",
"env_logger", "env_logger",
"fern", "fern",
@ -776,13 +788,13 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]] [[package]]
name = "satrs" name = "satrs"
version = "0.2.0-rc.5" version = "0.2.0-rc.5"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#29f71c2a571e7492cf5d997d4c11c3f844de83bc" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#9ffe4d0ae02064444da606b2cd1a3c0dd6858fbb"
dependencies = [ dependencies = [
"bus", "bus",
"cobs", "cobs",
"crc", "crc",
"crossbeam-channel", "crossbeam-channel",
"delegate", "delegate 0.10.0",
"derive-new", "derive-new",
"downcast-rs", "downcast-rs",
"dyn-clone", "dyn-clone",
@ -835,9 +847,9 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.199" version = "1.0.200"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a" checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -855,9 +867,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.199" version = "1.0.200"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc" checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -926,7 +938,7 @@ checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216"
dependencies = [ dependencies = [
"chrono", "chrono",
"crc", "crc",
"delegate", "delegate 0.10.0",
"num-traits", "num-traits",
"num_enum", "num_enum",
"serde", "serde",

View File

@ -10,6 +10,7 @@ fern = "0.6"
toml = "0.8" toml = "0.8"
chrono = "0.4" chrono = "0.4"
log = "0.4" log = "0.4"
delegate = "0.12"
lazy_static = "1" lazy_static = "1"
humantime = "2" humantime = "2"
strum = { version = "0.26", features = ["derive"] } strum = { version = "0.26", features = ["derive"] }

View File

@ -2,8 +2,8 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc; use std::sync::mpsc;
use log::{info, warn}; use log::{info, warn};
use ops_sat_rs::HandlingStatus;
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use satrs::pus::HandlingStatus;
use satrs::queue::GenericSendError; use satrs::queue::GenericSendError;
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
@ -125,7 +125,7 @@ mod tests {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
let tm_handler = TestTmHandler::default(); let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let _tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer { let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server, udp_tc_server,
tm_handler, tm_handler,

View File

@ -3,12 +3,6 @@ use satrs::spacepackets::time::TimeWriter;
pub mod config; pub mod config;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum HandlingStatus {
Empty,
HandledOne,
}
#[derive(Debug)] #[derive(Debug)]
pub struct TimeStampHelper { pub struct TimeStampHelper {
stamper: CdsTime, stamper: CdsTime,

View File

@ -1,4 +1,4 @@
use log::{error, warn}; use log::warn;
use ops_sat_rs::config::components::PUS_ACTION_SERVICE; use ops_sat_rs::config::components::PUS_ACTION_SERVICE;
use ops_sat_rs::config::tmtc_err; use ops_sat_rs::config::tmtc_err;
use ops_sat_rs::TimeStampHelper; use ops_sat_rs::TimeStampHelper;
@ -13,13 +13,13 @@ use satrs::pus::verification::{
}; };
use satrs::pus::{ use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError,
GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, GenericConversionError, HandlingStatus, PusPacketHandlingError, PusReplyHandler,
PusTcToRequestConverter, PusTmVariant, PusServiceHelper, PusTcToRequestConverter, PusTmVariant,
}; };
use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket, PusServiceId};
use satrs::spacepackets::SpHeader; use satrs::spacepackets::SpHeader;
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
use std::sync::mpsc; use std::sync::mpsc;
@ -28,8 +28,8 @@ use std::time::Duration;
use crate::requests::GenericRequestRouter; use crate::requests::GenericRequestRouter;
use super::{ use super::{
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, create_verification_reporter, generic_pus_request_timeout_handler, PusTargetedRequestService,
PusTargetedRequestService, TargetedPusService, TargetedPusService,
}; };
pub const DATA_REPLY: u8 = 130; pub const DATA_REPLY: u8 = 130;
@ -248,47 +248,23 @@ pub struct ActionServiceWrapper {
} }
impl TargetedPusService for ActionServiceWrapper { impl TargetedPusService for ActionServiceWrapper {
/// Returns [true] if the packet handling is finished. const SERVICE_ID: u8 = PusServiceId::Action as u8;
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_STR: &'static str = "action";
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 8 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 8 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 8 subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
}
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { delegate::delegate! {
// This only fails if all senders disconnected. Treat it like an empty queue. to self.service {
self.service fn poll_and_handle_next_tc(
.poll_and_check_next_reply(time_stamp) &mut self,
.unwrap_or_else(|e| { time_stamp: &[u8],
warn!("PUS 8: Handling reply failed with error {e:?}"); ) -> Result<HandlingStatus, PusPacketHandlingError>;
HandlingStatus::Empty
})
}
fn check_for_request_timeouts(&mut self) { fn poll_and_handle_next_reply(
self.service.check_for_request_timeouts(); &mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
} }
} }
@ -317,8 +293,8 @@ mod tests {
use satrs::pus::test_util::{ use satrs::pus::test_util::{
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
}; };
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter; use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::pus::{verification, TcInMemory};
use satrs::request::MessageMetadata; use satrs::request::MessageMetadata;
use satrs::ComponentId; use satrs::ComponentId;
use satrs::{ use satrs::{
@ -423,7 +399,7 @@ mod tests {
} }
let result = result.unwrap(); let result = result.unwrap();
match result { match result {
PusPacketHandlerResult::RequestHandled => (), HandlingStatus::HandledOne => (),
_ => panic!("unexpected result {result:?}"), _ => panic!("unexpected result {result:?}"),
} }
} }
@ -435,19 +411,19 @@ mod tests {
} }
let result = result.unwrap(); let result = result.unwrap();
match result { match result {
PusPacketHandlerResult::Empty => (), HandlingStatus::Empty => (),
_ => panic!("unexpected result {result:?}"), _ => panic!("unexpected result {result:?}"),
} }
} }
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp); let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok()); assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::HandledOne); assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
} }
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp); let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok()); assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::Empty); assert_eq!(result.unwrap(), HandlingStatus::Empty);
} }
@ -472,11 +448,7 @@ mod tests {
.check_next_is_acceptance_success(id, accepted_token.request_id()); .check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx self.pus_packet_tx
.send(EcssTcAndToken::new( .send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new( PacketAsVec::new(self.service.service_helper.id(), tc.to_vec().unwrap()),
self.service.service_helper.id(),
//tc.to_vec().unwrap().into(),
tc.to_vec().unwrap(),
)),
accepted_token, accepted_token,
)) ))
.unwrap(); .unwrap();

View File

@ -1,15 +1,16 @@
use std::sync::mpsc; use std::sync::mpsc;
use super::HandlingStatus; use super::{DirectPusService, HandlingStatus};
use crate::pus::create_verification_reporter; use crate::pus::create_verification_reporter;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT; use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusEventServiceHandler; use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter; use satrs::pus::verification::VerificationReporter;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver,
PartialPusHandlingError, PusServiceHelper,
}; };
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
pub fn create_event_service( pub fn create_event_service(
@ -41,32 +42,51 @@ pub struct EventServiceWrapper {
>, >,
} }
impl EventServiceWrapper { impl DirectPusService for EventServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_ID: u8 = PusServiceId::Event as u8;
match self.handler.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result { const SERVICE_STR: &'static str = "events";
PusPacketHandlerResult::RequestHandled => {
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let result = self
.handler
.poll_and_handle_next_tc(error_handler, time_stamp);
if let Err(e) = result {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
return HandlingStatus::HandledOne; return HandlingStatus::HandledOne;
} }
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { match result.unwrap() {
warn!("PUS 5 partial packet handling success: {e:?}"); DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
return HandlingStatus::HandledOne; DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
} }
PusPacketHandlerResult::CustomSubservice(invalid, _) => { DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 5 invalid subservice {invalid}"); log::warn!(
return HandlingStatus::HandledOne; "PUS {}({}) subservice {} not implemented",
} Self::SERVICE_ID,
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { Self::SERVICE_STR,
warn!("PUS 5 subservice {subservice} not implemented"); subservice
return HandlingStatus::HandledOne; );
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
} }
} }
// To avoid permanent loops, treat queue empty by default (all tasks done). HandlingStatus::HandledOne
HandlingStatus::Empty
} }
} }

View File

@ -1,5 +1,4 @@
use derive_new::new; use derive_new::new;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_HK_SERVICE; use ops_sat_rs::config::components::PUS_HK_SERVICE;
use ops_sat_rs::config::{hk_err, tmtc_err}; use ops_sat_rs::config::{hk_err, tmtc_err};
use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId}; use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId};
@ -10,7 +9,7 @@ use satrs::pus::verification::{
use satrs::pus::{ use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError,
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
}; };
use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::tc::PusTcReader;
@ -22,7 +21,7 @@ use std::time::Duration;
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler}; use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
use crate::requests::GenericRequestRouter; use crate::requests::GenericRequestRouter;
use super::{HandlingStatus, PusTargetedRequestService}; use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService};
#[derive(Clone, PartialEq, Debug, new)] #[derive(Clone, PartialEq, Debug, new)]
pub struct HkReply { pub struct HkReply {
@ -267,47 +266,25 @@ pub struct HkServiceWrapper {
>, >,
} }
impl HkServiceWrapper { impl TargetedPusService for HkServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_ID: u8 = 3;
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 3 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 3 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 3 subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
}
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_STR: &'static str = "housekeeping";
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 3: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
pub fn check_for_request_timeouts(&mut self) { delegate::delegate! {
self.service.check_for_request_timeouts(); to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
} }
} }

View File

@ -10,7 +10,7 @@ use crate::requests::GenericRequestRouter;
use log::warn; use log::warn;
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; use ops_sat_rs::config::{tmtc_err, CustomPusServiceId};
use ops_sat_rs::{HandlingStatus, TimeStampHelper}; use ops_sat_rs::TimeStampHelper;
use satrs::pus::verification::{ use satrs::pus::verification::{
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReporterCfg, VerificationReportingProvider, VerificationToken, VerificationReporterCfg, VerificationReportingProvider, VerificationToken,
@ -18,8 +18,8 @@ use satrs::pus::verification::{
use satrs::pus::{ use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, HandlingStatus, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler,
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
}; };
use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::request::{Apid, GenericMessage, MessageMetadata};
@ -78,7 +78,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
&mut self, &mut self,
sender_id: ComponentId, sender_id: ComponentId,
tc: Vec<u8>, tc: Vec<u8>,
) -> Result<PusPacketHandlerResult, GenericSendError> { ) -> Result<HandlingStatus, GenericSendError> {
let pus_tc_result = PusTcReader::new(&tc); let pus_tc_result = PusTcReader::new(&tc);
if pus_tc_result.is_err() { if pus_tc_result.is_err() {
log::warn!( log::warn!(
@ -87,7 +87,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
pus_tc_result.unwrap_err() pus_tc_result.unwrap_err()
); );
log::warn!("raw data: {:x?}", tc); log::warn!("raw data: {:x?}", tc);
return Ok(PusPacketHandlerResult::RequestHandled); return Ok(HandlingStatus::HandledOne);
} }
let pus_tc = pus_tc_result.unwrap().0; let pus_tc = pus_tc_result.unwrap().0;
let init_token = self.verif_reporter.add_tc(&pus_tc); let init_token = self.verif_reporter.add_tc(&pus_tc);
@ -159,17 +159,65 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
} }
} }
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(HandlingStatus::HandledOne)
} }
} }
pub trait TargetedPusService { pub trait TargetedPusService {
/// Returns [true] interface the packet handling is finished. const SERVICE_ID: u8;
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus; const SERVICE_STR: &'static str;
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn poll_and_handle_next_tc_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let result = self.poll_and_handle_next_tc(time_stamp);
if let Err(e) = result {
log::error!(
"PUS service {}({})packet handling error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
result.unwrap()
}
fn poll_and_handle_next_reply_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.poll_and_handle_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!(
"PUS servce {}({}): Handling reply failed with error {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
HandlingStatus::Empty
})
}
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self); fn check_for_request_timeouts(&mut self);
} }
/// Generic trait for services which handle packets directly. Kept minimal right now because
/// of the difficulty to allow flexible user code for these services..
pub trait DirectPusService {
const SERVICE_ID: u8;
const SERVICE_STR: &'static str;
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus;
}
/// This is a generic handlers class for all PUS services where a PUS telecommand is converted /// This is a generic handlers class for all PUS services where a PUS telecommand is converted
/// to a targeted request. /// to a targeted request.
/// ///
@ -262,10 +310,10 @@ where
pub fn poll_and_handle_next_tc( pub fn poll_and_handle_next_tc(
&mut self, &mut self,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> { ) -> Result<HandlingStatus, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() { if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty); return Ok(HandlingStatus::Empty);
} }
let ecss_tc_and_token = possible_packet.unwrap(); let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper self.service_helper
@ -321,7 +369,7 @@ where
return Err(e.into()); return Err(e.into());
} }
} }
Ok(PusPacketHandlerResult::RequestHandled) Ok(HandlingStatus::HandledOne)
} }
fn handle_conversion_to_request_error( fn handle_conversion_to_request_error(
@ -374,7 +422,7 @@ where
} }
} }
pub fn poll_and_check_next_reply( pub fn poll_and_handle_next_reply(
&mut self, &mut self,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError> { ) -> Result<HandlingStatus, EcssTmtcError> {

View File

@ -1,15 +1,14 @@
use derive_new::new; use derive_new::new;
use log::{error, warn};
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
use std::sync::mpsc; use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
use crate::requests::GenericRequestRouter; use crate::requests::GenericRequestRouter;
use ops_sat_rs::config::components::PUS_MODE_SERVICE; use ops_sat_rs::config::components::PUS_MODE_SERVICE;
use ops_sat_rs::config::{mode_err, tmtc_err}; use ops_sat_rs::config::{mode_err, tmtc_err, CustomPusServiceId};
use satrs::pus::verification::VerificationReporter; use satrs::pus::verification::VerificationReporter;
use satrs::pus::{ use satrs::pus::{
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlerResult, DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlingError,
PusServiceHelper, PusServiceHelper,
}; };
use satrs::request::GenericMessage; use satrs::request::GenericMessage;
@ -239,48 +238,27 @@ pub struct ModeServiceWrapper {
} }
impl TargetedPusService for ModeServiceWrapper { impl TargetedPusService for ModeServiceWrapper {
/// Returns [true] if the packet handling is finished. const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8;
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) { const SERVICE_STR: &'static str = "mode";
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => { delegate::delegate! {
return HandlingStatus::HandledOne; to self.service {
} fn poll_and_handle_next_tc(
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { &mut self,
warn!("PUS mode service: partial packet handling success: {e:?}"); time_stamp: &[u8],
return HandlingStatus::HandledOne; ) -> Result<HandlingStatus, PusPacketHandlingError>;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => { fn poll_and_handle_next_reply(
warn!("PUS mode service: invalid subservice {invalid}"); &mut self,
return HandlingStatus::HandledOne; time_stamp: &[u8],
} ) -> Result<HandlingStatus, EcssTmtcError>;
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS mode service: {subservice} not implemented"); fn check_for_request_timeouts(&mut self);
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS mode service: packet handling error: {error:?}");
} }
} }
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
} }
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS action service: Handling reply failed with error {e:?}");
HandlingStatus::HandledOne
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use ops_sat_rs::config::tmtc_err; use ops_sat_rs::config::tmtc_err;

View File

@ -2,65 +2,20 @@ use std::sync::mpsc;
use std::time::Duration; use std::time::Duration;
use crate::pus::create_verification_reporter; use crate::pus::create_verification_reporter;
use log::{error, info, warn}; use log::info;
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE; use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
use satrs::pool::{PoolProvider, StaticMemoryPool}; use satrs::pool::StaticMemoryPool;
use satrs::pus::scheduler::{PusScheduler, TcInfo}; use satrs::pus::scheduler::{PusScheduler, TcInfo};
use satrs::pus::scheduler_srv::PusSchedServiceHandler; use satrs::pus::scheduler_srv::PusSchedServiceHandler;
use satrs::pus::verification::VerificationReporter; use satrs::pus::verification::VerificationReporter;
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
MpscTcReceiver, PartialPusHandlingError, PusServiceHelper,
}; };
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool}; use satrs::spacepackets::ecss::PusServiceId;
use satrs::ComponentId; use satrs::tmtc::PacketAsVec;
use super::HandlingStatus; use super::DirectPusService;
pub trait TcReleaser {
fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}
impl TcReleaser for PacketSenderWithSharedPool {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
let shared_pool = self.shared_pool.get_mut();
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = shared_pool
.0
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
self.sender
.send(PacketInPool::new(sender_id, released_tc_addr))
.expect("sending TC to TC source failed");
}
true
}
}
impl TcReleaser for mpsc::Sender<PacketAsVec> {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
// Send released TC to centralized TC source.
self.send(PacketAsVec::new(sender_id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
}
}
pub struct SchedulingService { pub struct SchedulingService {
pub pus_11_handler: PusSchedServiceHandler< pub pus_11_handler: PusSchedServiceHandler<
@ -72,14 +27,72 @@ pub struct SchedulingService {
>, >,
pub sched_tc_pool: StaticMemoryPool, pub sched_tc_pool: StaticMemoryPool,
pub releaser_buf: [u8; 4096], pub releaser_buf: [u8; 4096],
pub tc_releaser: Box<dyn TcReleaser + Send>, pub tc_releaser: mpsc::Sender<PacketAsVec>,
}
impl DirectPusService for SchedulingService {
const SERVICE_ID: u8 = PusServiceId::Verification as u8;
const SERVICE_STR: &'static str = "verification";
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let result = self.pus_11_handler.poll_and_handle_next_tc(
error_handler,
time_stamp,
&mut self.sched_tc_pool,
);
if let Err(e) = result {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
return HandlingStatus::HandledOne;
}
match result.unwrap() {
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
}
HandlingStatus::HandledOne
}
} }
impl SchedulingService { impl SchedulingService {
pub fn release_tcs(&mut self) { pub fn release_tcs(&mut self) {
let id = self.pus_11_handler.service_helper.id(); let id = self.pus_11_handler.service_helper.id();
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(id, enabled, info, tc) if enabled {
// Send released TC to centralized TC source.
self.tc_releaser
.send(PacketAsVec::new(id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
}; };
self.pus_11_handler self.pus_11_handler
@ -99,37 +112,6 @@ impl SchedulingService {
info!("{released_tcs} TC(s) released from scheduler"); info!("{released_tcs} TC(s) released from scheduler");
} }
} }
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self
.pus_11_handler
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
{
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
} }
pub fn create_scheduler_service( pub fn create_scheduler_service(
@ -158,6 +140,6 @@ pub fn create_scheduler_service(
pus_11_handler, pus_11_handler,
sched_tc_pool, sched_tc_pool,
releaser_buf: [0; 4096], releaser_buf: [0; 4096],
tc_releaser: Box::new(tc_source_sender), tc_releaser: tc_source_sender,
} }
} }

View File

@ -5,7 +5,7 @@ use satrs::spacepackets::time::{cds, TimeWriter};
use super::{ use super::{
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService, mode::ModeServiceWrapper, scheduler::SchedulingService, DirectPusService, TargetedPusService,
}; };
#[derive(new)] #[derive(new)]
@ -23,59 +23,28 @@ impl PusStack {
// Release all telecommands which reached their release time before calling the service // Release all telecommands which reached their release time before calling the service
// handlers. // handlers.
self.schedule_srv.release_tcs(); self.schedule_srv.release_tcs();
let time_stamp = cds::CdsTime::now_with_u16_days() let timestamp = cds::CdsTime::now_with_u16_days()
.expect("time stamp generation error") .expect("time stamp generation error")
.to_vec() .to_vec()
.unwrap(); .unwrap();
let mut loop_count = 0; let mut loop_count = 0_u32;
// Hot loop which will run continuously until all request and reply handling is done.
loop { loop {
let mut nothing_to_do = true; let mut nothing_to_do = true;
let mut is_srv_finished = Self::direct_service_checker(&mut self.test_srv, &timestamp, &mut nothing_to_do);
|_srv_id: u8, Self::direct_service_checker(&mut self.schedule_srv, &timestamp, &mut nothing_to_do);
tc_handling_status: HandlingStatus, Self::direct_service_checker(&mut self.event_srv, &timestamp, &mut nothing_to_do);
reply_handling_status: Option<HandlingStatus>| { Self::targeted_service_checker(
if tc_handling_status == HandlingStatus::HandledOne &mut self.action_srv_wrapper,
|| (reply_handling_status.is_some() &timestamp,
&& reply_handling_status.unwrap() == HandlingStatus::HandledOne) &mut nothing_to_do,
{
nothing_to_do = false;
}
};
is_srv_finished(
17,
self.test_srv.poll_and_handle_next_packet(&time_stamp),
None,
); );
is_srv_finished( Self::targeted_service_checker(
11, &mut self.hk_srv_wrapper,
self.schedule_srv.poll_and_handle_next_tc(&time_stamp), &timestamp,
None, &mut nothing_to_do,
); );
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None); Self::targeted_service_checker(&mut self.mode_srv, &timestamp, &mut nothing_to_do);
is_srv_finished(
8,
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
Some(
self.action_srv_wrapper
.poll_and_handle_next_reply(&time_stamp),
),
);
is_srv_finished(
3,
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
);
is_srv_finished(
200,
self.mode_srv.poll_and_handle_next_tc(&time_stamp),
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
);
// Safety mechanism to avoid infinite loops.
loop_count += 1;
if loop_count >= 500 {
log::warn!("reached PUS stack loop count 500, breaking");
break;
}
if nothing_to_do { if nothing_to_do {
// Timeout checking is only done once. // Timeout checking is only done once.
self.action_srv_wrapper.check_for_request_timeouts(); self.action_srv_wrapper.check_for_request_timeouts();
@ -83,6 +52,37 @@ impl PusStack {
self.mode_srv.check_for_request_timeouts(); self.mode_srv.check_for_request_timeouts();
break; break;
} }
// Safety mechanism to avoid infinite loops.
loop_count += 1;
if loop_count >= 500 {
log::warn!("reached PUS stack loop count 500, breaking");
break;
}
}
}
pub fn direct_service_checker<S: DirectPusService>(
service: &mut S,
timestamp: &[u8],
nothing_to_do: &mut bool,
) {
let handling_status = service.poll_and_handle_next_tc(timestamp);
if handling_status == HandlingStatus::HandledOne {
*nothing_to_do = false;
}
}
pub fn targeted_service_checker<S: TargetedPusService>(
service: &mut S,
timestamp: &[u8],
nothing_to_do: &mut bool,
) {
let request_handling = service.poll_and_handle_next_tc_default_handler(timestamp);
let reply_handling = service.poll_and_handle_next_reply_default_handler(timestamp);
if request_handling == HandlingStatus::HandledOne
|| reply_handling == HandlingStatus::HandledOne
{
*nothing_to_do = false;
} }
} }
} }

View File

@ -1,22 +1,20 @@
use crate::pus::create_verification_reporter; use crate::pus::create_verification_reporter;
use log::{info, warn}; use log::info;
use ops_sat_rs::config::components::PUS_TEST_SERVICE; use ops_sat_rs::config::components::PUS_TEST_SERVICE;
use ops_sat_rs::config::{tmtc_err, TEST_EVENT}; use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
use satrs::event_man::{EventMessage, EventMessageU32}; use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pus::test::PusService17TestHandler; use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::{ use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
PusPacketHandlerResult, PusServiceHelper, MpscTcReceiver, MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper,
}; };
use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::queue::GenericSendError;
use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::ecss::PusServiceId;
use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter;
use satrs::tmtc::PacketAsVec; use satrs::tmtc::PacketAsVec;
use std::sync::mpsc; use std::sync::mpsc;
use super::HandlingStatus; use super::DirectPusService;
pub fn create_test_service( pub fn create_test_service(
tm_funnel_tx: mpsc::Sender<PacketAsVec>, tm_funnel_tx: mpsc::Sender<PacketAsVec>,
@ -46,64 +44,84 @@ pub struct TestCustomServiceWrapper {
pub event_tx: mpsc::SyncSender<EventMessageU32>, pub event_tx: mpsc::SyncSender<EventMessageU32>,
} }
impl TestCustomServiceWrapper { impl DirectPusService for TestCustomServiceWrapper {
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus { const SERVICE_ID: u8 = PusServiceId::Test as u8;
let res = self.handler.poll_and_handle_next_tc(time_stamp); const SERVICE_STR: &'static str = "test";
if res.is_err() {
warn!("PUS17 handlers failed with error {:?}", res.unwrap_err()); fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus {
return HandlingStatus::Empty; let error_handler = |partial_error: &PartialPusHandlingError| {
} log::warn!(
match res.unwrap() { "PUS {}({}) partial error: {:?}",
PusPacketHandlerResult::RequestHandled => { Self::SERVICE_ID,
info!("Received PUS ping command TC[17,1]"); Self::SERVICE_STR,
info!("Sent ping reply PUS TM[17,2]"); partial_error
return HandlingStatus::HandledOne; );
} };
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => { let res = self
warn!( .handler
"Handled PUS ping command with partial success: {:?}", .poll_and_handle_next_tc(error_handler, timestamp);
partial_err if let Err(e) = res {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
); );
return HandlingStatus::HandledOne; return HandlingStatus::HandledOne;
} }
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { match res.unwrap() {
warn!("PUS17: Subservice {subservice} not implemented"); DirectPusPacketHandlerResult::Handled(handling_status) => {
return HandlingStatus::HandledOne; if handling_status == HandlingStatus::HandledOne {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
} }
// TODO: adapt interface events are implemented return handling_status;
PusPacketHandlerResult::CustomSubservice(subservice, token) => { }
let (tc, _) = PusTcReader::new( DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
self.handler log::warn!(
.service_helper "PUS {}({}) subservice {} not implemented",
.tc_in_mem_converter Self::SERVICE_ID,
.tc_slice_raw(), Self::SERVICE_STR,
) subservice
.unwrap(); );
let time_stamper = CdsTime::now_with_u16_days().unwrap(); }
let mut stamp_buf: [u8; 7] = [0; 7]; DirectPusPacketHandlerResult::CustomSubservice(subservice, token) => {
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
if subservice == 128 { if subservice == 128 {
info!("Generating test event"); info!("generating test event");
self.event_tx if let Err(e) = self
.event_tx
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
.expect("Sending test event failed"); .map_err(|_| GenericSendError::RxDisconnected)
let start_token = self {
// This really should not happen but I want to avoid panicking..
log::warn!("failed to send test event: {:?}", e);
}
match self.handler.service_helper.verif_reporter().start_success(
self.handler.service_helper.tm_sender(),
token,
timestamp,
) {
Ok(started_token) => {
if let Err(e) = self
.handler .handler
.service_helper
.verif_reporter()
.start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf)
.expect("Error sending start success");
self.handler
.service_helper .service_helper
.verif_reporter() .verif_reporter()
.completion_success( .completion_success(
self.handler.service_helper.tm_sender(), self.handler.service_helper.tm_sender(),
start_token, started_token,
&stamp_buf, timestamp,
) )
.expect("Error sending completion success"); {
error_handler(&PartialPusHandlingError::Verification(e));
}
}
Err(e) => {
error_handler(&PartialPusHandlingError::Verification(e));
}
}
} else { } else {
let fail_data = [tc.subservice()]; let fail_data = [subservice];
self.handler self.handler
.service_helper .service_helper
.verif_reporter() .verif_reporter()
@ -111,18 +129,15 @@ impl TestCustomServiceWrapper {
self.handler.service_helper.tm_sender(), self.handler.service_helper.tm_sender(),
token, token,
FailParams::new( FailParams::new(
&stamp_buf, timestamp,
&tmtc_err::INVALID_PUS_SUBSERVICE, &tmtc_err::INVALID_PUS_SUBSERVICE,
&fail_data, &fail_data,
), ),
) )
.expect("Sending start failure verification failed"); .expect("Sending start failure verification failed");
} }
return HandlingStatus::HandledOne; }
} }
PusPacketHandlerResult::Empty => (), HandlingStatus::HandledOne
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
} }
} }

View File

@ -1,7 +1,9 @@
use std::sync::mpsc::{self, TryRecvError}; use std::sync::mpsc::{self, TryRecvError};
use ops_sat_rs::HandlingStatus; use satrs::{
use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec}; pus::{HandlingStatus, MpscTmAsVecSender},
tmtc::PacketAsVec,
};
use crate::pus::PusTcDistributor; use crate::pus::PusTcDistributor;