Compare commits

..

1 Commits

Author SHA1 Message Date
1c22afef31 Major refactoring and update of PUS module
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
2024-04-02 16:50:09 +02:00
14 changed files with 260 additions and 217 deletions

View File

@ -9,7 +9,9 @@ use satrs_example::TimeStampHelper;
use std::sync::mpsc::{self};
use std::sync::{Arc, Mutex};
use satrs::mode::{ModeAndSubmode, ModeProvider, ModeReply, ModeRequest, ModeRequestHandler};
use satrs::mode::{
ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequest, ModeRequestHandler,
};
use satrs::pus::{EcssTmSenderCore, PusTmVariant};
use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId};
use satrs_example::config::components::PUS_MODE_SERVICE;
@ -42,17 +44,22 @@ pub struct MgmData {
pub z: f32,
}
pub struct MpscModeLeafInterface {
pub request_rx: mpsc::Receiver<GenericMessage<ModeRequest>>,
pub reply_tx_to_pus: mpsc::Sender<GenericMessage<ModeReply>>,
pub reply_tx_to_parent: mpsc::Sender<GenericMessage<ModeReply>>,
}
#[derive(new)]
#[allow(clippy::too_many_arguments)]
pub struct MgmHandler<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> {
id: UniqueApidTargetId,
dev_str: &'static str,
mode_request_receiver: mpsc::Receiver<GenericMessage<ModeRequest>>,
mode_interface: MpscModeLeafInterface,
composite_request_receiver: mpsc::Receiver<GenericMessage<CompositeRequest>>,
mode_reply_sender_to_pus: mpsc::Sender<GenericMessage<ModeReply>>,
mode_reply_sender_to_parent: mpsc::Sender<GenericMessage<ModeReply>>,
hk_reply_sender: mpsc::Sender<GenericMessage<HkReply>>,
hk_tm_sender: TmSender,
spi_interface: ComInterface,
tm_sender: TmSender,
com_interface: ComInterface,
shared_mgm_set: Arc<Mutex<MgmData>>,
#[new(value = "ModeAndSubmode::new(satrs_example::DeviceMode::Off as u32, 0)")]
mode: ModeAndSubmode,
@ -92,7 +99,7 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> MgmHandler<ComInter
&mgm_data_serialized,
true,
);
self.hk_tm_sender
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
}
@ -100,20 +107,24 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> MgmHandler<ComInter
HkRequestVariant::DisablePeriodic => todo!(),
HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
},
// This object does not have actions (yet).. Still send back completion failure
// TODO: This object does not have actions (yet).. Still send back completion failure
// reply.
CompositeRequest::Action(action_req) => {}
CompositeRequest::Action(_action_req) => {}
},
Err(_) => todo!(),
}
match self.mode_request_receiver.try_recv() {
Ok(msg) => match msg.message {
ModeRequest::SetMode(_) => todo!(),
ModeRequest::ReadMode => todo!(),
ModeRequest::AnnounceMode => todo!(),
ModeRequest::AnnounceModeRecursive => todo!(),
ModeRequest::ModeInfo(_) => todo!(),
},
match self.mode_interface.request_rx.try_recv() {
Ok(msg) => {
let result = self.handle_mode_request(msg);
// TODO: Trigger event?
if result.is_err() {
log::warn!(
"{}: mode request failed with error {:?}",
self.dev_str,
result.err().unwrap()
);
}
}
Err(_) => todo!(),
}
}
@ -130,6 +141,7 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> ModeProvider
impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> ModeRequestHandler
for MgmHandler<ComInterface, TmSender>
{
type Error = ModeError;
fn start_transition(
&mut self,
requestor: MessageMetadata,
@ -147,7 +159,7 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> ModeRequestHandler
fn handle_mode_reached(
&mut self,
requestor: Option<MessageMetadata>,
) -> Result<(), satrs::queue::GenericTargetedMessagingError> {
) -> Result<(), Self::Error> {
if let Some(requestor) = requestor {
if requestor.sender_id() == PUS_MODE_SERVICE.raw() {
// self.mode_reply_sender_to_pus.send(
@ -162,7 +174,15 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> ModeRequestHandler
&self,
_requestor: MessageMetadata,
_reply: ModeReply,
) -> Result<(), satrs::queue::GenericTargetedMessagingError> {
) -> Result<(), Self::Error> {
Ok(())
}
fn handle_mode_info(
&mut self,
_requestor_info: MessageMetadata,
_info: ModeAndSubmode,
) -> Result<(), Self::Error> {
Ok(())
}
}

View File

@ -115,7 +115,7 @@ impl<TmSender: EcssTmSenderCore> PusEventHandler<TmSender> {
if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() {
update_time(&mut self.time_provider, &mut self.timestamp);
self.pus_event_dispatcher
.generate_pus_event_tm_generic(&mut self.tm_sender, &self.timestamp, event, None)
.generate_pus_event_tm_generic(&self.tm_sender, &self.timestamp, event, None)
.expect("Sending TM as event failed");
}
}

View File

@ -27,7 +27,7 @@ use satrs_example::config::{OBSW_SERVER_ADDR, SERVER_PORT};
use tmtc::PusTcSourceProviderDynamic;
use udp::DynamicUdpTmHandler;
use crate::acs::mgm::{MgmHandler, SpiDummyInterface};
use crate::acs::mgm::{MgmHandler, MpscModeLeafInterface, SpiDummyInterface};
use crate::ccsds::CcsdsReceiver;
use crate::logger::setup_logger;
use crate::pus::action::{create_action_service_dynamic, create_action_service_static};
@ -206,13 +206,16 @@ fn static_tmtc_pool_main() {
let dummy_spi_interface = SpiDummyInterface::default();
let shared_mgm_set = Arc::default();
let mode_leaf_interface = MpscModeLeafInterface {
request_rx: mgm_handler_mode_rx,
reply_tx_to_pus: pus_mode_reply_tx,
reply_tx_to_parent: mgm_handler_mode_reply_to_parent_tx,
};
let mut mgm_handler = MgmHandler::new(
MGM_HANDLER_0,
"MGM_0",
mgm_handler_mode_rx,
mode_leaf_interface,
mgm_handler_composite_rx,
pus_mode_reply_tx,
mgm_handler_mode_reply_to_parent_tx,
pus_hk_reply_tx,
tm_funnel_tx,
dummy_spi_interface,
@ -421,13 +424,16 @@ fn dyn_tmtc_pool_main() {
mpsc::channel();
let dummy_spi_interface = SpiDummyInterface::default();
let shared_mgm_set = Arc::default();
let mode_leaf_interface = MpscModeLeafInterface {
request_rx: mgm_handler_mode_rx,
reply_tx_to_pus: pus_mode_reply_tx,
reply_tx_to_parent: mgm_handler_mode_reply_to_parent_tx,
};
let mut mgm_handler = MgmHandler::new(
MGM_HANDLER_0,
"MGM_0",
mgm_handler_mode_rx,
mode_leaf_interface,
mgm_handler_composite_rx,
pus_mode_reply_tx,
mgm_handler_mode_reply_to_parent_tx,
pus_hk_reply_tx,
tm_funnel_tx,
dummy_spi_interface,

View File

@ -324,6 +324,7 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> Targete
#[cfg(test)]
mod tests {
use satrs::pus::test_util::{TEST_APID, TEST_COMPONENT_ID, TEST_UNIQUE_ID};
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::request::MessageMetadata;
use satrs::{
@ -334,7 +335,7 @@ mod tests {
tm::PusTmReader,
WritablePusPacket,
},
CcsdsPacket, SpHeader,
SpHeader,
},
};
@ -365,7 +366,7 @@ mod tests {
let mut generic_req_router = GenericRequestRouter::default();
generic_req_router
.composite_router_map
.insert(TEST_APID.into(), action_req_tx);
.insert(TEST_COMPONENT_ID.id(), action_req_tx);
Self {
service: PusTargetedRequestService::new(
PusServiceHelper::new(
@ -381,6 +382,7 @@ mod tests {
generic_req_router,
action_reply_rx,
),
request_id: None,
pus_packet_tx: pus_action_tx,
tm_funnel_rx,
reply_tx: action_reply_tx,
@ -388,12 +390,26 @@ mod tests {
}
}
pub fn verify_packet_verification(&self, subservice: u8) {
let next_tm = self.tm_funnel_rx.try_recv().unwrap();
let verif_tm = PusTmReader::new(&next_tm.packet, 7).unwrap().0;
assert_eq!(verif_tm.apid(), TEST_APID);
assert_eq!(verif_tm.service(), 1);
assert_eq!(verif_tm.subservice(), subservice);
pub fn verify_packet_started(&self) {
self.service
.service_helper
.common
.verif_reporter
.check_next_is_started_success(
self.service.service_helper.id(),
self.request_id.expect("request ID not set").into(),
);
}
pub fn verify_packet_completed(&self) {
self.service
.service_helper
.common
.verif_reporter
.check_next_is_completion_success(
self.service.service_helper.id(),
self.request_id.expect("request ID not set").into(),
);
}
pub fn verify_tm_empty(&self) {
@ -408,7 +424,9 @@ mod tests {
pub fn verify_next_tc_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_handle_next_tc(time_stamp);
assert!(result.is_ok());
if let Err(e) = result {
panic!("unexpected error {:?}", e);
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::RequestHandled => (),
@ -418,7 +436,9 @@ mod tests {
pub fn verify_all_tcs_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_handle_next_tc(time_stamp);
assert!(result.is_ok());
if let Err(e) = result {
panic!("unexpected error {:?}", e);
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::Empty => (),
@ -439,6 +459,7 @@ mod tests {
}
pub fn add_tc(&mut self, tc: &PusTcCreator) {
self.request_id = Some(verification::RequestId::new(tc).into());
let token = self.service.service_helper.verif_reporter_mut().add_tc(tc);
let accepted_token = self
.service
@ -451,16 +472,15 @@ mod tests {
&[0; 7],
)
.expect("TC acceptance failed");
let next_tm = self.tm_funnel_rx.try_recv().unwrap();
let verif_tm = PusTmReader::new(&next_tm.packet, 7).unwrap().0;
assert_eq!(verif_tm.apid(), TEST_APID);
assert_eq!(verif_tm.service(), 1);
assert_eq!(verif_tm.subservice(), 1);
if let Err(mpsc::TryRecvError::Empty) = self.tm_funnel_rx.try_recv() {
} else {
let unexpected_tm = PusTmReader::new(&next_tm.packet, 7).unwrap().0;
panic!("unexpected TM packet {unexpected_tm:?}");
}
self.service
.service_helper
.verif_reporter()
.check_next_was_added(accepted_token.request_id());
let id = self.service.service_helper.id();
self.service
.service_helper
.verif_reporter()
.check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx
.send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token))
.unwrap();
@ -483,7 +503,7 @@ mod tests {
testbench.verify_next_tc_is_handled_properly(&time_stamp);
testbench.verify_all_tcs_handled(&time_stamp);
testbench.verify_packet_verification(3);
testbench.verify_packet_started();
let possible_req = testbench.request_rx.try_recv();
assert!(possible_req.is_ok());
@ -502,7 +522,7 @@ mod tests {
testbench.verify_next_reply_is_handled_properly(&time_stamp);
testbench.verify_all_replies_handled(&time_stamp);
testbench.verify_packet_verification(7);
testbench.verify_packet_completed();
testbench.verify_tm_empty();
}
@ -597,7 +617,7 @@ mod tests {
assert!(result.is_ok());
assert!(result.unwrap());
testbench.verif_reporter.assert_full_completion_success(
TEST_COMPONENT_ID.raw(),
TEST_COMPONENT_ID.id(),
req_id,
None,
);
@ -678,13 +698,15 @@ mod tests {
testbench.verif_reporter.check_next_was_added(req_id);
testbench
.verif_reporter
.check_next_is_acceptance_success(TEST_COMPONENT_ID.raw(), req_id);
.check_next_is_acceptance_success(TEST_COMPONENT_ID.id(), req_id);
testbench
.verif_reporter
.check_next_is_started_success(TEST_COMPONENT_ID.raw(), req_id);
testbench
.verif_reporter
.check_next_is_step_success(TEST_COMPONENT_ID.raw(), req_id, 1);
.check_next_is_started_success(TEST_COMPONENT_ID.id(), req_id);
testbench.verif_reporter.check_next_is_step_failure(
TEST_COMPONENT_ID.id(),
req_id,
error_code.raw().into(),
);
}
#[test]

View File

@ -464,6 +464,7 @@ pub(crate) mod tests {
use satrs::pus::test_util::TEST_COMPONENT_ID;
use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant};
use satrs::request::RequestId;
use satrs::{
pus::{
verification::test_util::TestVerificationReporter, ActivePusRequestStd,
@ -701,6 +702,7 @@ pub(crate) mod tests {
RequestType,
ReplyType,
>,
pub request_id: Option<RequestId>,
pub tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
pub pus_packet_tx: mpsc::Sender<EcssTcAndToken>,
pub reply_tx: mpsc::Sender<GenericMessage<ReplyType>>,

View File

@ -166,10 +166,6 @@ impl<R: MessageReceiver<ModeRequest>> ModeRequestReceiver
}
}
pub trait ModeProvider {
fn mode_and_submode(&self) -> ModeAndSubmode;
}
#[derive(Debug, Clone)]
pub enum ModeError {
Messaging(GenericTargetedMessagingError),
@ -181,23 +177,61 @@ impl From<GenericTargetedMessagingError> for ModeError {
}
}
pub trait ModeProvider {
fn mode_and_submode(&self) -> ModeAndSubmode;
}
pub trait ModeRequestHandler: ModeProvider {
type Error;
fn start_transition(
&mut self,
requestor: MessageMetadata,
mode_and_submode: ModeAndSubmode,
) -> Result<(), ModeError>;
) -> Result<(), Self::Error>;
fn announce_mode(&self, requestor_info: MessageMetadata, recursive: bool);
fn handle_mode_reached(
&mut self,
requestor_info: Option<MessageMetadata>,
) -> Result<(), GenericTargetedMessagingError>;
) -> Result<(), Self::Error>;
fn handle_mode_info(
&mut self,
requestor_info: MessageMetadata,
info: ModeAndSubmode,
) -> Result<(), Self::Error>;
fn send_mode_reply(
&self,
requestor_info: MessageMetadata,
reply: ModeReply,
) -> Result<(), GenericTargetedMessagingError>;
) -> Result<(), Self::Error>;
fn handle_mode_request(
&mut self,
request: GenericMessage<ModeRequest>,
) -> Result<(), Self::Error> {
match request.message {
ModeRequest::SetMode(mode_and_submode) => {
self.start_transition(request.requestor_info, mode_and_submode)
}
ModeRequest::ReadMode => self.send_mode_reply(
request.requestor_info,
ModeReply::ModeReply(self.mode_and_submode()),
),
ModeRequest::AnnounceMode => {
self.announce_mode(request.requestor_info, false);
Ok(())
}
ModeRequest::AnnounceModeRecursive => {
self.announce_mode(request.requestor_info, true);
Ok(())
}
ModeRequest::ModeInfo(info) => self.handle_mode_info(request.requestor_info, info),
}
}
}
pub trait ModeReplyReceiver {
@ -255,23 +289,6 @@ pub mod alloc_mod {
}
}
/*
impl<S: MessageSender<ModeReply>> ModeReplySender for MessageSenderMapWithId<ModeReply, S> {
fn send_mode_reply(
&self,
request_id: RequestId,
target_channel_id: ComponentId,
reply: ModeReply,
) -> Result<(), GenericTargetedMessagingError> {
self.send_message(request_id, target_channel_id, reply)
}
fn local_channel_id(&self) -> ComponentId {
self.local_channel_id
}
}
*/
impl<FROM, S: MessageSender<ModeReply>, R: MessageReceiver<FROM>> ModeReplySender
for MessageSenderAndReceiver<ModeReply, FROM, S, R>
{

View File

@ -234,6 +234,7 @@ mod alloc_mod {
mod tests {
use super::*;
use crate::events::{EventU32, Severity};
use crate::pus::test_util::TEST_COMPONENT_ID;
use crate::pus::tests::CommonTmInfo;
use crate::pus::{ChannelWithId, PusTmVariant};
use crate::ComponentId;
@ -250,6 +251,7 @@ mod tests {
#[derive(Debug, Eq, PartialEq, Clone)]
struct TmInfo {
pub sender_id: ComponentId,
pub common: CommonTmInfo,
pub event: EventU32,
pub aux_data: Vec<u8>,
@ -283,6 +285,7 @@ mod tests {
aux_data.extend_from_slice(&src_data[4..]);
}
self.service_queue.borrow_mut().push_back(TmInfo {
sender_id,
common: CommonTmInfo::new_from_tm(&tm),
event,
aux_data,
@ -340,7 +343,8 @@ mod tests {
error_data: Option<&[u8]>,
) {
let mut sender = TestSender::default();
let reporter = EventReporter::new(0, EXAMPLE_APID, max_event_aux_data_buf);
let reporter =
EventReporter::new(TEST_COMPONENT_ID.id(), EXAMPLE_APID, max_event_aux_data_buf);
assert!(reporter.is_some());
let mut reporter = reporter.unwrap();
let time_stamp_empty: [u8; 7] = [0; 7];
@ -370,6 +374,7 @@ mod tests {
assert_eq!(tm_info.common.msg_counter, 0);
assert_eq!(tm_info.common.apid, EXAMPLE_APID);
assert_eq!(tm_info.event, event);
assert_eq!(tm_info.sender_id, TEST_COMPONENT_ID.id());
assert_eq!(tm_info.aux_data, error_copy);
}

View File

@ -283,10 +283,10 @@ mod tests {
#[test]
fn test_basic() {
let mut event_man = create_basic_man_1();
let (mut event_tx, event_rx) = mpsc::channel::<PusTmAsVec>();
let event_man = create_basic_man_1();
let (event_tx, event_rx) = mpsc::channel::<PusTmAsVec>();
let event_sent = event_man
.generate_pus_event_tm(&mut event_tx, &EMPTY_STAMP, INFO_EVENT, None)
.generate_pus_event_tm(&event_tx, &EMPTY_STAMP, INFO_EVENT, None)
.expect("Sending info event failed");
assert!(event_sent);

View File

@ -1610,7 +1610,7 @@ pub mod tests {
MpscTcReceiver,
MpscTmAsVecSender,
EcssTcInVecConverter,
TestVerificationReporter,
VerificationReporter,
>;
impl PusServiceHandlerWithVecCommon {
@ -1620,6 +1620,8 @@ pub mod tests {
let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel();
let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap();
let verification_handler = VerificationReporter::new(&verif_cfg);
let in_store_converter = EcssTcInVecConverter::default();
(
Self {
@ -1631,7 +1633,7 @@ pub mod tests {
id,
test_srv_tc_rx,
tm_tx,
TestVerificationReporter::default(),
verification_handler,
in_store_converter,
),
)

View File

@ -137,12 +137,12 @@ mod tests {
);
mode_connector.add_reply_target(TEST_COMPONENT_ID_2, reply_sender_to_channel_2);
// Send a request and verify it arrives at the receiver.
// Send a reply and verify it arrives at the receiver.
let request_id = 2;
let sent_reply = ModeReply::ModeReply(ModeAndSubmode::new(3, 5));
mode_connector
.send_mode_reply(
MessageMetadata::new(request_id, TEST_COMPONENT_ID_0),
MessageMetadata::new(request_id, TEST_COMPONENT_ID_2),
sent_reply,
)
.expect("send failed");

View File

@ -162,7 +162,6 @@ mod tests {
use crate::pus::tests::{
PusServiceHandlerWithSharedStoreCommon, PusServiceHandlerWithVecCommon,
};
use crate::pus::verification::test_util::TestVerificationReporter;
use crate::pus::verification::{
RequestId, VerificationReporter, VerificationReportingProvider,
};
@ -244,7 +243,7 @@ mod tests {
MpscTcReceiver,
MpscTmAsVecSender,
EcssTcInVecConverter,
TestVerificationReporter,
VerificationReporter,
>,
}

View File

@ -1146,21 +1146,6 @@ pub mod test_util {
use super::*;
/*
#[derive(Clone)]
pub struct VerificationStatus {
pub accepted: Option<bool>,
pub started: Option<bool>,
pub step: u16,
pub step_status: Option<bool>,
pub completed: Option<bool>,
pub failure_data: Option<Vec<u8>>,
pub fail_enum: Option<u64>,
}
*/
//pub type SharedVerificationMap = Arc<Mutex<RefCell<HashMap<RequestId, VerificationStatus>>>>;
#[derive(Default)]
pub struct TestVerificationReporter {
pub report_queue: RefCell<VecDeque<(RequestId, VerificationReportInfo)>>,
@ -1366,98 +1351,99 @@ pub mod test_util {
}
impl TestVerificationReporter {
pub fn check_next_was_added(&mut self, request_id: RequestId) {
pub fn check_next_was_added(&self, request_id: RequestId) {
let (last_report_req_id, info) = self
.report_queue
.get_mut()
.borrow_mut()
.pop_front()
.expect("report queue is empty");
assert_eq!(request_id, last_report_req_id);
assert_eq!(info, VerificationReportInfo::Added);
}
pub fn check_next_is_acceptance_success(
&mut self,
sender_id: ComponentId,
req_id: RequestId,
) {
pub fn check_next_is_acceptance_success(&self, sender_id: ComponentId, req_id: RequestId) {
let (last_report_req_id, info) = self
.report_queue
.get_mut()
.borrow_mut()
.pop_front()
.expect("report queue is empty");
assert_eq!(req_id, last_report_req_id);
if let VerificationReportInfo::AcceptanceSuccess(data) = info {
assert_eq!(data.sender, sender_id);
return;
}
panic!("next message is not acceptance success message")
}
pub fn check_next_is_started_success(&mut self, sender_id: ComponentId, req_id: RequestId) {
pub fn check_next_is_started_success(&self, sender_id: ComponentId, req_id: RequestId) {
let (last_report_req_id, info) = self
.report_queue
.get_mut()
.borrow_mut()
.pop_front()
.expect("report queue is empty");
assert_eq!(req_id, last_report_req_id);
if let VerificationReportInfo::StartedSuccess(data) = info {
assert_eq!(data.sender, sender_id);
return;
}
panic!("next message is not start success message")
}
pub fn check_next_is_step_success(
&mut self,
&self,
sender_id: ComponentId,
request_id: RequestId,
expected_step: u16,
) {
let (last_report_req_id, info) = self
.report_queue
.get_mut()
.borrow_mut()
.pop_front()
.expect("report queue is empty");
assert_eq!(request_id, last_report_req_id);
if let VerificationReportInfo::StepSuccess { data, step } = info {
assert_eq!(data.sender, sender_id);
assert_eq!(expected_step, step);
return;
}
panic!("next message is not step success message")
panic!("next message is not step success message: {info:?}")
}
pub fn check_next_is_step_failure(
&mut self,
&self,
sender_id: ComponentId,
request_id: RequestId,
error_code: u64,
) {
let (last_report_req_id, info) = self
.report_queue
.get_mut()
.borrow_mut()
.pop_front()
.expect("report queue is empty");
assert_eq!(request_id, last_report_req_id);
if let VerificationReportInfo::StepFailure(data) = info {
assert_eq!(data.sender, sender_id);
assert_eq!(data.error_enum, error_code);
return;
}
panic!("next message is not step success message")
panic!("next message is not step failure message")
}
pub fn check_next_is_completion_success(
&mut self,
&self,
sender_id: ComponentId,
request_id: RequestId,
) {
let (last_report_req_id, info) = self
.report_queue
.get_mut()
.borrow_mut()
.pop_front()
.expect("report queue is empty");
assert_eq!(request_id, last_report_req_id);
if let VerificationReportInfo::CompletionSuccess(data) = info {
assert_eq!(data.sender, sender_id);
return;
}
panic!("next message is not completion success message")
panic!("next message is not completion success message: {info:?}")
}
pub fn check_next_is_completion_failure(
@ -1475,8 +1461,9 @@ pub mod test_util {
if let VerificationReportInfo::CompletionFailure(data) = info {
assert_eq!(data.sender, sender_id);
assert_eq!(data.error_enum, error_code);
return;
}
panic!("next message is not completion success message")
panic!("next message is not completion failure message: {info:?}")
}
pub fn assert_full_completion_success(
@ -1487,6 +1474,7 @@ pub mod test_util {
) {
self.check_next_was_added(request_id);
self.check_next_is_acceptance_success(sender_id, request_id);
self.check_next_is_started_success(sender_id, request_id);
if let Some(highest_num) = expected_steps {
for i in 0..highest_num {
self.check_next_is_step_success(sender_id, request_id, i);
@ -1504,6 +1492,7 @@ pub mod test_util {
) {
self.check_next_was_added(request_id);
self.check_next_is_acceptance_success(sender_id, request_id);
self.check_next_is_started_success(sender_id, request_id);
if let Some(highest_num) = expected_steps {
for i in 0..highest_num {
self.check_next_is_step_success(sender_id, request_id, i);
@ -1580,13 +1569,14 @@ pub mod test_util {
#[cfg(test)]
pub mod tests {
use crate::pool::{StaticMemoryPool, StaticPoolConfig};
use crate::pus::test_util::TEST_APID;
use crate::pus::test_util::{TEST_APID, TEST_COMPONENT_ID};
use crate::pus::tests::CommonTmInfo;
use crate::pus::verification::{
EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone,
VerificationReporter, VerificationReporterCfg, VerificationToken,
};
use crate::pus::{ChannelWithId, MpscTmInSharedPoolSender, PusTmVariant};
use crate::request::MessageMetadata;
use crate::tmtc::tm_helper::SharedTmPool;
use crate::ComponentId;
use alloc::format;
@ -1615,8 +1605,8 @@ pub mod tests {
#[derive(Debug, Eq, PartialEq, Clone)]
struct TmInfo {
pub requestor: MessageMetadata,
pub common: CommonTmInfo,
pub req_id: RequestId,
pub additional_data: Option<Vec<u8>>,
}
@ -1656,8 +1646,8 @@ pub mod tests {
vec = Some(new_vec);
}
self.service_queue.borrow_mut().push_back(TmInfo {
requestor: MessageMetadata::new(req_id.into(), sender_id),
common: CommonTmInfo::new_from_tm(&tm),
req_id,
additional_data: vec,
});
Ok(())
@ -1667,10 +1657,10 @@ pub mod tests {
}
struct VerificationReporterTestbench {
id: ComponentId,
pub id: ComponentId,
sender: TestSender,
reporter: VerificationReporter,
request_id: RequestId,
pub request_id: RequestId,
tc: Vec<u8>,
}
@ -1681,7 +1671,7 @@ pub mod tests {
impl VerificationReporterTestbench {
fn new(id: ComponentId, tc: PusTcCreator) -> Self {
let mut reporter = base_reporter();
let reporter = base_reporter();
Self {
id,
sender: TestSender::default(),
@ -1691,6 +1681,7 @@ pub mod tests {
}
}
#[allow(dead_code)]
fn set_dest_id(&mut self, dest_id: u16) {
self.reporter.set_dest_id(dest_id);
}
@ -1774,6 +1765,7 @@ pub mod tests {
fn acceptance_check(&self, time_stamp: &[u8; 7]) {
let cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 1,
apid: TEST_APID,
@ -1782,7 +1774,6 @@ pub mod tests {
time_stamp: *time_stamp,
},
additional_data: None,
req_id: self.request_id,
};
let mut service_queue = self.sender.service_queue.borrow_mut();
assert_eq!(service_queue.len(), 1);
@ -1792,6 +1783,7 @@ pub mod tests {
fn acceptance_fail_check(&mut self, stamp_buf: [u8; 7]) {
let cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 2,
apid: TEST_APID,
@ -1800,7 +1792,6 @@ pub mod tests {
time_stamp: stamp_buf,
},
additional_data: Some([0, 2].to_vec()),
req_id: self.request_id,
};
let service_queue = self.sender.service_queue.get_mut();
assert_eq!(service_queue.len(), 1);
@ -1812,6 +1803,7 @@ pub mod tests {
let mut srv_queue = self.sender.service_queue.borrow_mut();
assert_eq!(srv_queue.len(), 2);
let mut cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 1,
apid: TEST_APID,
@ -1820,12 +1812,12 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: None,
req_id: self.request_id,
};
let mut info = srv_queue.pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 4,
apid: TEST_APID,
@ -1834,7 +1826,6 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: Some([&[22], fail_data_raw.as_slice()].concat().to_vec()),
req_id: self.request_id,
};
info = srv_queue.pop_front().unwrap();
assert_eq!(info, cmp_info);
@ -1842,6 +1833,7 @@ pub mod tests {
fn step_success_check(&mut self, time_stamp: &[u8; 7]) {
let mut cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 1,
apid: TEST_APID,
@ -1850,12 +1842,12 @@ pub mod tests {
time_stamp: *time_stamp,
},
additional_data: None,
req_id: self.request_id,
};
let mut srv_queue = self.sender.service_queue.borrow_mut();
let mut info = srv_queue.pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 3,
apid: TEST_APID,
@ -1864,11 +1856,11 @@ pub mod tests {
time_stamp: *time_stamp,
},
additional_data: None,
req_id: self.request_id,
};
info = srv_queue.pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 5,
apid: TEST_APID,
@ -1877,11 +1869,11 @@ pub mod tests {
time_stamp: *time_stamp,
},
additional_data: Some([0].to_vec()),
req_id: self.request_id,
};
info = srv_queue.pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 5,
apid: TEST_APID,
@ -1890,7 +1882,6 @@ pub mod tests {
time_stamp: *time_stamp,
},
additional_data: Some([1].to_vec()),
req_id: self.request_id,
};
info = srv_queue.pop_front().unwrap();
assert_eq!(info, cmp_info);
@ -1899,6 +1890,7 @@ pub mod tests {
fn check_step_failure(&mut self, fail_data_raw: [u8; 4]) {
assert_eq!(self.sender.service_queue.borrow().len(), 4);
let mut cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 1,
apid: TEST_APID,
@ -1907,12 +1899,12 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: None,
req_id: self.request_id,
};
let mut info = self.sender.service_queue.borrow_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 3,
apid: TEST_APID,
@ -1921,12 +1913,12 @@ pub mod tests {
time_stamp: [0, 1, 0, 1, 0, 1, 0],
},
additional_data: None,
req_id: self.request_id,
};
info = self.sender.service_queue.borrow_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 5,
apid: TEST_APID,
@ -1935,12 +1927,12 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: Some([0].to_vec()),
req_id: self.request_id,
};
info = self.sender.service_queue.get_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 6,
apid: TEST_APID,
@ -1957,7 +1949,6 @@ pub mod tests {
.concat()
.to_vec(),
),
req_id: self.request_id,
};
info = self.sender.service_queue.get_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
@ -1967,6 +1958,7 @@ pub mod tests {
assert_eq!(self.sender.service_queue.borrow().len(), 3);
let mut cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 1,
apid: TEST_APID,
@ -1975,12 +1967,12 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: None,
req_id: self.request_id,
};
let mut info = self.sender.service_queue.get_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 3,
apid: TEST_APID,
@ -1989,12 +1981,12 @@ pub mod tests {
time_stamp: [0, 1, 0, 1, 0, 1, 0],
},
additional_data: None,
req_id: self.request_id,
};
info = self.sender.service_queue.get_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 8,
apid: TEST_APID,
@ -2003,7 +1995,6 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: Some([0, 0, 0x10, 0x20].to_vec()),
req_id: self.request_id,
};
info = self.sender.service_queue.get_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
@ -2012,6 +2003,7 @@ pub mod tests {
fn completion_success_check(&mut self) {
assert_eq!(self.sender.service_queue.borrow().len(), 3);
let cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 1,
apid: TEST_APID,
@ -2020,12 +2012,12 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: None,
req_id: self.request_id,
};
let mut info = self.sender.service_queue.borrow_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
let cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 3,
apid: TEST_APID,
@ -2034,11 +2026,11 @@ pub mod tests {
time_stamp: [0, 1, 0, 1, 0, 1, 0],
},
additional_data: None,
req_id: self.request_id,
};
info = self.sender.service_queue.borrow_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
let cmp_info = TmInfo {
requestor: MessageMetadata::new(self.request_id.into(), self.id),
common: CommonTmInfo {
subservice: 7,
apid: TEST_APID,
@ -2047,7 +2039,6 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: None,
req_id: self.request_id,
};
info = self.sender.service_queue.borrow_mut().pop_front().unwrap();
assert_eq!(info, cmp_info);
@ -2158,6 +2149,7 @@ pub mod tests {
.acceptance_failure(init_token, fail_params)
.expect("sending acceptance failure failed");
let cmp_info = TmInfo {
requestor: MessageMetadata::new(testbench.request_id.into(), testbench.id),
common: CommonTmInfo {
subservice: 2,
apid: TEST_APID,
@ -2166,7 +2158,6 @@ pub mod tests {
time_stamp: EMPTY_STAMP,
},
additional_data: Some([10, 0, 0, 0, 12].to_vec()),
req_id: testbench.request_id,
};
let mut service_queue = testbench.sender.service_queue.borrow_mut();
assert_eq!(service_queue.len(), 1);
@ -2174,6 +2165,7 @@ pub mod tests {
assert_eq!(info, cmp_info);
}
#[test]
fn test_start_failure() {
let mut testbench = VerificationReporterTestbench::new(0, create_generic_ping());
let init_token = testbench.init();
@ -2283,9 +2275,9 @@ pub mod tests {
#[test]
fn test_complete_success_sequence() {
let mut testbench = VerificationReporterTestbench::new(0, create_generic_ping());
let mut testbench =
VerificationReporterTestbench::new(TEST_COMPONENT_ID.id(), create_generic_ping());
let token = testbench.init();
let mut sender = TestSender::default();
let accepted_token = testbench
.acceptance_success(token, &EMPTY_STAMP)
.expect("Sending acceptance success failed");

View File

@ -493,10 +493,10 @@ mod tests {
fn test_sender_map() {
let (sender0, receiver0) = mpsc::channel();
let (sender1, receiver1) = mpsc::channel();
let mut sender_map_with_id = MessageSenderMap::default();
sender_map_with_id.add_message_target(TEST_CHANNEL_ID_1, sender0);
sender_map_with_id.add_message_target(TEST_CHANNEL_ID_2, sender1);
sender_map_with_id
let mut sender_map = MessageSenderMap::default();
sender_map.add_message_target(TEST_CHANNEL_ID_1, sender0);
sender_map.add_message_target(TEST_CHANNEL_ID_2, sender1);
sender_map
.send_message(
MessageMetadata::new(1, TEST_CHANNEL_ID_0),
TEST_CHANNEL_ID_1,
@ -507,9 +507,9 @@ mod tests {
assert_eq!(reply.request_id(), 1);
assert_eq!(reply.sender_id(), TEST_CHANNEL_ID_0);
assert_eq!(reply.message, 5);
sender_map_with_id
sender_map
.send_message(
MessageMetadata::new(2, TEST_CHANNEL_ID_1),
MessageMetadata::new(2, TEST_CHANNEL_ID_0),
TEST_CHANNEL_ID_2,
10,
)

View File

@ -6,7 +6,7 @@ use satrs::mode::{
ModeRequestHandlerMpscBounded, ModeRequestReceiver, ModeRequestorAndHandlerMpscBounded,
ModeRequestorBoundedMpsc,
};
use satrs::request::{MessageMetadata, RequestId};
use satrs::request::MessageMetadata;
use satrs::{
mode::{ModeAndSubmode, ModeReply, ModeRequest},
queue::GenericTargetedMessagingError,
@ -45,29 +45,6 @@ struct TestDevice {
pub name: String,
pub mode_node: ModeRequestHandlerMpscBounded,
pub mode_and_submode: ModeAndSubmode,
pub mode_requestor_info: Option<MessageMetadata>,
}
fn mode_leaf_node_req_handler(
handler: &mut impl ModeRequestHandler,
request: GenericMessage<ModeRequest>,
) {
match request.message {
ModeRequest::SetMode(mode_and_submode) => {
handler
.start_transition(request.requestor_info, mode_and_submode)
.unwrap();
}
ModeRequest::ReadMode => handler
.send_mode_reply(
request.requestor_info,
ModeReply::ModeReply(handler.mode_and_submode()),
)
.unwrap(),
ModeRequest::AnnounceMode => handler.announce_mode(request.requestor_info, false),
ModeRequest::AnnounceModeRecursive => handler.announce_mode(request.requestor_info, true),
ModeRequest::ModeInfo(_) => todo!(),
}
}
impl TestDevice {
@ -75,27 +52,9 @@ impl TestDevice {
self.check_mode_requests().expect("mode messaging error");
}
pub fn check_mode_requests(&mut self) -> Result<(), GenericTargetedMessagingError> {
pub fn check_mode_requests(&mut self) -> Result<(), ModeError> {
if let Some(request) = self.mode_node.try_recv_mode_request()? {
match request.message {
ModeRequest::SetMode(mode_and_submode) => {
self.start_transition(request.requestor_info, mode_and_submode)
.unwrap();
self.mode_requestor_info = Some(request.requestor_info);
}
ModeRequest::ReadMode => self
.mode_node
.send_mode_reply(
request.requestor_info,
ModeReply::ModeReply(self.mode_and_submode),
)
.unwrap(),
ModeRequest::AnnounceMode => self.announce_mode(request.requestor_info, false),
ModeRequest::AnnounceModeRecursive => {
self.announce_mode(request.requestor_info, true)
}
ModeRequest::ModeInfo(_) => todo!(),
}
self.handle_mode_request(request)?
}
Ok(())
}
@ -108,6 +67,8 @@ impl ModeProvider for TestDevice {
}
impl ModeRequestHandler for TestDevice {
type Error = ModeError;
fn start_transition(
&mut self,
requestor: MessageMetadata,
@ -118,18 +79,14 @@ impl ModeRequestHandler for TestDevice {
Ok(())
}
fn announce_mode(&self, requestor_info: MessageMetadata, _recursive: bool) {
fn announce_mode(&self, _requestor_info: MessageMetadata, _recursive: bool) {
println!(
"{}: announcing mode: {:?}",
self.name, self.mode_and_submode
);
}
fn handle_mode_reached(
&mut self,
requestor: Option<MessageMetadata>,
) -> Result<(), GenericTargetedMessagingError> {
fn handle_mode_reached(&mut self, requestor: Option<MessageMetadata>) -> Result<(), ModeError> {
if let Some(requestor) = requestor {
self.send_mode_reply(requestor, ModeReply::ModeReply(self.mode_and_submode))?;
}
@ -139,9 +96,23 @@ impl ModeRequestHandler for TestDevice {
&self,
requestor_info: MessageMetadata,
reply: ModeReply,
) -> Result<(), GenericTargetedMessagingError> {
self.mode_node
.send_mode_reply(requestor_info, ModeReply::ModeReply(self.mode_and_submode))?;
) -> Result<(), ModeError> {
self.mode_node.send_mode_reply(requestor_info, reply)?;
Ok(())
}
fn handle_mode_info(
&mut self,
requestor_info: MessageMetadata,
info: ModeAndSubmode,
) -> Result<(), ModeError> {
// A device is a leaf in the tree.. so this really should not happen
println!(
"{}: unexpected mode info from {:?} with mode: {:?}",
self.name,
requestor_info.sender_id(),
info
);
Ok(())
}
}
@ -215,11 +186,12 @@ impl TestAssembly {
}
impl ModeRequestHandler for TestAssembly {
type Error = ModeError;
fn start_transition(
&mut self,
requestor: MessageMetadata,
mode_and_submode: ModeAndSubmode,
) -> Result<(), ModeError> {
) -> Result<(), Self::Error> {
self.mode_requestor_info = Some(requestor);
self.target_mode_and_submode = Some(mode_and_submode);
Ok(())
@ -255,7 +227,7 @@ impl ModeRequestHandler for TestAssembly {
fn handle_mode_reached(
&mut self,
mode_requestor: Option<MessageMetadata>,
) -> Result<(), GenericTargetedMessagingError> {
) -> Result<(), Self::Error> {
if let Some(requestor) = mode_requestor {
self.send_mode_reply(requestor, ModeReply::ModeReply(self.mode_and_submode))?;
}
@ -266,9 +238,17 @@ impl ModeRequestHandler for TestAssembly {
&self,
requestor: MessageMetadata,
reply: ModeReply,
) -> Result<(), GenericTargetedMessagingError> {
self.mode_node
.send_mode_reply(requestor, ModeReply::ModeReply(self.mode_and_submode))?;
) -> Result<(), Self::Error> {
self.mode_node.send_mode_reply(requestor, reply)?;
Ok(())
}
fn handle_mode_info(
&mut self,
_requestor_info: MessageMetadata,
_info: ModeAndSubmode,
) -> Result<(), Self::Error> {
// TODO: A proper assembly must reach to mode changes of its children..
Ok(())
}
}
@ -352,13 +332,11 @@ fn main() {
let mut device1 = TestDevice {
name: "Test Device 1".to_string(),
mode_node: mode_node_dev1,
mode_requestor_info: None,
mode_and_submode: ModeAndSubmode::new(0, 0),
};
let mut device2 = TestDevice {
name: "Test Device 2".to_string(),
mode_node: mode_node_dev2,
mode_requestor_info: None,
mode_and_submode: ModeAndSubmode::new(0, 0),
};
let mut assy = TestAssembly {