bump sat-rs and improve PUS stack #29

Merged
muellerr merged 1 commits from bump-satrs-improve-pus-stack into main 2024-05-02 12:35:37 +02:00
13 changed files with 397 additions and 396 deletions

26
Cargo.lock generated
View File

@ -248,6 +248,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "delegate"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]]
name = "derive-new"
version = "0.6.0"
@ -631,6 +642,7 @@ name = "ops-sat-rs"
version = "0.1.1"
dependencies = [
"chrono",
"delegate 0.12.0",
"derive-new",
"env_logger",
"fern",
@ -776,13 +788,13 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "satrs"
version = "0.2.0-rc.5"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#29f71c2a571e7492cf5d997d4c11c3f844de83bc"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#9ffe4d0ae02064444da606b2cd1a3c0dd6858fbb"
dependencies = [
"bus",
"cobs",
"crc",
"crossbeam-channel",
"delegate",
"delegate 0.10.0",
"derive-new",
"downcast-rs",
"dyn-clone",
@ -835,9 +847,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.199"
version = "1.0.200"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a"
checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f"
dependencies = [
"serde_derive",
]
@ -855,9 +867,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.199"
version = "1.0.200"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc"
checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb"
dependencies = [
"proc-macro2",
"quote",
@ -926,7 +938,7 @@ checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216"
dependencies = [
"chrono",
"crc",
"delegate",
"delegate 0.10.0",
"num-traits",
"num_enum",
"serde",

View File

@ -10,6 +10,7 @@ fern = "0.6"
toml = "0.8"
chrono = "0.4"
log = "0.4"
delegate = "0.12"
lazy_static = "1"
humantime = "2"
strum = { version = "0.26", features = ["derive"] }

View File

@ -2,8 +2,8 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc;
use log::{info, warn};
use ops_sat_rs::HandlingStatus;
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use satrs::pus::HandlingStatus;
use satrs::queue::GenericSendError;
use satrs::tmtc::PacketAsVec;
@ -125,7 +125,7 @@ mod tests {
let (tx, rx) = mpsc::channel();
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let _tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,

View File

@ -3,12 +3,6 @@ use satrs::spacepackets::time::TimeWriter;
pub mod config;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum HandlingStatus {
Empty,
HandledOne,
}
#[derive(Debug)]
pub struct TimeStampHelper {
stamper: CdsTime,

View File

@ -1,4 +1,4 @@
use log::{error, warn};
use log::warn;
use ops_sat_rs::config::components::PUS_ACTION_SERVICE;
use ops_sat_rs::config::tmtc_err;
use ops_sat_rs::TimeStampHelper;
@ -13,13 +13,13 @@ use satrs::pus::verification::{
};
use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError,
GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
PusTcToRequestConverter, PusTmVariant,
GenericConversionError, HandlingStatus, PusPacketHandlingError, PusReplyHandler,
PusServiceHelper, PusTcToRequestConverter, PusTmVariant,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket};
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket, PusServiceId};
use satrs::spacepackets::SpHeader;
use satrs::tmtc::PacketAsVec;
use std::sync::mpsc;
@ -28,8 +28,8 @@ use std::time::Duration;
use crate::requests::GenericRequestRouter;
use super::{
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus,
PusTargetedRequestService, TargetedPusService,
create_verification_reporter, generic_pus_request_timeout_handler, PusTargetedRequestService,
TargetedPusService,
};
pub const DATA_REPLY: u8 = 130;
@ -248,47 +248,23 @@ pub struct ActionServiceWrapper {
}
impl TargetedPusService for ActionServiceWrapper {
/// Returns [true] if the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 8 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 8 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 8 subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
}
const SERVICE_ID: u8 = PusServiceId::Action as u8;
const SERVICE_STR: &'static str = "action";
delegate::delegate! {
to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 8: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
@ -317,8 +293,8 @@ mod tests {
use satrs::pus::test_util::{
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
};
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::pus::{verification, TcInMemory};
use satrs::request::MessageMetadata;
use satrs::ComponentId;
use satrs::{
@ -423,7 +399,7 @@ mod tests {
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::RequestHandled => (),
HandlingStatus::HandledOne => (),
_ => panic!("unexpected result {result:?}"),
}
}
@ -435,19 +411,19 @@ mod tests {
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::Empty => (),
HandlingStatus::Empty => (),
_ => panic!("unexpected result {result:?}"),
}
}
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp);
let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
}
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp);
let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::Empty);
}
@ -472,11 +448,7 @@ mod tests {
.check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx
.send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new(
self.service.service_helper.id(),
//tc.to_vec().unwrap().into(),
tc.to_vec().unwrap(),
)),
PacketAsVec::new(self.service.service_helper.id(), tc.to_vec().unwrap()),
accepted_token,
))
.unwrap();

View File

@ -1,15 +1,16 @@
use std::sync::mpsc;
use super::HandlingStatus;
use super::{DirectPusService, HandlingStatus};
use crate::pus::create_verification_reporter;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver,
PartialPusHandlingError, PusServiceHelper,
};
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec;
pub fn create_event_service(
@ -41,32 +42,51 @@ pub struct EventServiceWrapper {
>,
}
impl EventServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.handler.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 5 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 5 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 5 subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
impl DirectPusService for EventServiceWrapper {
const SERVICE_ID: u8 = PusServiceId::Event as u8;
const SERVICE_STR: &'static str = "events";
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let result = self
.handler
.poll_and_handle_next_tc(error_handler, time_stamp);
if let Err(e) = result {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
return HandlingStatus::HandledOne;
}
match result.unwrap() {
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
HandlingStatus::HandledOne
}
}

View File

@ -1,5 +1,4 @@
use derive_new::new;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_HK_SERVICE;
use ops_sat_rs::config::{hk_err, tmtc_err};
use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId};
@ -10,7 +9,7 @@ use satrs::pus::verification::{
use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError,
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
@ -22,7 +21,7 @@ use std::time::Duration;
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
use crate::requests::GenericRequestRouter;
use super::{HandlingStatus, PusTargetedRequestService};
use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService};
#[derive(Clone, PartialEq, Debug, new)]
pub struct HkReply {
@ -267,47 +266,25 @@ pub struct HkServiceWrapper {
>,
}
impl HkServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 3 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 3 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 3 subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
}
impl TargetedPusService for HkServiceWrapper {
const SERVICE_ID: u8 = 3;
const SERVICE_STR: &'static str = "housekeeping";
delegate::delegate! {
to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 3: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
pub fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}

View File

@ -10,7 +10,7 @@ use crate::requests::GenericRequestRouter;
use log::warn;
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
use ops_sat_rs::config::{tmtc_err, CustomPusServiceId};
use ops_sat_rs::{HandlingStatus, TimeStampHelper};
use ops_sat_rs::TimeStampHelper;
use satrs::pus::verification::{
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReporterCfg, VerificationReportingProvider, VerificationToken,
@ -18,8 +18,8 @@ use satrs::pus::verification::{
use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError,
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
HandlingStatus, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler,
PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
};
use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata};
@ -78,7 +78,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
&mut self,
sender_id: ComponentId,
tc: Vec<u8>,
) -> Result<PusPacketHandlerResult, GenericSendError> {
) -> Result<HandlingStatus, GenericSendError> {
let pus_tc_result = PusTcReader::new(&tc);
if pus_tc_result.is_err() {
log::warn!(
@ -87,7 +87,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
pus_tc_result.unwrap_err()
);
log::warn!("raw data: {:x?}", tc);
return Ok(PusPacketHandlerResult::RequestHandled);
return Ok(HandlingStatus::HandledOne);
}
let pus_tc = pus_tc_result.unwrap().0;
let init_token = self.verif_reporter.add_tc(&pus_tc);
@ -159,17 +159,65 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
}
}
}
Ok(PusPacketHandlerResult::RequestHandled)
Ok(HandlingStatus::HandledOne)
}
}
pub trait TargetedPusService {
/// Returns [true] interface the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
const SERVICE_ID: u8;
const SERVICE_STR: &'static str;
fn poll_and_handle_next_tc_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let result = self.poll_and_handle_next_tc(time_stamp);
if let Err(e) = result {
log::error!(
"PUS service {}({})packet handling error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
result.unwrap()
}
fn poll_and_handle_next_reply_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.poll_and_handle_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!(
"PUS servce {}({}): Handling reply failed with error {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
HandlingStatus::Empty
})
}
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
/// Generic trait for services which handle packets directly. Kept minimal right now because
/// of the difficulty to allow flexible user code for these services..
pub trait DirectPusService {
const SERVICE_ID: u8;
const SERVICE_STR: &'static str;
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus;
}
/// This is a generic handlers class for all PUS services where a PUS telecommand is converted
/// to a targeted request.
///
@ -262,10 +310,10 @@ where
pub fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
) -> Result<HandlingStatus, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty);
return Ok(HandlingStatus::Empty);
}
let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper
@ -321,7 +369,7 @@ where
return Err(e.into());
}
}
Ok(PusPacketHandlerResult::RequestHandled)
Ok(HandlingStatus::HandledOne)
}
fn handle_conversion_to_request_error(
@ -374,7 +422,7 @@ where
}
}
pub fn poll_and_check_next_reply(
pub fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError> {

View File

@ -1,15 +1,14 @@
use derive_new::new;
use log::{error, warn};
use satrs::tmtc::PacketAsVec;
use std::sync::mpsc;
use std::time::Duration;
use crate::requests::GenericRequestRouter;
use ops_sat_rs::config::components::PUS_MODE_SERVICE;
use ops_sat_rs::config::{mode_err, tmtc_err};
use ops_sat_rs::config::{mode_err, tmtc_err, CustomPusServiceId};
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlerResult,
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlingError,
PusServiceHelper,
};
use satrs::request::GenericMessage;
@ -239,48 +238,27 @@ pub struct ModeServiceWrapper {
}
impl TargetedPusService for ModeServiceWrapper {
/// Returns [true] if the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS mode service: partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS mode service: invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS mode service: {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS mode service: packet handling error: {error:?}");
}
const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8;
const SERVICE_STR: &'static str = "mode";
delegate::delegate! {
to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS action service: Handling reply failed with error {e:?}");
HandlingStatus::HandledOne
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
#[cfg(test)]
mod tests {
use ops_sat_rs::config::tmtc_err;

View File

@ -2,65 +2,20 @@ use std::sync::mpsc;
use std::time::Duration;
use crate::pus::create_verification_reporter;
use log::{error, info, warn};
use log::info;
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
use satrs::pool::{PoolProvider, StaticMemoryPool};
use satrs::pool::StaticMemoryPool;
use satrs::pus::scheduler::{PusScheduler, TcInfo};
use satrs::pus::scheduler_srv::PusSchedServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
MpscTcReceiver, PartialPusHandlingError, PusServiceHelper,
};
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
use satrs::ComponentId;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec;
use super::HandlingStatus;
pub trait TcReleaser {
fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}
impl TcReleaser for PacketSenderWithSharedPool {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
let shared_pool = self.shared_pool.get_mut();
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = shared_pool
.0
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
self.sender
.send(PacketInPool::new(sender_id, released_tc_addr))
.expect("sending TC to TC source failed");
}
true
}
}
impl TcReleaser for mpsc::Sender<PacketAsVec> {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
// Send released TC to centralized TC source.
self.send(PacketAsVec::new(sender_id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
}
}
use super::DirectPusService;
pub struct SchedulingService {
pub pus_11_handler: PusSchedServiceHandler<
@ -72,14 +27,72 @@ pub struct SchedulingService {
>,
pub sched_tc_pool: StaticMemoryPool,
pub releaser_buf: [u8; 4096],
pub tc_releaser: Box<dyn TcReleaser + Send>,
pub tc_releaser: mpsc::Sender<PacketAsVec>,
}
impl DirectPusService for SchedulingService {
const SERVICE_ID: u8 = PusServiceId::Verification as u8;
const SERVICE_STR: &'static str = "verification";
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let result = self.pus_11_handler.poll_and_handle_next_tc(
error_handler,
time_stamp,
&mut self.sched_tc_pool,
);
if let Err(e) = result {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
return HandlingStatus::HandledOne;
}
match result.unwrap() {
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
}
HandlingStatus::HandledOne
}
}
impl SchedulingService {
pub fn release_tcs(&mut self) {
let id = self.pus_11_handler.service_helper.id();
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(id, enabled, info, tc)
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
if enabled {
// Send released TC to centralized TC source.
self.tc_releaser
.send(PacketAsVec::new(id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
};
self.pus_11_handler
@ -99,37 +112,6 @@ impl SchedulingService {
info!("{released_tcs} TC(s) released from scheduler");
}
}
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self
.pus_11_handler
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
{
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
}
}
pub fn create_scheduler_service(
@ -158,6 +140,6 @@ pub fn create_scheduler_service(
pus_11_handler,
sched_tc_pool,
releaser_buf: [0; 4096],
tc_releaser: Box::new(tc_source_sender),
tc_releaser: tc_source_sender,
}
}

View File

@ -5,7 +5,7 @@ use satrs::spacepackets::time::{cds, TimeWriter};
use super::{
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService,
mode::ModeServiceWrapper, scheduler::SchedulingService, DirectPusService, TargetedPusService,
};
#[derive(new)]
@ -23,59 +23,28 @@ impl PusStack {
// Release all telecommands which reached their release time before calling the service
// handlers.
self.schedule_srv.release_tcs();
let time_stamp = cds::CdsTime::now_with_u16_days()
let timestamp = cds::CdsTime::now_with_u16_days()
.expect("time stamp generation error")
.to_vec()
.unwrap();
let mut loop_count = 0;
let mut loop_count = 0_u32;
// Hot loop which will run continuously until all request and reply handling is done.
loop {
let mut nothing_to_do = true;
let mut is_srv_finished =
|_srv_id: u8,
tc_handling_status: HandlingStatus,
reply_handling_status: Option<HandlingStatus>| {
if tc_handling_status == HandlingStatus::HandledOne
|| (reply_handling_status.is_some()
&& reply_handling_status.unwrap() == HandlingStatus::HandledOne)
{
nothing_to_do = false;
}
};
is_srv_finished(
17,
self.test_srv.poll_and_handle_next_packet(&time_stamp),
None,
Self::direct_service_checker(&mut self.test_srv, &timestamp, &mut nothing_to_do);
Self::direct_service_checker(&mut self.schedule_srv, &timestamp, &mut nothing_to_do);
Self::direct_service_checker(&mut self.event_srv, &timestamp, &mut nothing_to_do);
Self::targeted_service_checker(
&mut self.action_srv_wrapper,
&timestamp,
&mut nothing_to_do,
);
is_srv_finished(
11,
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
None,
Self::targeted_service_checker(
&mut self.hk_srv_wrapper,
&timestamp,
&mut nothing_to_do,
);
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
is_srv_finished(
8,
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
Some(
self.action_srv_wrapper
.poll_and_handle_next_reply(&time_stamp),
),
);
is_srv_finished(
3,
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
);
is_srv_finished(
200,
self.mode_srv.poll_and_handle_next_tc(&time_stamp),
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
);
// Safety mechanism to avoid infinite loops.
loop_count += 1;
if loop_count >= 500 {
log::warn!("reached PUS stack loop count 500, breaking");
break;
}
Self::targeted_service_checker(&mut self.mode_srv, &timestamp, &mut nothing_to_do);
if nothing_to_do {
// Timeout checking is only done once.
self.action_srv_wrapper.check_for_request_timeouts();
@ -83,6 +52,37 @@ impl PusStack {
self.mode_srv.check_for_request_timeouts();
break;
}
// Safety mechanism to avoid infinite loops.
loop_count += 1;
if loop_count >= 500 {
log::warn!("reached PUS stack loop count 500, breaking");
break;
}
}
}
pub fn direct_service_checker<S: DirectPusService>(
service: &mut S,
timestamp: &[u8],
nothing_to_do: &mut bool,
) {
let handling_status = service.poll_and_handle_next_tc(timestamp);
if handling_status == HandlingStatus::HandledOne {
*nothing_to_do = false;
}
}
pub fn targeted_service_checker<S: TargetedPusService>(
service: &mut S,
timestamp: &[u8],
nothing_to_do: &mut bool,
) {
let request_handling = service.poll_and_handle_next_tc_default_handler(timestamp);
let reply_handling = service.poll_and_handle_next_reply_default_handler(timestamp);
if request_handling == HandlingStatus::HandledOne
|| reply_handling == HandlingStatus::HandledOne
{
*nothing_to_do = false;
}
}
}

View File

@ -1,22 +1,20 @@
use crate::pus::create_verification_reporter;
use log::{info, warn};
use log::info;
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
PusPacketHandlerResult, PusServiceHelper,
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
MpscTcReceiver, MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper,
};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter;
use satrs::queue::GenericSendError;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec;
use std::sync::mpsc;
use super::HandlingStatus;
use super::DirectPusService;
pub fn create_test_service(
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
@ -46,64 +44,84 @@ pub struct TestCustomServiceWrapper {
pub event_tx: mpsc::SyncSender<EventMessageU32>,
}
impl TestCustomServiceWrapper {
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let res = self.handler.poll_and_handle_next_tc(time_stamp);
if res.is_err() {
warn!("PUS17 handlers failed with error {:?}", res.unwrap_err());
return HandlingStatus::Empty;
impl DirectPusService for TestCustomServiceWrapper {
const SERVICE_ID: u8 = PusServiceId::Test as u8;
const SERVICE_STR: &'static str = "test";
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let res = self
.handler
.poll_and_handle_next_tc(error_handler, timestamp);
if let Err(e) = res {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
return HandlingStatus::HandledOne;
}
match res.unwrap() {
PusPacketHandlerResult::RequestHandled => {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
return HandlingStatus::HandledOne;
DirectPusPacketHandlerResult::Handled(handling_status) => {
if handling_status == HandlingStatus::HandledOne {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
}
return handling_status;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => {
warn!(
"Handled PUS ping command with partial success: {:?}",
partial_err
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS17: Subservice {subservice} not implemented");
return HandlingStatus::HandledOne;
}
// TODO: adapt interface events are implemented
PusPacketHandlerResult::CustomSubservice(subservice, token) => {
let (tc, _) = PusTcReader::new(
self.handler
.service_helper
.tc_in_mem_converter
.tc_slice_raw(),
)
.unwrap();
let time_stamper = CdsTime::now_with_u16_days().unwrap();
let mut stamp_buf: [u8; 7] = [0; 7];
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
DirectPusPacketHandlerResult::CustomSubservice(subservice, token) => {
if subservice == 128 {
info!("Generating test event");
self.event_tx
info!("generating test event");
if let Err(e) = self
.event_tx
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
.expect("Sending test event failed");
let start_token = self
.handler
.service_helper
.verif_reporter()
.start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf)
.expect("Error sending start success");
self.handler
.service_helper
.verif_reporter()
.completion_success(
self.handler.service_helper.tm_sender(),
start_token,
&stamp_buf,
)
.expect("Error sending completion success");
.map_err(|_| GenericSendError::RxDisconnected)
{
// This really should not happen but I want to avoid panicking..
log::warn!("failed to send test event: {:?}", e);
}
match self.handler.service_helper.verif_reporter().start_success(
self.handler.service_helper.tm_sender(),
token,
timestamp,
) {
Ok(started_token) => {
if let Err(e) = self
.handler
.service_helper
.verif_reporter()
.completion_success(
self.handler.service_helper.tm_sender(),
started_token,
timestamp,
)
{
error_handler(&PartialPusHandlingError::Verification(e));
}
}
Err(e) => {
error_handler(&PartialPusHandlingError::Verification(e));
}
}
} else {
let fail_data = [tc.subservice()];
let fail_data = [subservice];
self.handler
.service_helper
.verif_reporter()
@ -111,18 +129,15 @@ impl TestCustomServiceWrapper {
self.handler.service_helper.tm_sender(),
token,
FailParams::new(
&stamp_buf,
timestamp,
&tmtc_err::INVALID_PUS_SUBSERVICE,
&fail_data,
),
)
.expect("Sending start failure verification failed");
}
return HandlingStatus::HandledOne;
}
PusPacketHandlerResult::Empty => (),
}
// To avoid permanent loops, treat queue empty by default (all tasks done).
HandlingStatus::Empty
HandlingStatus::HandledOne
}
}

View File

@ -1,7 +1,9 @@
use std::sync::mpsc::{self, TryRecvError};
use ops_sat_rs::HandlingStatus;
use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec};
use satrs::{
pus::{HandlingStatus, MpscTmAsVecSender},
tmtc::PacketAsVec,
};
use crate::pus::PusTcDistributor;