Merge pull request 'bump sat-rs and improve PUS stack' (#29) from bump-satrs-improve-pus-stack into main
Reviewed-on: #29
This commit is contained in:
commit
ebb58e4fd4
26
Cargo.lock
generated
26
Cargo.lock
generated
@ -248,6 +248,17 @@ dependencies = [
|
|||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "delegate"
|
||||||
|
version = "0.12.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.60",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "derive-new"
|
name = "derive-new"
|
||||||
version = "0.6.0"
|
version = "0.6.0"
|
||||||
@ -631,6 +642,7 @@ name = "ops-sat-rs"
|
|||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"delegate 0.12.0",
|
||||||
"derive-new",
|
"derive-new",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"fern",
|
"fern",
|
||||||
@ -776,13 +788,13 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs"
|
name = "satrs"
|
||||||
version = "0.2.0-rc.5"
|
version = "0.2.0-rc.5"
|
||||||
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#29f71c2a571e7492cf5d997d4c11c3f844de83bc"
|
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#9ffe4d0ae02064444da606b2cd1a3c0dd6858fbb"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bus",
|
"bus",
|
||||||
"cobs",
|
"cobs",
|
||||||
"crc",
|
"crc",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"delegate",
|
"delegate 0.10.0",
|
||||||
"derive-new",
|
"derive-new",
|
||||||
"downcast-rs",
|
"downcast-rs",
|
||||||
"dyn-clone",
|
"dyn-clone",
|
||||||
@ -835,9 +847,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.199"
|
version = "1.0.200"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a"
|
checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
@ -855,9 +867,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.199"
|
version = "1.0.200"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc"
|
checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
@ -926,7 +938,7 @@ checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"crc",
|
"crc",
|
||||||
"delegate",
|
"delegate 0.10.0",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"num_enum",
|
"num_enum",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -10,6 +10,7 @@ fern = "0.6"
|
|||||||
toml = "0.8"
|
toml = "0.8"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
delegate = "0.12"
|
||||||
lazy_static = "1"
|
lazy_static = "1"
|
||||||
humantime = "2"
|
humantime = "2"
|
||||||
strum = { version = "0.26", features = ["derive"] }
|
strum = { version = "0.26", features = ["derive"] }
|
||||||
|
@ -2,8 +2,8 @@ use std::net::{SocketAddr, UdpSocket};
|
|||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use ops_sat_rs::HandlingStatus;
|
|
||||||
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
|
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
|
||||||
|
use satrs::pus::HandlingStatus;
|
||||||
use satrs::queue::GenericSendError;
|
use satrs::queue::GenericSendError;
|
||||||
use satrs::tmtc::PacketAsVec;
|
use satrs::tmtc::PacketAsVec;
|
||||||
|
|
||||||
@ -125,7 +125,7 @@ mod tests {
|
|||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
|
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
|
||||||
let tm_handler = TestTmHandler::default();
|
let tm_handler = TestTmHandler::default();
|
||||||
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
|
let _tm_handler_calls = tm_handler.addrs_to_send_to.clone();
|
||||||
let mut udp_dyn_server = UdpTmtcServer {
|
let mut udp_dyn_server = UdpTmtcServer {
|
||||||
udp_tc_server,
|
udp_tc_server,
|
||||||
tm_handler,
|
tm_handler,
|
||||||
|
@ -3,12 +3,6 @@ use satrs::spacepackets::time::TimeWriter;
|
|||||||
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
|
||||||
pub enum HandlingStatus {
|
|
||||||
Empty,
|
|
||||||
HandledOne,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TimeStampHelper {
|
pub struct TimeStampHelper {
|
||||||
stamper: CdsTime,
|
stamper: CdsTime,
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use log::{error, warn};
|
use log::warn;
|
||||||
use ops_sat_rs::config::components::PUS_ACTION_SERVICE;
|
use ops_sat_rs::config::components::PUS_ACTION_SERVICE;
|
||||||
use ops_sat_rs::config::tmtc_err;
|
use ops_sat_rs::config::tmtc_err;
|
||||||
use ops_sat_rs::TimeStampHelper;
|
use ops_sat_rs::TimeStampHelper;
|
||||||
@ -13,13 +13,13 @@ use satrs::pus::verification::{
|
|||||||
};
|
};
|
||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError,
|
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError,
|
||||||
GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
|
GenericConversionError, HandlingStatus, PusPacketHandlingError, PusReplyHandler,
|
||||||
PusTcToRequestConverter, PusTmVariant,
|
PusServiceHelper, PusTcToRequestConverter, PusTmVariant,
|
||||||
};
|
};
|
||||||
use satrs::request::{GenericMessage, UniqueApidTargetId};
|
use satrs::request::{GenericMessage, UniqueApidTargetId};
|
||||||
use satrs::spacepackets::ecss::tc::PusTcReader;
|
use satrs::spacepackets::ecss::tc::PusTcReader;
|
||||||
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
|
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
|
||||||
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket};
|
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket, PusServiceId};
|
||||||
use satrs::spacepackets::SpHeader;
|
use satrs::spacepackets::SpHeader;
|
||||||
use satrs::tmtc::PacketAsVec;
|
use satrs::tmtc::PacketAsVec;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
@ -28,8 +28,8 @@ use std::time::Duration;
|
|||||||
use crate::requests::GenericRequestRouter;
|
use crate::requests::GenericRequestRouter;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus,
|
create_verification_reporter, generic_pus_request_timeout_handler, PusTargetedRequestService,
|
||||||
PusTargetedRequestService, TargetedPusService,
|
TargetedPusService,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const DATA_REPLY: u8 = 130;
|
pub const DATA_REPLY: u8 = 130;
|
||||||
@ -248,47 +248,23 @@ pub struct ActionServiceWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TargetedPusService for ActionServiceWrapper {
|
impl TargetedPusService for ActionServiceWrapper {
|
||||||
/// Returns [true] if the packet handling is finished.
|
const SERVICE_ID: u8 = PusServiceId::Action as u8;
|
||||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
const SERVICE_STR: &'static str = "action";
|
||||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
|
||||||
Ok(result) => match result {
|
delegate::delegate! {
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
to self.service {
|
||||||
return HandlingStatus::HandledOne;
|
fn poll_and_handle_next_tc(
|
||||||
}
|
&mut self,
|
||||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
time_stamp: &[u8],
|
||||||
warn!("PUS 8 partial packet handling success: {e:?}");
|
) -> Result<HandlingStatus, PusPacketHandlingError>;
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
fn poll_and_handle_next_reply(
|
||||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
&mut self,
|
||||||
warn!("PUS 8 invalid subservice {invalid}");
|
time_stamp: &[u8],
|
||||||
return HandlingStatus::HandledOne;
|
) -> Result<HandlingStatus, EcssTmtcError>;
|
||||||
}
|
|
||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
fn check_for_request_timeouts(&mut self);
|
||||||
warn!("PUS 8 subservice {subservice} not implemented");
|
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
PusPacketHandlerResult::Empty => (),
|
|
||||||
},
|
|
||||||
Err(error) => {
|
|
||||||
error!("PUS packet handling error: {error:?}");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// To avoid permanent loops, treat queue empty by default (all tasks done).
|
|
||||||
HandlingStatus::Empty
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
|
||||||
// This only fails if all senders disconnected. Treat it like an empty queue.
|
|
||||||
self.service
|
|
||||||
.poll_and_check_next_reply(time_stamp)
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
warn!("PUS 8: Handling reply failed with error {e:?}");
|
|
||||||
HandlingStatus::Empty
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_for_request_timeouts(&mut self) {
|
|
||||||
self.service.check_for_request_timeouts();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -317,8 +293,8 @@ mod tests {
|
|||||||
use satrs::pus::test_util::{
|
use satrs::pus::test_util::{
|
||||||
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
|
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
|
||||||
};
|
};
|
||||||
|
use satrs::pus::verification;
|
||||||
use satrs::pus::verification::test_util::TestVerificationReporter;
|
use satrs::pus::verification::test_util::TestVerificationReporter;
|
||||||
use satrs::pus::{verification, TcInMemory};
|
|
||||||
use satrs::request::MessageMetadata;
|
use satrs::request::MessageMetadata;
|
||||||
use satrs::ComponentId;
|
use satrs::ComponentId;
|
||||||
use satrs::{
|
use satrs::{
|
||||||
@ -423,7 +399,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
let result = result.unwrap();
|
let result = result.unwrap();
|
||||||
match result {
|
match result {
|
||||||
PusPacketHandlerResult::RequestHandled => (),
|
HandlingStatus::HandledOne => (),
|
||||||
_ => panic!("unexpected result {result:?}"),
|
_ => panic!("unexpected result {result:?}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -435,19 +411,19 @@ mod tests {
|
|||||||
}
|
}
|
||||||
let result = result.unwrap();
|
let result = result.unwrap();
|
||||||
match result {
|
match result {
|
||||||
PusPacketHandlerResult::Empty => (),
|
HandlingStatus::Empty => (),
|
||||||
_ => panic!("unexpected result {result:?}"),
|
_ => panic!("unexpected result {result:?}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
|
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
|
||||||
let result = self.service.poll_and_check_next_reply(time_stamp);
|
let result = self.service.poll_and_handle_next_reply(time_stamp);
|
||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
|
assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
|
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
|
||||||
let result = self.service.poll_and_check_next_reply(time_stamp);
|
let result = self.service.poll_and_handle_next_reply(time_stamp);
|
||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
assert_eq!(result.unwrap(), HandlingStatus::Empty);
|
assert_eq!(result.unwrap(), HandlingStatus::Empty);
|
||||||
}
|
}
|
||||||
@ -472,11 +448,7 @@ mod tests {
|
|||||||
.check_next_is_acceptance_success(id, accepted_token.request_id());
|
.check_next_is_acceptance_success(id, accepted_token.request_id());
|
||||||
self.pus_packet_tx
|
self.pus_packet_tx
|
||||||
.send(EcssTcAndToken::new(
|
.send(EcssTcAndToken::new(
|
||||||
TcInMemory::Vec(PacketAsVec::new(
|
PacketAsVec::new(self.service.service_helper.id(), tc.to_vec().unwrap()),
|
||||||
self.service.service_helper.id(),
|
|
||||||
//tc.to_vec().unwrap().into(),
|
|
||||||
tc.to_vec().unwrap(),
|
|
||||||
)),
|
|
||||||
accepted_token,
|
accepted_token,
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -1,15 +1,16 @@
|
|||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
|
||||||
use super::HandlingStatus;
|
use super::{DirectPusService, HandlingStatus};
|
||||||
use crate::pus::create_verification_reporter;
|
use crate::pus::create_verification_reporter;
|
||||||
use log::{error, warn};
|
|
||||||
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
|
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
|
||||||
use satrs::pus::event_man::EventRequestWithToken;
|
use satrs::pus::event_man::EventRequestWithToken;
|
||||||
use satrs::pus::event_srv::PusEventServiceHandler;
|
use satrs::pus::event_srv::PusEventServiceHandler;
|
||||||
use satrs::pus::verification::VerificationReporter;
|
use satrs::pus::verification::VerificationReporter;
|
||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
|
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver,
|
||||||
|
PartialPusHandlingError, PusServiceHelper,
|
||||||
};
|
};
|
||||||
|
use satrs::spacepackets::ecss::PusServiceId;
|
||||||
use satrs::tmtc::PacketAsVec;
|
use satrs::tmtc::PacketAsVec;
|
||||||
|
|
||||||
pub fn create_event_service(
|
pub fn create_event_service(
|
||||||
@ -41,32 +42,51 @@ pub struct EventServiceWrapper {
|
|||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventServiceWrapper {
|
impl DirectPusService for EventServiceWrapper {
|
||||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
const SERVICE_ID: u8 = PusServiceId::Event as u8;
|
||||||
match self.handler.poll_and_handle_next_tc(time_stamp) {
|
|
||||||
Ok(result) => match result {
|
const SERVICE_STR: &'static str = "events";
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
|
||||||
return HandlingStatus::HandledOne;
|
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
}
|
let error_handler = |partial_error: &PartialPusHandlingError| {
|
||||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
log::warn!(
|
||||||
warn!("PUS 5 partial packet handling success: {e:?}");
|
"PUS {}({}) partial error: {:?}",
|
||||||
return HandlingStatus::HandledOne;
|
Self::SERVICE_ID,
|
||||||
}
|
Self::SERVICE_STR,
|
||||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
partial_error
|
||||||
warn!("PUS 5 invalid subservice {invalid}");
|
);
|
||||||
return HandlingStatus::HandledOne;
|
};
|
||||||
}
|
let result = self
|
||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
.handler
|
||||||
warn!("PUS 5 subservice {subservice} not implemented");
|
.poll_and_handle_next_tc(error_handler, time_stamp);
|
||||||
return HandlingStatus::HandledOne;
|
if let Err(e) = result {
|
||||||
}
|
log::warn!(
|
||||||
PusPacketHandlerResult::Empty => (),
|
"PUS {}({}) error: {:?}",
|
||||||
},
|
Self::SERVICE_ID,
|
||||||
Err(error) => {
|
Self::SERVICE_STR,
|
||||||
error!("PUS packet handling error: {error:?}");
|
e
|
||||||
|
);
|
||||||
|
return HandlingStatus::HandledOne;
|
||||||
|
}
|
||||||
|
match result.unwrap() {
|
||||||
|
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
|
||||||
|
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) subservice {} not implemented",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
subservice
|
||||||
|
);
|
||||||
|
}
|
||||||
|
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) subservice {} not implemented",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
subservice
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// To avoid permanent loops, treat queue empty by default (all tasks done).
|
HandlingStatus::HandledOne
|
||||||
HandlingStatus::Empty
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use derive_new::new;
|
use derive_new::new;
|
||||||
use log::{error, warn};
|
|
||||||
use ops_sat_rs::config::components::PUS_HK_SERVICE;
|
use ops_sat_rs::config::components::PUS_HK_SERVICE;
|
||||||
use ops_sat_rs::config::{hk_err, tmtc_err};
|
use ops_sat_rs::config::{hk_err, tmtc_err};
|
||||||
use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId};
|
use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId};
|
||||||
@ -10,7 +9,7 @@ use satrs::pus::verification::{
|
|||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
|
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
|
||||||
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError,
|
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError,
|
||||||
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
|
PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
|
||||||
};
|
};
|
||||||
use satrs::request::{GenericMessage, UniqueApidTargetId};
|
use satrs::request::{GenericMessage, UniqueApidTargetId};
|
||||||
use satrs::spacepackets::ecss::tc::PusTcReader;
|
use satrs::spacepackets::ecss::tc::PusTcReader;
|
||||||
@ -22,7 +21,7 @@ use std::time::Duration;
|
|||||||
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
|
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
|
||||||
use crate::requests::GenericRequestRouter;
|
use crate::requests::GenericRequestRouter;
|
||||||
|
|
||||||
use super::{HandlingStatus, PusTargetedRequestService};
|
use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService};
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Debug, new)]
|
#[derive(Clone, PartialEq, Debug, new)]
|
||||||
pub struct HkReply {
|
pub struct HkReply {
|
||||||
@ -267,47 +266,25 @@ pub struct HkServiceWrapper {
|
|||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HkServiceWrapper {
|
impl TargetedPusService for HkServiceWrapper {
|
||||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
const SERVICE_ID: u8 = 3;
|
||||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
|
||||||
Ok(result) => match result {
|
const SERVICE_STR: &'static str = "housekeeping";
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
|
||||||
return HandlingStatus::HandledOne;
|
delegate::delegate! {
|
||||||
}
|
to self.service {
|
||||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
fn poll_and_handle_next_tc(
|
||||||
warn!("PUS 3 partial packet handling success: {e:?}");
|
&mut self,
|
||||||
return HandlingStatus::HandledOne;
|
time_stamp: &[u8],
|
||||||
}
|
) -> Result<HandlingStatus, PusPacketHandlingError>;
|
||||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
|
||||||
warn!("PUS 3 invalid subservice {invalid}");
|
fn poll_and_handle_next_reply(
|
||||||
return HandlingStatus::HandledOne;
|
&mut self,
|
||||||
}
|
time_stamp: &[u8],
|
||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
) -> Result<HandlingStatus, EcssTmtcError>;
|
||||||
warn!("PUS 3 subservice {subservice} not implemented");
|
|
||||||
return HandlingStatus::HandledOne;
|
fn check_for_request_timeouts(&mut self);
|
||||||
}
|
|
||||||
PusPacketHandlerResult::Empty => (),
|
|
||||||
},
|
|
||||||
Err(error) => {
|
|
||||||
error!("PUS packet handling error: {error:?}");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// To avoid permanent loops, treat queue empty by default (all tasks done).
|
|
||||||
HandlingStatus::Empty
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
|
||||||
// This only fails if all senders disconnected. Treat it like an empty queue.
|
|
||||||
self.service
|
|
||||||
.poll_and_check_next_reply(time_stamp)
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
warn!("PUS 3: Handling reply failed with error {e:?}");
|
|
||||||
HandlingStatus::Empty
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn check_for_request_timeouts(&mut self) {
|
|
||||||
self.service.check_for_request_timeouts();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ use crate::requests::GenericRequestRouter;
|
|||||||
use log::warn;
|
use log::warn;
|
||||||
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
|
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
|
||||||
use ops_sat_rs::config::{tmtc_err, CustomPusServiceId};
|
use ops_sat_rs::config::{tmtc_err, CustomPusServiceId};
|
||||||
use ops_sat_rs::{HandlingStatus, TimeStampHelper};
|
use ops_sat_rs::TimeStampHelper;
|
||||||
use satrs::pus::verification::{
|
use satrs::pus::verification::{
|
||||||
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
|
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
|
||||||
VerificationReporterCfg, VerificationReportingProvider, VerificationToken,
|
VerificationReporterCfg, VerificationReportingProvider, VerificationToken,
|
||||||
@ -18,8 +18,8 @@ use satrs::pus::verification::{
|
|||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
|
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
|
||||||
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
|
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
|
||||||
MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError,
|
HandlingStatus, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler,
|
||||||
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
|
PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
|
||||||
};
|
};
|
||||||
use satrs::queue::{GenericReceiveError, GenericSendError};
|
use satrs::queue::{GenericReceiveError, GenericSendError};
|
||||||
use satrs::request::{Apid, GenericMessage, MessageMetadata};
|
use satrs::request::{Apid, GenericMessage, MessageMetadata};
|
||||||
@ -78,7 +78,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
sender_id: ComponentId,
|
sender_id: ComponentId,
|
||||||
tc: Vec<u8>,
|
tc: Vec<u8>,
|
||||||
) -> Result<PusPacketHandlerResult, GenericSendError> {
|
) -> Result<HandlingStatus, GenericSendError> {
|
||||||
let pus_tc_result = PusTcReader::new(&tc);
|
let pus_tc_result = PusTcReader::new(&tc);
|
||||||
if pus_tc_result.is_err() {
|
if pus_tc_result.is_err() {
|
||||||
log::warn!(
|
log::warn!(
|
||||||
@ -87,7 +87,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
|||||||
pus_tc_result.unwrap_err()
|
pus_tc_result.unwrap_err()
|
||||||
);
|
);
|
||||||
log::warn!("raw data: {:x?}", tc);
|
log::warn!("raw data: {:x?}", tc);
|
||||||
return Ok(PusPacketHandlerResult::RequestHandled);
|
return Ok(HandlingStatus::HandledOne);
|
||||||
}
|
}
|
||||||
let pus_tc = pus_tc_result.unwrap().0;
|
let pus_tc = pus_tc_result.unwrap().0;
|
||||||
let init_token = self.verif_reporter.add_tc(&pus_tc);
|
let init_token = self.verif_reporter.add_tc(&pus_tc);
|
||||||
@ -159,17 +159,65 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(PusPacketHandlerResult::RequestHandled)
|
Ok(HandlingStatus::HandledOne)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait TargetedPusService {
|
pub trait TargetedPusService {
|
||||||
/// Returns [true] interface the packet handling is finished.
|
const SERVICE_ID: u8;
|
||||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus;
|
const SERVICE_STR: &'static str;
|
||||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
|
|
||||||
|
fn poll_and_handle_next_tc_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
let result = self.poll_and_handle_next_tc(time_stamp);
|
||||||
|
if let Err(e) = result {
|
||||||
|
log::error!(
|
||||||
|
"PUS service {}({})packet handling error: {:?}",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
// To avoid permanent loops on error cases.
|
||||||
|
return HandlingStatus::Empty;
|
||||||
|
}
|
||||||
|
result.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_and_handle_next_reply_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
// This only fails if all senders disconnected. Treat it like an empty queue.
|
||||||
|
self.poll_and_handle_next_reply(time_stamp)
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
warn!(
|
||||||
|
"PUS servce {}({}): Handling reply failed with error {:?}",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
HandlingStatus::Empty
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_and_handle_next_tc(
|
||||||
|
&mut self,
|
||||||
|
time_stamp: &[u8],
|
||||||
|
) -> Result<HandlingStatus, PusPacketHandlingError>;
|
||||||
|
|
||||||
|
fn poll_and_handle_next_reply(
|
||||||
|
&mut self,
|
||||||
|
time_stamp: &[u8],
|
||||||
|
) -> Result<HandlingStatus, EcssTmtcError>;
|
||||||
|
|
||||||
fn check_for_request_timeouts(&mut self);
|
fn check_for_request_timeouts(&mut self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generic trait for services which handle packets directly. Kept minimal right now because
|
||||||
|
/// of the difficulty to allow flexible user code for these services..
|
||||||
|
pub trait DirectPusService {
|
||||||
|
const SERVICE_ID: u8;
|
||||||
|
const SERVICE_STR: &'static str;
|
||||||
|
|
||||||
|
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus;
|
||||||
|
}
|
||||||
|
|
||||||
/// This is a generic handlers class for all PUS services where a PUS telecommand is converted
|
/// This is a generic handlers class for all PUS services where a PUS telecommand is converted
|
||||||
/// to a targeted request.
|
/// to a targeted request.
|
||||||
///
|
///
|
||||||
@ -262,10 +310,10 @@ where
|
|||||||
pub fn poll_and_handle_next_tc(
|
pub fn poll_and_handle_next_tc(
|
||||||
&mut self,
|
&mut self,
|
||||||
time_stamp: &[u8],
|
time_stamp: &[u8],
|
||||||
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
|
) -> Result<HandlingStatus, PusPacketHandlingError> {
|
||||||
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
|
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
|
||||||
if possible_packet.is_none() {
|
if possible_packet.is_none() {
|
||||||
return Ok(PusPacketHandlerResult::Empty);
|
return Ok(HandlingStatus::Empty);
|
||||||
}
|
}
|
||||||
let ecss_tc_and_token = possible_packet.unwrap();
|
let ecss_tc_and_token = possible_packet.unwrap();
|
||||||
self.service_helper
|
self.service_helper
|
||||||
@ -321,7 +369,7 @@ where
|
|||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(PusPacketHandlerResult::RequestHandled)
|
Ok(HandlingStatus::HandledOne)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_conversion_to_request_error(
|
fn handle_conversion_to_request_error(
|
||||||
@ -374,7 +422,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll_and_check_next_reply(
|
pub fn poll_and_handle_next_reply(
|
||||||
&mut self,
|
&mut self,
|
||||||
time_stamp: &[u8],
|
time_stamp: &[u8],
|
||||||
) -> Result<HandlingStatus, EcssTmtcError> {
|
) -> Result<HandlingStatus, EcssTmtcError> {
|
||||||
|
@ -1,15 +1,14 @@
|
|||||||
use derive_new::new;
|
use derive_new::new;
|
||||||
use log::{error, warn};
|
|
||||||
use satrs::tmtc::PacketAsVec;
|
use satrs::tmtc::PacketAsVec;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::requests::GenericRequestRouter;
|
use crate::requests::GenericRequestRouter;
|
||||||
use ops_sat_rs::config::components::PUS_MODE_SERVICE;
|
use ops_sat_rs::config::components::PUS_MODE_SERVICE;
|
||||||
use ops_sat_rs::config::{mode_err, tmtc_err};
|
use ops_sat_rs::config::{mode_err, tmtc_err, CustomPusServiceId};
|
||||||
use satrs::pus::verification::VerificationReporter;
|
use satrs::pus::verification::VerificationReporter;
|
||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlerResult,
|
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlingError,
|
||||||
PusServiceHelper,
|
PusServiceHelper,
|
||||||
};
|
};
|
||||||
use satrs::request::GenericMessage;
|
use satrs::request::GenericMessage;
|
||||||
@ -239,48 +238,27 @@ pub struct ModeServiceWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TargetedPusService for ModeServiceWrapper {
|
impl TargetedPusService for ModeServiceWrapper {
|
||||||
/// Returns [true] if the packet handling is finished.
|
const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8;
|
||||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
|
||||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
const SERVICE_STR: &'static str = "mode";
|
||||||
Ok(result) => match result {
|
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
delegate::delegate! {
|
||||||
return HandlingStatus::HandledOne;
|
to self.service {
|
||||||
}
|
fn poll_and_handle_next_tc(
|
||||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
&mut self,
|
||||||
warn!("PUS mode service: partial packet handling success: {e:?}");
|
time_stamp: &[u8],
|
||||||
return HandlingStatus::HandledOne;
|
) -> Result<HandlingStatus, PusPacketHandlingError>;
|
||||||
}
|
|
||||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
fn poll_and_handle_next_reply(
|
||||||
warn!("PUS mode service: invalid subservice {invalid}");
|
&mut self,
|
||||||
return HandlingStatus::HandledOne;
|
time_stamp: &[u8],
|
||||||
}
|
) -> Result<HandlingStatus, EcssTmtcError>;
|
||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
|
||||||
warn!("PUS mode service: {subservice} not implemented");
|
fn check_for_request_timeouts(&mut self);
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
PusPacketHandlerResult::Empty => (),
|
|
||||||
},
|
|
||||||
Err(error) => {
|
|
||||||
error!("PUS mode service: packet handling error: {error:?}");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// To avoid permanent loops, treat queue empty by default (all tasks done).
|
|
||||||
HandlingStatus::Empty
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
|
||||||
self.service
|
|
||||||
.poll_and_check_next_reply(time_stamp)
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
warn!("PUS action service: Handling reply failed with error {e:?}");
|
|
||||||
HandlingStatus::HandledOne
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_for_request_timeouts(&mut self) {
|
|
||||||
self.service.check_for_request_timeouts();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use ops_sat_rs::config::tmtc_err;
|
use ops_sat_rs::config::tmtc_err;
|
||||||
|
@ -2,65 +2,20 @@ use std::sync::mpsc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::pus::create_verification_reporter;
|
use crate::pus::create_verification_reporter;
|
||||||
use log::{error, info, warn};
|
use log::info;
|
||||||
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
|
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
|
||||||
use satrs::pool::{PoolProvider, StaticMemoryPool};
|
use satrs::pool::StaticMemoryPool;
|
||||||
use satrs::pus::scheduler::{PusScheduler, TcInfo};
|
use satrs::pus::scheduler::{PusScheduler, TcInfo};
|
||||||
use satrs::pus::scheduler_srv::PusSchedServiceHandler;
|
use satrs::pus::scheduler_srv::PusSchedServiceHandler;
|
||||||
use satrs::pus::verification::VerificationReporter;
|
use satrs::pus::verification::VerificationReporter;
|
||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
|
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
|
||||||
|
MpscTcReceiver, PartialPusHandlingError, PusServiceHelper,
|
||||||
};
|
};
|
||||||
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
|
use satrs::spacepackets::ecss::PusServiceId;
|
||||||
use satrs::ComponentId;
|
use satrs::tmtc::PacketAsVec;
|
||||||
|
|
||||||
use super::HandlingStatus;
|
use super::DirectPusService;
|
||||||
|
|
||||||
pub trait TcReleaser {
|
|
||||||
fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TcReleaser for PacketSenderWithSharedPool {
|
|
||||||
fn release(
|
|
||||||
&mut self,
|
|
||||||
sender_id: ComponentId,
|
|
||||||
enabled: bool,
|
|
||||||
_info: &TcInfo,
|
|
||||||
tc: &[u8],
|
|
||||||
) -> bool {
|
|
||||||
if enabled {
|
|
||||||
let shared_pool = self.shared_pool.get_mut();
|
|
||||||
// Transfer TC from scheduler TC pool to shared TC pool.
|
|
||||||
let released_tc_addr = shared_pool
|
|
||||||
.0
|
|
||||||
.write()
|
|
||||||
.expect("locking pool failed")
|
|
||||||
.add(tc)
|
|
||||||
.expect("adding TC to shared pool failed");
|
|
||||||
self.sender
|
|
||||||
.send(PacketInPool::new(sender_id, released_tc_addr))
|
|
||||||
.expect("sending TC to TC source failed");
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TcReleaser for mpsc::Sender<PacketAsVec> {
|
|
||||||
fn release(
|
|
||||||
&mut self,
|
|
||||||
sender_id: ComponentId,
|
|
||||||
enabled: bool,
|
|
||||||
_info: &TcInfo,
|
|
||||||
tc: &[u8],
|
|
||||||
) -> bool {
|
|
||||||
if enabled {
|
|
||||||
// Send released TC to centralized TC source.
|
|
||||||
self.send(PacketAsVec::new(sender_id, tc.to_vec()))
|
|
||||||
.expect("sending TC to TC source failed");
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SchedulingService {
|
pub struct SchedulingService {
|
||||||
pub pus_11_handler: PusSchedServiceHandler<
|
pub pus_11_handler: PusSchedServiceHandler<
|
||||||
@ -72,14 +27,72 @@ pub struct SchedulingService {
|
|||||||
>,
|
>,
|
||||||
pub sched_tc_pool: StaticMemoryPool,
|
pub sched_tc_pool: StaticMemoryPool,
|
||||||
pub releaser_buf: [u8; 4096],
|
pub releaser_buf: [u8; 4096],
|
||||||
pub tc_releaser: Box<dyn TcReleaser + Send>,
|
pub tc_releaser: mpsc::Sender<PacketAsVec>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DirectPusService for SchedulingService {
|
||||||
|
const SERVICE_ID: u8 = PusServiceId::Verification as u8;
|
||||||
|
|
||||||
|
const SERVICE_STR: &'static str = "verification";
|
||||||
|
|
||||||
|
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
let error_handler = |partial_error: &PartialPusHandlingError| {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) partial error: {:?}",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
partial_error
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = self.pus_11_handler.poll_and_handle_next_tc(
|
||||||
|
error_handler,
|
||||||
|
time_stamp,
|
||||||
|
&mut self.sched_tc_pool,
|
||||||
|
);
|
||||||
|
if let Err(e) = result {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) error: {:?}",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
return HandlingStatus::HandledOne;
|
||||||
|
}
|
||||||
|
match result.unwrap() {
|
||||||
|
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
|
||||||
|
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) subservice {} not implemented",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
subservice
|
||||||
|
);
|
||||||
|
}
|
||||||
|
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) subservice {} not implemented",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
subservice
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
HandlingStatus::HandledOne
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SchedulingService {
|
impl SchedulingService {
|
||||||
pub fn release_tcs(&mut self) {
|
pub fn release_tcs(&mut self) {
|
||||||
let id = self.pus_11_handler.service_helper.id();
|
let id = self.pus_11_handler.service_helper.id();
|
||||||
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
|
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
|
||||||
self.tc_releaser.release(id, enabled, info, tc)
|
if enabled {
|
||||||
|
// Send released TC to centralized TC source.
|
||||||
|
self.tc_releaser
|
||||||
|
.send(PacketAsVec::new(id, tc.to_vec()))
|
||||||
|
.expect("sending TC to TC source failed");
|
||||||
|
}
|
||||||
|
true
|
||||||
};
|
};
|
||||||
|
|
||||||
self.pus_11_handler
|
self.pus_11_handler
|
||||||
@ -99,37 +112,6 @@ impl SchedulingService {
|
|||||||
info!("{released_tcs} TC(s) released from scheduler");
|
info!("{released_tcs} TC(s) released from scheduler");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
|
||||||
match self
|
|
||||||
.pus_11_handler
|
|
||||||
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
|
|
||||||
{
|
|
||||||
Ok(result) => match result {
|
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
|
||||||
warn!("PUS11 partial packet handling success: {e:?}");
|
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
|
||||||
warn!("PUS11 invalid subservice {invalid}");
|
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
|
||||||
warn!("PUS11: Subservice {subservice} not implemented");
|
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
PusPacketHandlerResult::Empty => (),
|
|
||||||
},
|
|
||||||
Err(error) => {
|
|
||||||
error!("PUS packet handling error: {error:?}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// To avoid permanent loops, treat queue empty by default (all tasks done).
|
|
||||||
HandlingStatus::Empty
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_scheduler_service(
|
pub fn create_scheduler_service(
|
||||||
@ -158,6 +140,6 @@ pub fn create_scheduler_service(
|
|||||||
pus_11_handler,
|
pus_11_handler,
|
||||||
sched_tc_pool,
|
sched_tc_pool,
|
||||||
releaser_buf: [0; 4096],
|
releaser_buf: [0; 4096],
|
||||||
tc_releaser: Box::new(tc_source_sender),
|
tc_releaser: tc_source_sender,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ use satrs::spacepackets::time::{cds, TimeWriter};
|
|||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
|
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
|
||||||
mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService,
|
mode::ModeServiceWrapper, scheduler::SchedulingService, DirectPusService, TargetedPusService,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(new)]
|
#[derive(new)]
|
||||||
@ -23,59 +23,28 @@ impl PusStack {
|
|||||||
// Release all telecommands which reached their release time before calling the service
|
// Release all telecommands which reached their release time before calling the service
|
||||||
// handlers.
|
// handlers.
|
||||||
self.schedule_srv.release_tcs();
|
self.schedule_srv.release_tcs();
|
||||||
let time_stamp = cds::CdsTime::now_with_u16_days()
|
let timestamp = cds::CdsTime::now_with_u16_days()
|
||||||
.expect("time stamp generation error")
|
.expect("time stamp generation error")
|
||||||
.to_vec()
|
.to_vec()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut loop_count = 0;
|
let mut loop_count = 0_u32;
|
||||||
|
// Hot loop which will run continuously until all request and reply handling is done.
|
||||||
loop {
|
loop {
|
||||||
let mut nothing_to_do = true;
|
let mut nothing_to_do = true;
|
||||||
let mut is_srv_finished =
|
Self::direct_service_checker(&mut self.test_srv, ×tamp, &mut nothing_to_do);
|
||||||
|_srv_id: u8,
|
Self::direct_service_checker(&mut self.schedule_srv, ×tamp, &mut nothing_to_do);
|
||||||
tc_handling_status: HandlingStatus,
|
Self::direct_service_checker(&mut self.event_srv, ×tamp, &mut nothing_to_do);
|
||||||
reply_handling_status: Option<HandlingStatus>| {
|
Self::targeted_service_checker(
|
||||||
if tc_handling_status == HandlingStatus::HandledOne
|
&mut self.action_srv_wrapper,
|
||||||
|| (reply_handling_status.is_some()
|
×tamp,
|
||||||
&& reply_handling_status.unwrap() == HandlingStatus::HandledOne)
|
&mut nothing_to_do,
|
||||||
{
|
|
||||||
nothing_to_do = false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
is_srv_finished(
|
|
||||||
17,
|
|
||||||
self.test_srv.poll_and_handle_next_packet(&time_stamp),
|
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
is_srv_finished(
|
Self::targeted_service_checker(
|
||||||
11,
|
&mut self.hk_srv_wrapper,
|
||||||
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
|
×tamp,
|
||||||
None,
|
&mut nothing_to_do,
|
||||||
);
|
);
|
||||||
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
Self::targeted_service_checker(&mut self.mode_srv, ×tamp, &mut nothing_to_do);
|
||||||
is_srv_finished(
|
|
||||||
8,
|
|
||||||
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
|
||||||
Some(
|
|
||||||
self.action_srv_wrapper
|
|
||||||
.poll_and_handle_next_reply(&time_stamp),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
is_srv_finished(
|
|
||||||
3,
|
|
||||||
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
|
||||||
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
|
|
||||||
);
|
|
||||||
is_srv_finished(
|
|
||||||
200,
|
|
||||||
self.mode_srv.poll_and_handle_next_tc(&time_stamp),
|
|
||||||
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
|
|
||||||
);
|
|
||||||
// Safety mechanism to avoid infinite loops.
|
|
||||||
loop_count += 1;
|
|
||||||
if loop_count >= 500 {
|
|
||||||
log::warn!("reached PUS stack loop count 500, breaking");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if nothing_to_do {
|
if nothing_to_do {
|
||||||
// Timeout checking is only done once.
|
// Timeout checking is only done once.
|
||||||
self.action_srv_wrapper.check_for_request_timeouts();
|
self.action_srv_wrapper.check_for_request_timeouts();
|
||||||
@ -83,6 +52,37 @@ impl PusStack {
|
|||||||
self.mode_srv.check_for_request_timeouts();
|
self.mode_srv.check_for_request_timeouts();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// Safety mechanism to avoid infinite loops.
|
||||||
|
loop_count += 1;
|
||||||
|
if loop_count >= 500 {
|
||||||
|
log::warn!("reached PUS stack loop count 500, breaking");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn direct_service_checker<S: DirectPusService>(
|
||||||
|
service: &mut S,
|
||||||
|
timestamp: &[u8],
|
||||||
|
nothing_to_do: &mut bool,
|
||||||
|
) {
|
||||||
|
let handling_status = service.poll_and_handle_next_tc(timestamp);
|
||||||
|
if handling_status == HandlingStatus::HandledOne {
|
||||||
|
*nothing_to_do = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn targeted_service_checker<S: TargetedPusService>(
|
||||||
|
service: &mut S,
|
||||||
|
timestamp: &[u8],
|
||||||
|
nothing_to_do: &mut bool,
|
||||||
|
) {
|
||||||
|
let request_handling = service.poll_and_handle_next_tc_default_handler(timestamp);
|
||||||
|
let reply_handling = service.poll_and_handle_next_reply_default_handler(timestamp);
|
||||||
|
if request_handling == HandlingStatus::HandledOne
|
||||||
|
|| reply_handling == HandlingStatus::HandledOne
|
||||||
|
{
|
||||||
|
*nothing_to_do = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
141
src/pus/test.rs
141
src/pus/test.rs
@ -1,22 +1,20 @@
|
|||||||
use crate::pus::create_verification_reporter;
|
use crate::pus::create_verification_reporter;
|
||||||
use log::{info, warn};
|
use log::info;
|
||||||
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
|
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
|
||||||
use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
|
use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
|
||||||
use satrs::event_man::{EventMessage, EventMessageU32};
|
use satrs::event_man::{EventMessage, EventMessageU32};
|
||||||
use satrs::pus::test::PusService17TestHandler;
|
use satrs::pus::test::PusService17TestHandler;
|
||||||
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
|
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
|
||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
|
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
|
||||||
PusPacketHandlerResult, PusServiceHelper,
|
MpscTcReceiver, MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper,
|
||||||
};
|
};
|
||||||
use satrs::spacepackets::ecss::tc::PusTcReader;
|
use satrs::queue::GenericSendError;
|
||||||
use satrs::spacepackets::ecss::PusPacket;
|
use satrs::spacepackets::ecss::PusServiceId;
|
||||||
use satrs::spacepackets::time::cds::CdsTime;
|
|
||||||
use satrs::spacepackets::time::TimeWriter;
|
|
||||||
use satrs::tmtc::PacketAsVec;
|
use satrs::tmtc::PacketAsVec;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
|
||||||
use super::HandlingStatus;
|
use super::DirectPusService;
|
||||||
|
|
||||||
pub fn create_test_service(
|
pub fn create_test_service(
|
||||||
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||||
@ -46,64 +44,84 @@ pub struct TestCustomServiceWrapper {
|
|||||||
pub event_tx: mpsc::SyncSender<EventMessageU32>,
|
pub event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestCustomServiceWrapper {
|
impl DirectPusService for TestCustomServiceWrapper {
|
||||||
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
const SERVICE_ID: u8 = PusServiceId::Test as u8;
|
||||||
let res = self.handler.poll_and_handle_next_tc(time_stamp);
|
const SERVICE_STR: &'static str = "test";
|
||||||
if res.is_err() {
|
|
||||||
warn!("PUS17 handlers failed with error {:?}", res.unwrap_err());
|
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus {
|
||||||
return HandlingStatus::Empty;
|
let error_handler = |partial_error: &PartialPusHandlingError| {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) partial error: {:?}",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
partial_error
|
||||||
|
);
|
||||||
|
};
|
||||||
|
let res = self
|
||||||
|
.handler
|
||||||
|
.poll_and_handle_next_tc(error_handler, timestamp);
|
||||||
|
if let Err(e) = res {
|
||||||
|
log::warn!(
|
||||||
|
"PUS {}({}) error: {:?}",
|
||||||
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
return HandlingStatus::HandledOne;
|
||||||
}
|
}
|
||||||
match res.unwrap() {
|
match res.unwrap() {
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
DirectPusPacketHandlerResult::Handled(handling_status) => {
|
||||||
info!("Received PUS ping command TC[17,1]");
|
if handling_status == HandlingStatus::HandledOne {
|
||||||
info!("Sent ping reply PUS TM[17,2]");
|
info!("Received PUS ping command TC[17,1]");
|
||||||
return HandlingStatus::HandledOne;
|
info!("Sent ping reply PUS TM[17,2]");
|
||||||
|
}
|
||||||
|
return handling_status;
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => {
|
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
warn!(
|
log::warn!(
|
||||||
"Handled PUS ping command with partial success: {:?}",
|
"PUS {}({}) subservice {} not implemented",
|
||||||
partial_err
|
Self::SERVICE_ID,
|
||||||
|
Self::SERVICE_STR,
|
||||||
|
subservice
|
||||||
);
|
);
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
DirectPusPacketHandlerResult::CustomSubservice(subservice, token) => {
|
||||||
warn!("PUS17: Subservice {subservice} not implemented");
|
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
|
||||||
// TODO: adapt interface events are implemented
|
|
||||||
PusPacketHandlerResult::CustomSubservice(subservice, token) => {
|
|
||||||
let (tc, _) = PusTcReader::new(
|
|
||||||
self.handler
|
|
||||||
.service_helper
|
|
||||||
.tc_in_mem_converter
|
|
||||||
.tc_slice_raw(),
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
let time_stamper = CdsTime::now_with_u16_days().unwrap();
|
|
||||||
let mut stamp_buf: [u8; 7] = [0; 7];
|
|
||||||
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
|
|
||||||
if subservice == 128 {
|
if subservice == 128 {
|
||||||
info!("Generating test event");
|
info!("generating test event");
|
||||||
self.event_tx
|
if let Err(e) = self
|
||||||
|
.event_tx
|
||||||
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
|
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
|
||||||
.expect("Sending test event failed");
|
.map_err(|_| GenericSendError::RxDisconnected)
|
||||||
let start_token = self
|
{
|
||||||
.handler
|
// This really should not happen but I want to avoid panicking..
|
||||||
.service_helper
|
log::warn!("failed to send test event: {:?}", e);
|
||||||
.verif_reporter()
|
}
|
||||||
.start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf)
|
|
||||||
.expect("Error sending start success");
|
match self.handler.service_helper.verif_reporter().start_success(
|
||||||
self.handler
|
self.handler.service_helper.tm_sender(),
|
||||||
.service_helper
|
token,
|
||||||
.verif_reporter()
|
timestamp,
|
||||||
.completion_success(
|
) {
|
||||||
self.handler.service_helper.tm_sender(),
|
Ok(started_token) => {
|
||||||
start_token,
|
if let Err(e) = self
|
||||||
&stamp_buf,
|
.handler
|
||||||
)
|
.service_helper
|
||||||
.expect("Error sending completion success");
|
.verif_reporter()
|
||||||
|
.completion_success(
|
||||||
|
self.handler.service_helper.tm_sender(),
|
||||||
|
started_token,
|
||||||
|
timestamp,
|
||||||
|
)
|
||||||
|
{
|
||||||
|
error_handler(&PartialPusHandlingError::Verification(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error_handler(&PartialPusHandlingError::Verification(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
let fail_data = [tc.subservice()];
|
let fail_data = [subservice];
|
||||||
self.handler
|
self.handler
|
||||||
.service_helper
|
.service_helper
|
||||||
.verif_reporter()
|
.verif_reporter()
|
||||||
@ -111,18 +129,15 @@ impl TestCustomServiceWrapper {
|
|||||||
self.handler.service_helper.tm_sender(),
|
self.handler.service_helper.tm_sender(),
|
||||||
token,
|
token,
|
||||||
FailParams::new(
|
FailParams::new(
|
||||||
&stamp_buf,
|
timestamp,
|
||||||
&tmtc_err::INVALID_PUS_SUBSERVICE,
|
&tmtc_err::INVALID_PUS_SUBSERVICE,
|
||||||
&fail_data,
|
&fail_data,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.expect("Sending start failure verification failed");
|
.expect("Sending start failure verification failed");
|
||||||
}
|
}
|
||||||
return HandlingStatus::HandledOne;
|
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => (),
|
|
||||||
}
|
}
|
||||||
// To avoid permanent loops, treat queue empty by default (all tasks done).
|
HandlingStatus::HandledOne
|
||||||
HandlingStatus::Empty
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
use std::sync::mpsc::{self, TryRecvError};
|
use std::sync::mpsc::{self, TryRecvError};
|
||||||
|
|
||||||
use ops_sat_rs::HandlingStatus;
|
use satrs::{
|
||||||
use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec};
|
pus::{HandlingStatus, MpscTmAsVecSender},
|
||||||
|
tmtc::PacketAsVec,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::pus::PusTcDistributor;
|
use crate::pus::PusTcDistributor;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user