Continue update
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
This commit is contained in:
@ -1,5 +1,16 @@
|
||||
use log::{error, warn};
|
||||
use std::sync::mpsc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::requests::GenericRequestRouter;
|
||||
use satrs::pool::SharedStaticMemoryPool;
|
||||
use satrs::pus::verification::VerificationReporter;
|
||||
use satrs::pus::{
|
||||
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter,
|
||||
EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded,
|
||||
PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusTmInPool, TmInSharedPoolSender,
|
||||
};
|
||||
use satrs::request::GenericMessage;
|
||||
use satrs::{
|
||||
mode::{ModeAndSubmode, ModeReply, ModeRequest},
|
||||
pus::{
|
||||
@ -9,9 +20,9 @@ use satrs::{
|
||||
VerificationToken,
|
||||
},
|
||||
ActivePusRequestStd, ActiveRequestProvider, EcssTmSenderCore, EcssTmtcError,
|
||||
GenericConversionError, PusReplyHandler, PusTcToRequestConverter, PusTmWrapper,
|
||||
GenericConversionError, PusReplyHandler, PusTcToRequestConverter, PusTmVariant,
|
||||
},
|
||||
request::TargetAndApidId,
|
||||
request::UniqueApidTargetId,
|
||||
spacepackets::{
|
||||
ecss::{
|
||||
tc::PusTcReader,
|
||||
@ -20,10 +31,15 @@ use satrs::{
|
||||
},
|
||||
SpHeader,
|
||||
},
|
||||
ComponentId,
|
||||
};
|
||||
use satrs_example::config::components::PUS_MODE_SERVICE;
|
||||
use satrs_example::config::{mode_err, tmtc_err};
|
||||
|
||||
use super::generic_pus_request_timeout_handler;
|
||||
use super::{
|
||||
create_verification_reporter, generic_pus_request_timeout_handler, PusTargetedRequestService,
|
||||
TargetedPusService,
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ModeReplyHandler {}
|
||||
@ -33,7 +49,8 @@ impl PusReplyHandler<ActivePusRequestStd, ModeReply> for ModeReplyHandler {
|
||||
|
||||
fn handle_unrequested_reply(
|
||||
&mut self,
|
||||
reply: &satrs::request::GenericMessage<ModeReply>,
|
||||
_caller_id: ComponentId,
|
||||
reply: &GenericMessage<ModeReply>,
|
||||
_tm_sender: &impl EcssTmSenderCore,
|
||||
) -> Result<(), Self::Error> {
|
||||
log::warn!("received unexpected reply for mode service 5: {reply:?}");
|
||||
@ -42,30 +59,24 @@ impl PusReplyHandler<ActivePusRequestStd, ModeReply> for ModeReplyHandler {
|
||||
|
||||
fn handle_reply(
|
||||
&mut self,
|
||||
reply: &satrs::request::GenericMessage<ModeReply>,
|
||||
caller_id: ComponentId,
|
||||
reply: &GenericMessage<ModeReply>,
|
||||
active_request: &ActivePusRequestStd,
|
||||
tm_sender: &impl EcssTmSenderCore,
|
||||
verification_handler: &impl VerificationReportingProvider,
|
||||
time_stamp: &[u8],
|
||||
tm_sender: &impl EcssTmSenderCore,
|
||||
) -> Result<bool, Self::Error> {
|
||||
let started_token: VerificationToken<TcStateStarted> = active_request
|
||||
.token()
|
||||
.try_into()
|
||||
.expect("invalid token state");
|
||||
match reply.message {
|
||||
ModeReply::ModeInfo(info) => {
|
||||
log::warn!(
|
||||
"received unrequest mode information for active request {:?}: {:?}",
|
||||
active_request,
|
||||
info
|
||||
);
|
||||
}
|
||||
ModeReply::ModeReply(mode_reply) => {
|
||||
let mut source_data: [u8; 12] = [0; 12];
|
||||
mode_reply
|
||||
.write_to_be_bytes(&mut source_data)
|
||||
.expect("writing mode reply failed");
|
||||
let req_id = verification::RequestId::from(reply.request_id);
|
||||
let req_id = verification::RequestId::from(reply.request_id());
|
||||
let mut sp_header = SpHeader::tm_unseg(req_id.packet_id().apid(), 0, 0)
|
||||
.expect("generating SP header failed");
|
||||
let sec_header = PusTmSecondaryHeader::new(
|
||||
@ -76,18 +87,21 @@ impl PusReplyHandler<ActivePusRequestStd, ModeReply> for ModeReplyHandler {
|
||||
Some(time_stamp),
|
||||
);
|
||||
let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &source_data, true);
|
||||
tm_sender.send_tm(PusTmWrapper::Direct(pus_tm))?;
|
||||
verification_handler
|
||||
.completion_success(started_token, time_stamp)
|
||||
.map_err(|e| e.0)?;
|
||||
tm_sender.send_tm(caller_id, PusTmVariant::Direct(pus_tm))?;
|
||||
verification_handler.completion_success(
|
||||
caller_id,
|
||||
tm_sender,
|
||||
started_token,
|
||||
time_stamp,
|
||||
)?;
|
||||
}
|
||||
ModeReply::CantReachMode(error_code) => {
|
||||
verification_handler
|
||||
.completion_failure(
|
||||
started_token,
|
||||
FailParams::new(time_stamp, &error_code, &[]),
|
||||
)
|
||||
.map_err(|e| e.0)?;
|
||||
verification_handler.completion_failure(
|
||||
caller_id,
|
||||
tm_sender,
|
||||
started_token,
|
||||
FailParams::new(time_stamp, &error_code, &[]),
|
||||
)?;
|
||||
}
|
||||
ModeReply::WrongMode { expected, reached } => {
|
||||
let mut error_info: [u8; 24] = [0; 24];
|
||||
@ -97,16 +111,16 @@ impl PusReplyHandler<ActivePusRequestStd, ModeReply> for ModeReplyHandler {
|
||||
written_len += reached
|
||||
.write_to_be_bytes(&mut error_info[ModeAndSubmode::RAW_LEN..])
|
||||
.expect("writing reached mode failed");
|
||||
verification_handler
|
||||
.completion_failure(
|
||||
started_token,
|
||||
FailParams::new(
|
||||
time_stamp,
|
||||
&mode_err::WRONG_MODE,
|
||||
&error_info[..written_len],
|
||||
),
|
||||
)
|
||||
.map_err(|e| e.0)?;
|
||||
verification_handler.completion_failure(
|
||||
caller_id,
|
||||
tm_sender,
|
||||
started_token,
|
||||
FailParams::new(
|
||||
time_stamp,
|
||||
&mode_err::WRONG_MODE,
|
||||
&error_info[..written_len],
|
||||
),
|
||||
)?;
|
||||
}
|
||||
};
|
||||
Ok(true)
|
||||
@ -114,12 +128,15 @@ impl PusReplyHandler<ActivePusRequestStd, ModeReply> for ModeReplyHandler {
|
||||
|
||||
fn handle_request_timeout(
|
||||
&mut self,
|
||||
caller_id: ComponentId,
|
||||
active_request: &ActivePusRequestStd,
|
||||
tm_sender: &impl EcssTmSenderCore,
|
||||
verification_handler: &impl VerificationReportingProvider,
|
||||
time_stamp: &[u8],
|
||||
_tm_sender: &impl EcssTmSenderCore,
|
||||
) -> Result<(), Self::Error> {
|
||||
generic_pus_request_timeout_handler(
|
||||
caller_id,
|
||||
tm_sender,
|
||||
active_request,
|
||||
verification_handler,
|
||||
time_stamp,
|
||||
@ -137,16 +154,21 @@ impl PusTcToRequestConverter<ActivePusRequestStd, ModeRequest> for ModeRequestCo
|
||||
|
||||
fn convert(
|
||||
&mut self,
|
||||
|
||||
caller_id: ComponentId,
|
||||
token: VerificationToken<TcStateAccepted>,
|
||||
tc: &PusTcReader,
|
||||
time_stamp: &[u8],
|
||||
tm_sender: &(impl EcssTmSenderCore + ?Sized),
|
||||
verif_reporter: &impl VerificationReportingProvider,
|
||||
time_stamp: &[u8],
|
||||
) -> Result<(ActivePusRequestStd, ModeRequest), Self::Error> {
|
||||
let subservice = tc.subservice();
|
||||
let user_data = tc.user_data();
|
||||
let not_enough_app_data = |expected: usize| {
|
||||
verif_reporter
|
||||
.start_failure(
|
||||
caller_id,
|
||||
tm_sender,
|
||||
token,
|
||||
FailParams::new_no_fail_data(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA),
|
||||
)
|
||||
@ -159,7 +181,7 @@ impl PusTcToRequestConverter<ActivePusRequestStd, ModeRequest> for ModeRequestCo
|
||||
if user_data.len() < core::mem::size_of::<u32>() {
|
||||
return not_enough_app_data(4);
|
||||
}
|
||||
let target_id_and_apid = TargetAndApidId::from_pus_tc(tc).unwrap();
|
||||
let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).unwrap();
|
||||
let active_request =
|
||||
ActivePusRequestStd::new(target_id_and_apid.into(), token, Duration::from_secs(30));
|
||||
let subservice_typed = Subservice::try_from(subservice);
|
||||
@ -167,6 +189,8 @@ impl PusTcToRequestConverter<ActivePusRequestStd, ModeRequest> for ModeRequestCo
|
||||
// Invalid subservice
|
||||
verif_reporter
|
||||
.start_failure(
|
||||
caller_id,
|
||||
tm_sender,
|
||||
token,
|
||||
FailParams::new_no_fail_data(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE),
|
||||
)
|
||||
@ -196,8 +220,117 @@ impl PusTcToRequestConverter<ActivePusRequestStd, ModeRequest> for ModeRequestCo
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_mode_service_static(
|
||||
tm_sender: TmInSharedPoolSender<mpsc::SyncSender<PusTmInPool>>,
|
||||
tc_pool: SharedStaticMemoryPool,
|
||||
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||
mode_router: GenericRequestRouter,
|
||||
reply_receiver: mpsc::Receiver<GenericMessage<ModeReply>>,
|
||||
) -> ModeServiceWrapper<MpscTmInSharedPoolSenderBounded, EcssTcInSharedStoreConverter> {
|
||||
let mode_request_handler = PusTargetedRequestService::new(
|
||||
PusServiceHelper::new(
|
||||
PUS_MODE_SERVICE.raw(),
|
||||
pus_action_rx,
|
||||
tm_sender,
|
||||
create_verification_reporter(PUS_MODE_SERVICE.apid),
|
||||
EcssTcInSharedStoreConverter::new(tc_pool, 2048),
|
||||
),
|
||||
ModeRequestConverter::default(),
|
||||
DefaultActiveRequestMap::default(),
|
||||
ModeReplyHandler::default(),
|
||||
mode_router,
|
||||
reply_receiver,
|
||||
);
|
||||
ModeServiceWrapper {
|
||||
service: mode_request_handler,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_mode_service_dynamic(
|
||||
tm_funnel_tx: mpsc::Sender<PusTmAsVec>,
|
||||
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||
mode_router: GenericRequestRouter,
|
||||
reply_receiver: mpsc::Receiver<GenericMessage<ModeReply>>,
|
||||
) -> ModeServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> {
|
||||
let mode_request_handler = PusTargetedRequestService::new(
|
||||
PusServiceHelper::new(
|
||||
PUS_MODE_SERVICE.raw(),
|
||||
pus_action_rx,
|
||||
tm_funnel_tx,
|
||||
create_verification_reporter(PUS_MODE_SERVICE.apid),
|
||||
EcssTcInVecConverter::default(),
|
||||
),
|
||||
ModeRequestConverter::default(),
|
||||
DefaultActiveRequestMap::default(),
|
||||
ModeReplyHandler::default(),
|
||||
mode_router,
|
||||
reply_receiver,
|
||||
);
|
||||
ModeServiceWrapper {
|
||||
service: mode_request_handler,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ModeServiceWrapper<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> {
|
||||
pub(crate) service: PusTargetedRequestService<
|
||||
MpscTcReceiver,
|
||||
TmSender,
|
||||
TcInMemConverter,
|
||||
VerificationReporter,
|
||||
ModeRequestConverter,
|
||||
ModeReplyHandler,
|
||||
DefaultActiveRequestMap<ActivePusRequestStd>,
|
||||
ActivePusRequestStd,
|
||||
ModeRequest,
|
||||
ModeReply,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> TargetedPusService
|
||||
for ModeServiceWrapper<TmSender, TcInMemConverter>
|
||||
{
|
||||
/// Returns [true] if the packet handling is finished.
|
||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool {
|
||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
||||
Ok(result) => match result {
|
||||
PusPacketHandlerResult::RequestHandled => {}
|
||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
||||
warn!("PUS mode service: partial packet handling success: {e:?}")
|
||||
}
|
||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
||||
warn!("PUS mode service: invalid subservice {invalid}");
|
||||
}
|
||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||
warn!("PUS mode service: {subservice} not implemented");
|
||||
}
|
||||
PusPacketHandlerResult::Empty => {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
error!("PUS mode service: packet handling error: {error:?}")
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> bool {
|
||||
self.service
|
||||
.poll_and_check_next_reply(time_stamp)
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("PUS action service: Handling reply failed with error {e:?}");
|
||||
false
|
||||
})
|
||||
}
|
||||
|
||||
fn check_for_request_timeouts(&mut self) {
|
||||
self.service.check_for_request_timeouts();
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use satrs::pus::test_util::{TEST_APID, TEST_COMPONENT_ID, TEST_UNIQUE_ID};
|
||||
use satrs::request::MessageMetadata;
|
||||
use satrs::{
|
||||
mode::{ModeAndSubmode, ModeReply, ModeRequest},
|
||||
pus::mode::Subservice,
|
||||
@ -207,10 +340,11 @@ mod tests {
|
||||
SpHeader,
|
||||
},
|
||||
};
|
||||
use satrs_example::config::tmtc_err;
|
||||
|
||||
use crate::pus::{
|
||||
mode::ModeReplyHandler,
|
||||
tests::{PusConverterTestbench, ReplyHandlerTestbench, TEST_APID, TEST_APID_TARGET_ID},
|
||||
tests::{PusConverterTestbench, ReplyHandlerTestbench},
|
||||
};
|
||||
|
||||
use super::ModeRequestConverter;
|
||||
@ -221,11 +355,11 @@ mod tests {
|
||||
let mut sp_header = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap();
|
||||
let sec_header = PusTcSecondaryHeader::new_simple(200, Subservice::TcReadMode as u8);
|
||||
let mut app_data: [u8; 4] = [0; 4];
|
||||
app_data[0..4].copy_from_slice(&TEST_APID_TARGET_ID.to_be_bytes());
|
||||
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID.to_be_bytes());
|
||||
let tc = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true);
|
||||
let token = testbench.add_tc(&tc);
|
||||
let (_active_req, req) = testbench
|
||||
.convert(token, &[], TEST_APID, TEST_APID_TARGET_ID)
|
||||
.convert(token, &[], TEST_APID, TEST_UNIQUE_ID)
|
||||
.expect("conversion has failed");
|
||||
assert_eq!(req, ModeRequest::ReadMode);
|
||||
}
|
||||
@ -237,14 +371,14 @@ mod tests {
|
||||
let sec_header = PusTcSecondaryHeader::new_simple(200, Subservice::TcSetMode as u8);
|
||||
let mut app_data: [u8; 4 + ModeAndSubmode::RAW_LEN] = [0; 4 + ModeAndSubmode::RAW_LEN];
|
||||
let mode_and_submode = ModeAndSubmode::new(2, 1);
|
||||
app_data[0..4].copy_from_slice(&TEST_APID_TARGET_ID.to_be_bytes());
|
||||
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID.to_be_bytes());
|
||||
mode_and_submode
|
||||
.write_to_be_bytes(&mut app_data[4..])
|
||||
.unwrap();
|
||||
let tc = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true);
|
||||
let token = testbench.add_tc(&tc);
|
||||
let (_active_req, req) = testbench
|
||||
.convert(token, &[], TEST_APID, TEST_APID_TARGET_ID)
|
||||
.convert(token, &[], TEST_APID, TEST_UNIQUE_ID)
|
||||
.expect("conversion has failed");
|
||||
assert_eq!(req, ModeRequest::SetMode(mode_and_submode));
|
||||
}
|
||||
@ -255,11 +389,11 @@ mod tests {
|
||||
let mut sp_header = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap();
|
||||
let sec_header = PusTcSecondaryHeader::new_simple(200, Subservice::TcAnnounceMode as u8);
|
||||
let mut app_data: [u8; 4] = [0; 4];
|
||||
app_data[0..4].copy_from_slice(&TEST_APID_TARGET_ID.to_be_bytes());
|
||||
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID.to_be_bytes());
|
||||
let tc = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true);
|
||||
let token = testbench.add_tc(&tc);
|
||||
let (_active_req, req) = testbench
|
||||
.convert(token, &[], TEST_APID, TEST_APID_TARGET_ID)
|
||||
.convert(token, &[], TEST_APID, TEST_UNIQUE_ID)
|
||||
.expect("conversion has failed");
|
||||
assert_eq!(req, ModeRequest::AnnounceMode);
|
||||
}
|
||||
@ -271,11 +405,11 @@ mod tests {
|
||||
let sec_header =
|
||||
PusTcSecondaryHeader::new_simple(200, Subservice::TcAnnounceModeRecursive as u8);
|
||||
let mut app_data: [u8; 4] = [0; 4];
|
||||
app_data[0..4].copy_from_slice(&TEST_APID_TARGET_ID.to_be_bytes());
|
||||
app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID.to_be_bytes());
|
||||
let tc = PusTcCreator::new(&mut sp_header, sec_header, &app_data, true);
|
||||
let token = testbench.add_tc(&tc);
|
||||
let (_active_req, req) = testbench
|
||||
.convert(token, &[], TEST_APID, TEST_APID_TARGET_ID)
|
||||
.convert(token, &[], TEST_APID, TEST_UNIQUE_ID)
|
||||
.expect("conversion has failed");
|
||||
assert_eq!(req, ModeRequest::AnnounceModeRecursive);
|
||||
}
|
||||
@ -283,8 +417,9 @@ mod tests {
|
||||
#[test]
|
||||
fn reply_handling_unrequested_reply() {
|
||||
let mut testbench = ReplyHandlerTestbench::new(ModeReplyHandler::default());
|
||||
let mode_reply = ModeReply::ModeInfo(ModeAndSubmode::new(5, 1));
|
||||
let unrequested_reply = GenericMessage::new(10_u32, 15_u64, mode_reply);
|
||||
let mode_reply = ModeReply::ModeReply(ModeAndSubmode::new(5, 1));
|
||||
let unrequested_reply =
|
||||
GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), mode_reply);
|
||||
// Right now this function does not do a lot. We simply check that it does not panic or do
|
||||
// weird stuff.
|
||||
let result = testbench.handle_unrequested_reply(&unrequested_reply);
|
||||
@ -294,9 +429,14 @@ mod tests {
|
||||
#[test]
|
||||
fn reply_handling_reply_timeout() {
|
||||
let mut testbench = ReplyHandlerTestbench::new(ModeReplyHandler::default());
|
||||
// TODO: Start a request, then time it out with the API and check verification completion
|
||||
// failure.
|
||||
// testbench.reply_handler.handle_request_timeout(active_request, verification_handler, time_stamp, tm_sender)
|
||||
// testbench.default_timeout = Duration::from_millis(50);
|
||||
let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID, &[]);
|
||||
let result = testbench.handle_request_timeout(&active_request, &[]);
|
||||
assert!(result.is_ok());
|
||||
testbench.verif_reporter.assert_completion_failure(
|
||||
TEST_COMPONENT_ID.raw(),
|
||||
req_id,
|
||||
None,
|
||||
tmtc_err::REQUEST_TIMEOUT.raw() as u64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user