From b7ab1d5ea7b4b3c6bba138f6b9c5490c1d61f7c2 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 22 Mar 2024 18:21:49 +0100 Subject: [PATCH] incredible, but it seems to work.. --- satrs-example/Cargo.toml | 3 + satrs-example/src/acs/mgm.rs | 4 +- satrs-example/src/main.rs | 6 +- satrs-example/src/pus/action.rs | 214 ++++++++++++++++++++++++++++++-- satrs-example/src/pus/hk.rs | 4 + satrs-example/src/pus/mod.rs | 59 ++++++++- satrs-example/src/pus/stack.rs | 6 +- satrs-example/src/requests.rs | 11 +- satrs/src/pus/mod.rs | 3 +- satrs/src/queue.rs | 4 +- satrs/src/request.rs | 2 +- 11 files changed, 288 insertions(+), 28 deletions(-) diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 2294640..dd7bf47 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -30,3 +30,6 @@ path = "../satrs-mib" [features] dyn_tmtc = [] default = ["dyn_tmtc"] + +[dev-dependencies] +env_logger = "0.11" diff --git a/satrs-example/src/acs/mgm.rs b/satrs-example/src/acs/mgm.rs index 2e55987..91172ca 100644 --- a/satrs-example/src/acs/mgm.rs +++ b/satrs-example/src/acs/mgm.rs @@ -7,7 +7,7 @@ use satrs::request::GenericMessage; use satrs::ComponentId; use crate::pus::hk::HkReply; -use crate::requests::RequestWithToken; +use crate::requests::CompositeRequestWithToken; pub trait SpiInterface { type Error; @@ -27,7 +27,7 @@ pub struct MgmHandler { mode_request_receiver: mpsc::Receiver>, mode_reply_sender_to_pus: mpsc::Sender>, mode_reply_sender_to_parent: mpsc::Sender>, - composite_request_receiver: mpsc::Receiver, + composite_request_receiver: mpsc::Receiver, hk_reply_sender: mpsc::Sender>, hk_tm_sender: TmSender, mode: ModeAndSubmode, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 1afc6b2..a867fd8 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -38,7 +38,7 @@ use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static}; use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static}; use crate::pus::test::create_test_service_static; use crate::pus::{PusReceiver, PusTcMpscRouter}; -use crate::requests::{GenericRequestRouter, RequestWithToken}; +use crate::requests::{CompositeRequestWithToken, GenericRequestRouter}; use crate::tcp::{SyncTcpTmSource, TcpTask}; use crate::tmtc::{ PusTcSourceProviderSharedPool, SharedTcPool, TcSourceTaskDynamic, TcSourceTaskStatic, @@ -86,7 +86,7 @@ fn static_tmtc_pool_main() { )); let acs_target_id = TargetAndApidId::new(PUS_APID, RequestTargetId::AcsSubsystem as u32); - let (acs_thread_tx, acs_thread_rx) = channel::(); + let (acs_thread_tx, acs_thread_rx) = channel::(); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = GenericRequestRouter::default(); request_map.0.insert(acs_target_id.into(), acs_thread_tx); @@ -322,7 +322,7 @@ fn dyn_tmtc_pool_main() { )); let acs_target_id = TargetAndApidId::new(PUS_APID, RequestTargetId::AcsSubsystem as u32); - let (acs_thread_tx, acs_thread_rx) = channel::(); + let (acs_thread_tx, acs_thread_rx) = channel::(); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = GenericRequestRouter::default(); request_map.0.insert(acs_target_id.into(), acs_thread_tx); diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 7493ced..ad27819 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -29,7 +29,7 @@ use std::time::Duration; use crate::requests::GenericRequestRouter; -use super::{generic_pus_request_timeout_handler, PusTargetedRequestService}; +use super::{generic_pus_request_timeout_handler, PusTargetedRequestService, TargetedPusService}; pub struct ActionReplyHandler { fail_data_buf: [u8; 128], @@ -167,6 +167,11 @@ impl PusTcToRequestConverter let token = verif_reporter .start_success(token, time_stamp) .expect("sending start success verification failed"); + let req_variant = if user_data.len() == 8 { + ActionRequestVariant::NoData + } else { + ActionRequestVariant::VecData(user_data[8..].to_vec()) + }; Ok(( ActivePusActionRequestStd::new( action_id, @@ -174,10 +179,7 @@ impl PusTcToRequestConverter token, Duration::from_secs(30), ), - ActionRequest::new( - action_id, - ActionRequestVariant::VecData(user_data[8..].to_vec()), - ), + ActionRequest::new(action_id, req_variant), )) } else { verif_reporter @@ -303,9 +305,11 @@ impl< TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, - > Pus8Wrapper + > TargetedPusService + for Pus8Wrapper { - pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool { + /// Returns [true] if the packet handling is finished. + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool { match self.service.poll_and_handle_next_tc(time_stamp) { Ok(result) => match result { PusPacketHandlerResult::RequestHandled => {} @@ -329,7 +333,7 @@ impl< false } - pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> bool { + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> bool { match self.service.poll_and_check_next_reply(time_stamp) { Ok(packet_handled) => packet_handled, Err(e) => { @@ -338,4 +342,198 @@ impl< } } } + + fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } +} + +#[cfg(test)] +mod tests { + use satrs::{ + pus::verification::VerificationReporterCfg, + spacepackets::{ + ecss::{ + tc::{PusTcCreator, PusTcSecondaryHeader}, + tm::PusTmReader, + WritablePusPacket, + }, + CcsdsPacket, SpHeader, + }, + }; + + use crate::{ + pus::tests::{TargetedPusRequestTestbench, TARGET_ID, TEST_APID, TEST_APID_TARGET_ID}, + requests::CompositeRequest, + }; + + use super::*; + + impl + TargetedPusRequestTestbench< + ExampleActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + ActionReplyPusWithActionId, + > + { + pub fn new_for_action() -> Self { + env_logger::init(); + let target_and_apid_id = TargetAndApidId::new(TEST_APID, TEST_APID_TARGET_ID); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (action_reply_tx, action_reply_rx) = mpsc::channel(); + let (action_req_tx, action_req_rx) = mpsc::channel(); + let verif_reporter_cfg = VerificationReporterCfg::new(TEST_APID, 2, 2, 64).unwrap(); + let tm_as_vec_sender = + TmAsVecSenderWithMpsc::new(1, "VERIF_SENDER", tm_funnel_tx.clone()); + let verif_reporter = + VerificationReporterWithVecMpscSender::new(&verif_reporter_cfg, tm_as_vec_sender); + let mut generic_req_router = GenericRequestRouter::default(); + generic_req_router + .0 + .insert(target_and_apid_id.into(), action_req_tx); + let action_srv_tm_sender = + TmAsVecSenderWithId::new(0, "TESTBENCH", tm_funnel_tx.clone()); + let action_srv_receiver = MpscTcReceiver::new(0, "TESTBENCH", pus_action_rx); + Self { + service: PusTargetedRequestService::new( + PusServiceHelper::new( + action_srv_receiver, + action_srv_tm_sender.clone(), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + ), + ExampleActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + generic_req_router, + action_reply_rx, + ), + verif_reporter, + pus_packet_tx: pus_action_tx, + tm_funnel_rx, + reply_tx: action_reply_tx, + request_rx: action_req_rx, + } + } + + pub fn verify_packet_verification(&self, subservice: u8) { + let next_tm = self.tm_funnel_rx.try_recv().unwrap(); + let verif_tm = PusTmReader::new(&next_tm, 7).unwrap().0; + assert_eq!(verif_tm.apid(), TEST_APID); + assert_eq!(verif_tm.service(), 1); + assert_eq!(verif_tm.subservice(), subservice); + } + + pub fn verify_tm_empty(&self) { + let packet = self.tm_funnel_rx.try_recv(); + if let Err(mpsc::TryRecvError::Empty) = packet { + } else { + let tm = packet.unwrap(); + let unexpected_tm = PusTmReader::new(&tm, 7).unwrap().0; + panic!("unexpected TM packet {unexpected_tm:?}"); + } + } + + pub fn verify_next_tc_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + assert!(result.is_ok()); + let result = result.unwrap(); + match result { + PusPacketHandlerResult::RequestHandled => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_all_tcs_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + assert!(result.is_ok()); + let result = result.unwrap(); + match result { + PusPacketHandlerResult::Empty => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert!(!result.unwrap()); + } + + pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert!(result.unwrap()); + } + + pub fn add_tc(&mut self, tc: &PusTcCreator) { + let token = self.verif_reporter.add_tc(tc); + let accepted_token = self + .verif_reporter + .acceptance_success(token, &[0; 7]) + .expect("TC acceptance failed"); + let next_tm = self.tm_funnel_rx.try_recv().unwrap(); + let verif_tm = PusTmReader::new(&next_tm, 7).unwrap().0; + assert_eq!(verif_tm.apid(), TEST_APID); + assert_eq!(verif_tm.service(), 1); + assert_eq!(verif_tm.subservice(), 1); + if let Err(mpsc::TryRecvError::Empty) = self.tm_funnel_rx.try_recv() { + } else { + let unexpected_tm = PusTmReader::new(&next_tm, 7).unwrap().0; + panic!("unexpected TM packet {unexpected_tm:?}"); + } + self.pus_packet_tx + .send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) + .unwrap(); + } + } + + #[test] + fn test_basic_request() { + let mut testbench = TargetedPusRequestTestbench::new_for_action(); + // Create a basic action request and verify forwarding. + let mut sp_header = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&TEST_APID_TARGET_ID.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + testbench.verify_next_tc_is_handled_properly(&time_stamp); + testbench.verify_all_tcs_handled(&time_stamp); + + testbench.verify_packet_verification(3); + + let possible_req = testbench.request_rx.try_recv(); + assert!(possible_req.is_ok()); + let req = possible_req.unwrap(); + if let CompositeRequest::Action(action_req) = req.targeted_request.message { + assert_eq!(action_req.action_id, action_id); + assert_eq!(action_req.variant, ActionRequestVariant::NoData); + let action_reply = + ActionReplyPusWithActionId::new(action_id, ActionReplyPus::Completed); + testbench + .reply_tx + .send(GenericMessage::new( + req.targeted_request.request_id, + TARGET_ID.into(), + action_reply, + )) + .unwrap(); + } else { + panic!("unexpected request type"); + } + testbench.verify_next_reply_is_handled_properly(&time_stamp); + testbench.verify_all_replies_handled(&time_stamp); + + testbench.verify_packet_verification(7); + testbench.verify_tm_empty(); + } } diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index 5abf03d..e9a9221 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -357,4 +357,8 @@ impl< } } } + + pub fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } } diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index b841c94..d155a77 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -81,6 +81,13 @@ impl PusReceiver bool; + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> bool; + fn check_for_request_timeouts(&mut self); +} + /// This is a generic handler class for all PUS services where a PUS telecommand is converted /// to a targeted request. /// @@ -200,7 +207,6 @@ where } }; let verif_request_id = verification::RequestId::new(&tc).raw(); - //if let Err(e) = match self.request_router.route( request_info.target_id(), verif_request_id, @@ -275,10 +281,10 @@ where match self.reply_receiver.try_recv() { Ok(reply) => { self.handle_reply(&reply, time_stamp)?; - Ok(true) + Ok(false) } Err(e) => match e { - mpsc::TryRecvError::Empty => Ok(false), + mpsc::TryRecvError::Empty => Ok(true), mpsc::TryRecvError::Disconnected => Err(EcssTmtcError::Receive( GenericReceiveError::TxDisconnected(None), )), @@ -434,3 +440,50 @@ impl PusReceiver, + ReplyHandler: PusReplyHandler, + ActiveRequestMap: ActiveRequestMapProvider, + ActiveRequestInfo: ActiveRequestProvider, + RequestType, + ReplyType, + > { + pub service: PusTargetedRequestService< + MpscTcReceiver, + TmAsVecSenderWithMpsc, + EcssTcInVecConverter, + VerificationReporterWithVecMpscSender, + RequestConverter, + ReplyHandler, + ActiveRequestMap, + ActiveRequestInfo, + RequestType, + ReplyType, + >, + pub verif_reporter: VerificationReporterWithVecMpscSender, + pub tm_funnel_rx: mpsc::Receiver>, + pub pus_packet_tx: mpsc::Sender, + pub reply_tx: mpsc::Sender>, + pub request_rx: mpsc::Receiver, + } +} diff --git a/satrs-example/src/pus/stack.rs b/satrs-example/src/pus/stack.rs index f8269d1..1d3f667 100644 --- a/satrs-example/src/pus/stack.rs +++ b/satrs-example/src/pus/stack.rs @@ -8,7 +8,7 @@ use satrs::{ use super::{ action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, - test::Service17CustomWrapper, + test::Service17CustomWrapper, TargetedPusService, }; pub struct PusStack< @@ -86,8 +86,8 @@ impl< ); if nothing_to_do { // Timeout checking is only done once. - self.action_srv_wrapper.service.check_for_request_timeouts(); - self.hk_srv_wrapper.service.check_for_request_timeouts(); + self.action_srv_wrapper.check_for_request_timeouts(); + self.hk_srv_wrapper.check_for_request_timeouts(); break; } } diff --git a/satrs-example/src/requests.rs b/satrs-example/src/requests.rs index db9370b..8354d14 100644 --- a/satrs-example/src/requests.rs +++ b/satrs-example/src/requests.rs @@ -23,12 +23,12 @@ pub enum CompositeRequest { } #[derive(Clone, Debug)] -pub struct RequestWithToken { +pub struct CompositeRequestWithToken { pub(crate) targeted_request: GenericMessage, pub(crate) token: VerificationToken, } -impl RequestWithToken { +impl CompositeRequestWithToken { pub fn new( target_id: ComponentId, request_id: RequestId, @@ -43,7 +43,7 @@ impl RequestWithToken { } #[derive(Default, Clone)] -pub struct GenericRequestRouter(pub HashMap>); +pub struct GenericRequestRouter(pub HashMap>); impl GenericRequestRouter { pub(crate) fn handle_error_generic( @@ -94,7 +94,7 @@ impl PusRequestRouter for GenericRequestRouter { ) -> Result<(), Self::Error> { if let Some(sender) = self.0.get(&target_id) { sender - .send(RequestWithToken::new( + .send(CompositeRequestWithToken::new( target_id, request_id, CompositeRequest::Hk(hk_request), @@ -117,8 +117,9 @@ impl PusRequestRouter for GenericRequestRouter { token: VerificationToken, ) -> Result<(), Self::Error> { if let Some(sender) = self.0.get(&target_id) { + println!("routed action request"); sender - .send(RequestWithToken::new( + .send(CompositeRequestWithToken::new( target_id, request_id, CompositeRequest::Action(action_request), diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index 94bec2a..920fb74 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -60,7 +60,7 @@ impl<'tm> From> for PusTmWrapper<'tm> { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum EcssTmtcError { Store(StoreError), ByteConversion(ByteConversionError), @@ -658,6 +658,7 @@ pub mod std_mod { use spacepackets::ecss::WritablePusPacket; use spacepackets::time::StdTimestampError; use spacepackets::ByteConversionError; + use std::println; use std::string::String; use std::sync::mpsc; use std::sync::mpsc::TryRecvError; diff --git a/satrs/src/queue.rs b/satrs/src/queue.rs index 2ad4f32..93c8ec8 100644 --- a/satrs/src/queue.rs +++ b/satrs/src/queue.rs @@ -10,7 +10,7 @@ use crate::ComponentId; pub type ChannelId = u32; /// Generic error type for sending something via a message queue. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum GenericSendError { RxDisconnected, QueueFull(Option), @@ -37,7 +37,7 @@ impl Display for GenericSendError { impl Error for GenericSendError {} /// Generic error type for sending something via a message queue. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum GenericReceiveError { Empty, TxDisconnected(Option), diff --git a/satrs/src/request.rs b/satrs/src/request.rs index 03d0690..65fd8e4 100644 --- a/satrs/src/request.rs +++ b/satrs/src/request.rs @@ -31,7 +31,7 @@ pub struct TargetAndApidId { } impl TargetAndApidId { - pub fn new(apid: Apid, target: u32) -> Self { + pub const fn new(apid: Apid, target: u32) -> Self { Self { apid, target } }