introduce active request table abstraction
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2024-03-11 17:49:55 +01:00
parent 460ce8074f
commit 4afc360873
Signed by: muellerr
GPG Key ID: A649FB78196E3849

View File

@ -1,3 +1,5 @@
use core::time::Duration;
use crate::{ use crate::{
action::{ActionId, ActionRequest}, action::{ActionId, ActionRequest},
params::Params, params::Params,
@ -5,10 +7,10 @@ use crate::{
TargetId, TargetId,
}; };
use super::verification::{TcStateAccepted, VerificationToken}; use super::verification::{TcStateAccepted, TcStateStarted, VerificationToken};
use satrs_shared::res_code::ResultU16; use satrs_shared::res_code::ResultU16;
use spacepackets::ecss::EcssEnumU16; use spacepackets::{ecss::EcssEnumU16, time::UnixTimestamp};
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
@ -62,6 +64,27 @@ pub trait PusActionRequestRouter {
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActiveActionRequest {
action_id: ActionId,
token: VerificationToken<TcStateStarted>,
start_time: UnixTimestamp,
timeout: Duration,
}
pub trait ActiveRequestMapProvider: Default {
fn insert(&mut self, request_id: &RequestId, request: ActiveActionRequest);
fn get(&self, request_id: RequestId) -> Option<ActiveActionRequest>;
fn get_mut(&mut self, request_id: RequestId) -> Option<&mut ActiveActionRequest>;
fn remove(&mut self, request_id: RequestId) -> bool;
/// Call a user-supplied closure for each active request.
fn for_each<F: FnMut(&RequestId, &ActiveActionRequest)>(&self, f: F);
/// Call a user-supplied closure for each active request. Mutable variant.
fn for_each_mut<F: FnMut(&RequestId, &mut ActiveActionRequest)>(&mut self, f: F);
}
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod alloc_mod { pub mod alloc_mod {
@ -228,12 +251,40 @@ pub mod std_mod {
} }
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Clone, Debug, Default)]
pub struct ActiveActionRequest { pub struct DefaultActiveRequestMap(HashMap<RequestId, ActiveActionRequest>);
action_id: ActionId,
token: VerificationToken<TcStateStarted>, impl ActiveRequestMapProvider for DefaultActiveRequestMap {
start_time: UnixTimestamp, // type Iter = hashbrown::hash_map::Iter<'a, RequestId, ActiveActionRequest>;
timeout: Duration, // type IterMut = hashbrown::hash_map::IterMut<'a, RequestId, ActiveActionRequest>;
fn insert(&mut self, request_id: &RequestId, request: ActiveActionRequest) {
self.0.insert(*request_id, request);
}
fn get(&self, request_id: RequestId) -> Option<ActiveActionRequest> {
self.0.get(&request_id).cloned()
}
fn get_mut(&mut self, request_id: RequestId) -> Option<&mut ActiveActionRequest> {
self.0.get_mut(&request_id)
}
fn remove(&mut self, request_id: RequestId) -> bool {
self.0.remove(&request_id).is_some()
}
fn for_each<F: FnMut(&RequestId, &ActiveActionRequest)>(&self, mut f: F) {
for (req_id, active_req) in &self.0 {
f(req_id, active_req);
}
}
fn for_each_mut<F: FnMut(&RequestId, &mut ActiveActionRequest)>(&mut self, mut f: F) {
for (req_id, active_req) in &mut self.0 {
f(req_id, active_req);
}
}
} }
pub trait ActionReplyHandlerHook { pub trait ActionReplyHandlerHook {
@ -244,17 +295,21 @@ pub mod std_mod {
pub struct PusService8ReplyHandler< pub struct PusService8ReplyHandler<
VerificationReporter: VerificationReportingProvider, VerificationReporter: VerificationReportingProvider,
ActiveRequestMap: ActiveRequestMapProvider,
UserHook: ActionReplyHandlerHook, UserHook: ActionReplyHandlerHook,
> { > {
active_requests: HashMap<RequestId, ActiveActionRequest>, active_requests: ActiveRequestMap,
verification_reporter: VerificationReporter, verification_reporter: VerificationReporter,
fail_data_buf: alloc::vec::Vec<u8>, fail_data_buf: alloc::vec::Vec<u8>,
current_time: UnixTimestamp, current_time: UnixTimestamp,
user_hook: UserHook, user_hook: UserHook,
} }
impl<VerificationReporter: VerificationReportingProvider, UserHook: ActionReplyHandlerHook> impl<
PusService8ReplyHandler<VerificationReporter, UserHook> VerificationReporter: VerificationReportingProvider,
ActiveRequestMap: ActiveRequestMapProvider,
UserHook: ActionReplyHandlerHook,
> PusService8ReplyHandler<VerificationReporter, ActiveRequestMap, UserHook>
{ {
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
@ -263,15 +318,13 @@ pub mod std_mod {
fail_data_buf_size: usize, fail_data_buf_size: usize,
user_hook: UserHook, user_hook: UserHook,
) -> Result<Self, SystemTimeError> { ) -> Result<Self, SystemTimeError> {
let mut reply_handler = Self { let current_time = UnixTimestamp::from_now()?;
active_requests: HashMap::new(), Ok(Self::new(
verification_reporter, verification_reporter,
fail_data_buf: alloc::vec![0; fail_data_buf_size], fail_data_buf_size,
current_time: UnixTimestamp::default(),
user_hook, user_hook,
}; current_time,
reply_handler.update_time_from_now()?; ))
Ok(reply_handler)
} }
pub fn new( pub fn new(
@ -281,7 +334,7 @@ pub mod std_mod {
init_time: UnixTimestamp, init_time: UnixTimestamp,
) -> Self { ) -> Self {
Self { Self {
active_requests: HashMap::new(), active_requests: ActiveRequestMap::default(),
verification_reporter, verification_reporter,
fail_data_buf: alloc::vec![0; fail_data_buf_size], fail_data_buf: alloc::vec![0; fail_data_buf_size],
current_time: init_time, current_time: init_time,
@ -296,7 +349,7 @@ pub mod std_mod {
timeout: Duration, timeout: Duration,
) { ) {
self.active_requests.insert( self.active_requests.insert(
request_id.into(), &request_id.into(),
ActiveActionRequest { ActiveActionRequest {
action_id, action_id,
token, token,
@ -317,11 +370,16 @@ pub mod std_mod {
/// ///
/// It will call [Self::handle_timeout] for all active requests which have timed out. /// It will call [Self::handle_timeout] for all active requests which have timed out.
pub fn check_for_timeouts(&mut self, time_stamp: &[u8]) -> Result<(), EcssTmtcError> { pub fn check_for_timeouts(&mut self, time_stamp: &[u8]) -> Result<(), EcssTmtcError> {
for active_req in self.active_requests.values() { let mut timed_out_commands = alloc::vec::Vec::new();
self.active_requests.for_each(|request_id, active_req| {
let diff = self.current_time - active_req.start_time; let diff = self.current_time - active_req.start_time;
if diff.duration_absolute > active_req.timeout { if diff.duration_absolute > active_req.timeout {
self.handle_timeout(active_req, time_stamp); self.handle_timeout(active_req, time_stamp);
} }
timed_out_commands.push(*request_id);
});
for timed_out_command in timed_out_commands {
self.active_requests.remove(timed_out_command);
} }
Ok(()) Ok(())
} }
@ -352,13 +410,13 @@ pub mod std_mod {
action_reply_with_ids: ActionReplyPusWithIds, action_reply_with_ids: ActionReplyPusWithIds,
time_stamp: &[u8], time_stamp: &[u8],
) -> Result<(), EcssTmtcError> { ) -> Result<(), EcssTmtcError> {
let active_req = self.active_requests.get(&action_reply_with_ids.request_id); let active_req = self.active_requests.get(action_reply_with_ids.request_id);
if active_req.is_none() { if active_req.is_none() {
self.user_hook self.user_hook
.handle_unexpected_reply(&action_reply_with_ids); .handle_unexpected_reply(&action_reply_with_ids);
} }
let active_req = active_req.unwrap(); let active_req = active_req.unwrap().clone();
match action_reply_with_ids.reply { let remove_entry = match action_reply_with_ids.reply {
ActionReplyPus::CompletionFailed { error_code, params } => { ActionReplyPus::CompletionFailed { error_code, params } => {
let fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; let fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
self.verification_reporter self.verification_reporter
@ -371,8 +429,7 @@ pub mod std_mod {
), ),
) )
.map_err(|e| e.0)?; .map_err(|e| e.0)?;
self.active_requests true
.remove(&action_reply_with_ids.request_id);
} }
ActionReplyPus::StepFailed { ActionReplyPus::StepFailed {
error_code, error_code,
@ -391,15 +448,13 @@ pub mod std_mod {
), ),
) )
.map_err(|e| e.0)?; .map_err(|e| e.0)?;
self.active_requests true
.remove(&action_reply_with_ids.request_id);
} }
ActionReplyPus::Completed => { ActionReplyPus::Completed => {
self.verification_reporter self.verification_reporter
.completion_success(active_req.token, time_stamp) .completion_success(active_req.token, time_stamp)
.map_err(|e| e.0)?; .map_err(|e| e.0)?;
self.active_requests true
.remove(&action_reply_with_ids.request_id);
} }
ActionReplyPus::StepSuccess { step } => { ActionReplyPus::StepSuccess { step } => {
self.verification_reporter.step_success( self.verification_reporter.step_success(
@ -407,7 +462,12 @@ pub mod std_mod {
time_stamp, time_stamp,
EcssEnumU16::new(step), EcssEnumU16::new(step),
)?; )?;
false
} }
};
if remove_entry {
self.active_requests
.remove(action_reply_with_ids.request_id);
} }
Ok(()) Ok(())
} }
@ -606,7 +666,11 @@ mod tests {
pub struct Pus8ReplyTestbench { pub struct Pus8ReplyTestbench {
verif_reporter: TestVerificationReporter, verif_reporter: TestVerificationReporter,
handler: PusService8ReplyHandler<TestVerificationReporter, TestReplyHandlerHook>, handler: PusService8ReplyHandler<
TestVerificationReporter,
DefaultActiveRequestMap,
TestReplyHandlerHook,
>,
} }
impl Pus8ReplyTestbench { impl Pus8ReplyTestbench {