diff --git a/satrs/src/action.rs b/satrs/src/action.rs index 7caeaa6..8803b7a 100644 --- a/satrs/src/action.rs +++ b/satrs/src/action.rs @@ -1,4 +1,4 @@ -use crate::{pool::StoreAddr, TargetId}; +use crate::{params::Params, pool::StoreAddr, TargetId}; pub type ActionId = u32; @@ -41,23 +41,31 @@ impl TargetedActionRequest { } } -/// A reply to an action request. +/// A reply to an action request specific to PUS. #[non_exhaustive] -#[derive(Clone, Eq, PartialEq, Debug)] +#[derive(Clone, Debug)] pub enum ActionReply { - CompletionFailed(ActionId), + CompletionFailed { + id: ActionId, + reason: Params, + }, StepFailed { id: ActionId, step: u32, + reason: Params, }, Completed(ActionId), #[cfg(feature = "alloc")] CompletedStringId(alloc::string::String), #[cfg(feature = "alloc")] - CompletionFailedStringId(alloc::string::String), + CompletionFailedStringId { + id: alloc::string::String, + reason: Params, + }, #[cfg(feature = "alloc")] StepFailedStringId { id: alloc::string::String, step: u32, + reason: Params, }, } diff --git a/satrs/src/lib.rs b/satrs/src/lib.rs index 5040d58..b3374f9 100644 --- a/satrs/src/lib.rs +++ b/satrs/src/lib.rs @@ -32,6 +32,9 @@ pub mod events; #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod executable; pub mod hal; +#[cfg(feature = "std")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] +pub mod mode_tree; pub mod objects; pub mod pool; pub mod power; @@ -49,8 +52,7 @@ pub mod params; pub use spacepackets; -/// Generic channel ID type. -pub type ChannelId = u32; +pub use queue::ChannelId; /// Generic target ID type. pub type TargetId = u64; diff --git a/satrs/src/mode.rs b/satrs/src/mode.rs index c5968b4..1346226 100644 --- a/satrs/src/mode.rs +++ b/satrs/src/mode.rs @@ -5,19 +5,22 @@ use spacepackets::ByteConversionError; use crate::TargetId; +pub type Mode = u32; +pub type Submode = u16; + #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ModeAndSubmode { - mode: u32, - submode: u16, + mode: Mode, + submode: Submode, } impl ModeAndSubmode { - pub const fn new_mode_only(mode: u32) -> Self { + pub const fn new_mode_only(mode: Mode) -> Self { Self { mode, submode: 0 } } - pub const fn new(mode: u32, submode: u16) -> Self { + pub const fn new(mode: Mode, submode: Submode) -> Self { Self { mode, submode } } @@ -33,16 +36,20 @@ impl ModeAndSubmode { }); } Ok(Self { - mode: u32::from_be_bytes(buf[0..4].try_into().unwrap()), - submode: u16::from_be_bytes(buf[4..6].try_into().unwrap()), + mode: Mode::from_be_bytes(buf[0..size_of::()].try_into().unwrap()), + submode: Submode::from_be_bytes( + buf[size_of::()..size_of::() + size_of::()] + .try_into() + .unwrap(), + ), }) } - pub fn mode(&self) -> u32 { + pub fn mode(&self) -> Mode { self.mode } - pub fn submode(&self) -> u16 { + pub fn submode(&self) -> Submode { self.submode } } @@ -87,6 +94,20 @@ pub enum ModeRequest { AnnounceModeRecursive, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum ModeReply { + /// Unrequest mode information. Can be used to notify other components of changed modes. + ModeInfo(ModeAndSubmode), + /// Reply to a mode request to confirm the commanded mode was reached. + ModeReply(ModeAndSubmode), + CantReachMode(ModeAndSubmode), + WrongMode { + expected: ModeAndSubmode, + reached: ModeAndSubmode, + }, +} + #[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct TargetedModeRequest { diff --git a/satrs/src/mode_tree.rs b/satrs/src/mode_tree.rs new file mode 100644 index 0000000..3da5c26 --- /dev/null +++ b/satrs/src/mode_tree.rs @@ -0,0 +1,398 @@ +use alloc::vec::Vec; +use hashbrown::HashMap; +use std::sync::mpsc; + +use crate::{ + mode::{Mode, ModeAndSubmode, ModeReply, ModeRequest, Submode}, + queue::GenericTargetedMessagingError, + request::{ + GenericMessage, MessageReceiver, MessageReceiverWithId, MessageSender, + MessageSenderAndReceiver, MessageSenderMap, MessageSenderMapWithId, + RequestAndReplySenderAndReceiver, RequestId, + }, + ChannelId, +}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum TableEntryType { + /// Target table containing information of the expected children modes for given mode. + Target, + /// Sequence table which contains information about how to reach a target table, including + /// the order of the sequences. + Sequence, +} + +pub struct ModeTableEntry { + /// Name of respective table entry. + pub name: &'static str, + /// Target channel ID. + pub channel_id: ChannelId, + pub mode_submode: ModeAndSubmode, + pub allowed_submode_mask: Option, + pub check_success: bool, +} + +pub struct ModeTableMapValue { + /// Name for a given mode table entry. + pub name: &'static str, + pub entries: Vec, +} + +pub type ModeTable = HashMap; + +pub trait ModeRequestSender { + fn local_channel_id(&self) -> ChannelId; + fn send_mode_request( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ModeRequest, + ) -> Result<(), GenericTargetedMessagingError>; +} + +pub trait ModeReplySender { + fn local_channel_id(&self) -> ChannelId; + + fn send_mode_reply( + &self, + request_id: RequestId, + target_id: ChannelId, + reply: ModeReply, + ) -> Result<(), GenericTargetedMessagingError>; +} + +pub trait ModeRequestReceiver { + fn try_recv_mode_request( + &self, + ) -> Result>, GenericTargetedMessagingError>; +} + +pub trait ModeReplyReceiver { + fn try_recv_mode_reply( + &self, + ) -> Result>, GenericTargetedMessagingError>; +} + +impl> MessageSenderMap { + pub fn send_mode_request( + &self, + request_id: RequestId, + local_id: ChannelId, + target_id: ChannelId, + request: ModeRequest, + ) -> Result<(), GenericTargetedMessagingError> { + self.send_message(request_id, local_id, target_id, request) + } + + pub fn add_request_target(&mut self, target_id: ChannelId, request_sender: S) { + self.add_message_target(target_id, request_sender) + } +} + +impl> MessageSenderMap { + pub fn send_mode_reply( + &self, + request_id: RequestId, + local_id: ChannelId, + target_id: ChannelId, + request: ModeReply, + ) -> Result<(), GenericTargetedMessagingError> { + self.send_message(request_id, local_id, target_id, request) + } + + pub fn add_reply_target(&mut self, target_id: ChannelId, request_sender: S) { + self.add_message_target(target_id, request_sender) + } +} + +impl> ModeReplySender for MessageSenderMapWithId { + fn send_mode_reply( + &self, + request_id: RequestId, + target_channel_id: ChannelId, + reply: ModeReply, + ) -> Result<(), GenericTargetedMessagingError> { + self.send_message(request_id, target_channel_id, reply) + } + + fn local_channel_id(&self) -> ChannelId { + self.local_channel_id + } +} + +impl> ModeRequestSender for MessageSenderMapWithId { + fn local_channel_id(&self) -> ChannelId { + self.local_channel_id + } + + fn send_mode_request( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ModeRequest, + ) -> Result<(), GenericTargetedMessagingError> { + self.send_message(request_id, target_id, request) + } +} + +impl> ModeReplyReceiver for MessageReceiverWithId { + fn try_recv_mode_reply( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.try_recv_message() + } +} + +impl> ModeRequestReceiver + for MessageReceiverWithId +{ + fn try_recv_mode_request( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.try_recv_message() + } +} + +impl, R: MessageReceiver> ModeRequestSender + for MessageSenderAndReceiver +{ + fn local_channel_id(&self) -> ChannelId { + self.local_channel_id_generic() + } + + fn send_mode_request( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ModeRequest, + ) -> Result<(), GenericTargetedMessagingError> { + self.message_sender_map.send_mode_request( + request_id, + self.local_channel_id(), + target_id, + request, + ) + } +} + +impl, R: MessageReceiver> ModeReplySender + for MessageSenderAndReceiver +{ + fn local_channel_id(&self) -> ChannelId { + self.local_channel_id_generic() + } + + fn send_mode_reply( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ModeReply, + ) -> Result<(), GenericTargetedMessagingError> { + self.message_sender_map.send_mode_reply( + request_id, + self.local_channel_id(), + target_id, + request, + ) + } +} + +impl, R: MessageReceiver> ModeReplyReceiver + for MessageSenderAndReceiver +{ + fn try_recv_mode_reply( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.message_receiver + .try_recv_message(self.local_channel_id_generic()) + } +} +impl, R: MessageReceiver> ModeRequestReceiver + for MessageSenderAndReceiver +{ + fn try_recv_mode_request( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.message_receiver + .try_recv_message(self.local_channel_id_generic()) + } +} + +pub type ModeRequestHandlerConnector = MessageSenderAndReceiver; +pub type MpscModeRequestHandlerConnector = ModeRequestHandlerConnector< + mpsc::Sender>, + mpsc::Receiver>, +>; +pub type MpscBoundedModeRequestHandlerConnector = ModeRequestHandlerConnector< + mpsc::SyncSender>, + mpsc::Receiver>, +>; + +pub type ModeRequestorConnector = MessageSenderAndReceiver; +pub type MpscModeRequestorConnector = ModeRequestorConnector< + mpsc::Sender>, + mpsc::Receiver>, +>; +pub type MpscBoundedModeRequestorConnector = ModeRequestorConnector< + mpsc::SyncSender>, + mpsc::Receiver>, +>; + +pub type ModeConnector = + RequestAndReplySenderAndReceiver; +pub type MpscModeConnector = ModeConnector< + mpsc::Sender>, + mpsc::Receiver>, + mpsc::Sender>, + mpsc::Receiver>, +>; +pub type MpscBoundedModeConnector = ModeConnector< + mpsc::SyncSender>, + mpsc::Receiver>, + mpsc::SyncSender>, + mpsc::Receiver>, +>; + +impl< + REPLY, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > RequestAndReplySenderAndReceiver +{ + pub fn add_request_target(&mut self, target_id: ChannelId, request_sender: S0) { + self.request_sender_map + .add_message_target(target_id, request_sender) + } +} + +impl< + REQUEST, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > RequestAndReplySenderAndReceiver +{ + pub fn add_reply_target(&mut self, target_id: ChannelId, reply_sender: S1) { + self.reply_sender_map + .add_message_target(target_id, reply_sender) + } +} + +impl< + REPLY, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > ModeRequestSender for RequestAndReplySenderAndReceiver +{ + fn local_channel_id(&self) -> ChannelId { + self.local_channel_id_generic() + } + + fn send_mode_request( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ModeRequest, + ) -> Result<(), GenericTargetedMessagingError> { + self.request_sender_map.send_mode_request( + request_id, + self.local_channel_id(), + target_id, + request, + ) + } +} + +impl< + REQUEST, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > ModeReplySender for RequestAndReplySenderAndReceiver +{ + fn local_channel_id(&self) -> ChannelId { + self.local_channel_id_generic() + } + + fn send_mode_reply( + &self, + request_id: RequestId, + target_id: ChannelId, + request: ModeReply, + ) -> Result<(), GenericTargetedMessagingError> { + self.reply_sender_map.send_mode_reply( + request_id, + self.local_channel_id(), + target_id, + request, + ) + } +} + +impl< + REQUEST, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > ModeReplyReceiver for RequestAndReplySenderAndReceiver +{ + fn try_recv_mode_reply( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.reply_receiver + .try_recv_message(self.local_channel_id_generic()) + } +} + +impl< + REPLY, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > ModeRequestReceiver for RequestAndReplySenderAndReceiver +{ + fn try_recv_mode_request( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.request_receiver + .try_recv_message(self.local_channel_id_generic()) + } +} + +pub trait ModeProvider { + fn mode_and_submode(&self) -> ModeAndSubmode; +} + +#[derive(Debug, Clone)] +pub enum ModeError { + Messaging(GenericTargetedMessagingError), +} + +impl From for ModeError { + fn from(value: GenericTargetedMessagingError) -> Self { + Self::Messaging(value) + } +} + +pub trait ModeRequestHandler: ModeProvider { + fn start_transition( + &mut self, + request_id: RequestId, + sender_id: ChannelId, + mode_and_submode: ModeAndSubmode, + ) -> Result<(), ModeError>; + + fn announce_mode(&self, request_id: RequestId, sender_id: ChannelId, recursive: bool); + fn handle_mode_reached(&mut self) -> Result<(), GenericTargetedMessagingError>; +} + +#[cfg(test)] +mod tests {} diff --git a/satrs/src/params.rs b/satrs/src/params.rs index 1279015..90d7028 100644 --- a/satrs/src/params.rs +++ b/satrs/src/params.rs @@ -618,6 +618,42 @@ impl From<&str> for Params { } } +/// Please note while [WritableToBeBytes] is implemented for [Params], the default implementation +/// will not be able to process store parameters. +impl WritableToBeBytes for Params { + fn raw_len(&self) -> usize { + match self { + Params::Heapless(p) => match p { + ParamsHeapless::Raw(raw) => raw.raw_len(), + ParamsHeapless::EcssEnum(enumeration) => enumeration.raw_len(), + }, + Params::Store(_) => 0, + Params::Vec(vec) => vec.len(), + Params::String(string) => string.len(), + } + } + + fn write_to_be_bytes(&self, buf: &mut [u8]) -> Result { + match self { + Params::Heapless(p) => match p { + ParamsHeapless::Raw(raw) => raw.write_to_be_bytes(buf), + ParamsHeapless::EcssEnum(enumeration) => enumeration.write_to_be_bytes(buf), + }, + Params::Store(_) => Ok(0), + Params::Vec(vec) => { + // TODO: size checks. + buf[0..vec.len()].copy_from_slice(vec); + Ok(vec.len()) + } + Params::String(string) => { + // TODO: size checks + buf[0..string.len()].copy_from_slice(string.as_bytes()); + Ok(string.len()) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/satrs/src/pus/action.rs b/satrs/src/pus/action.rs index 2ee4815..1526843 100644 --- a/satrs/src/pus/action.rs +++ b/satrs/src/pus/action.rs @@ -1,7 +1,15 @@ -use crate::{action::ActionRequest, TargetId}; +use crate::{ + action::{ActionId, ActionRequest}, + params::Params, + request::RequestId, + TargetId, +}; use super::verification::{TcStateAccepted, VerificationToken}; +use satrs_shared::res_code::ResultU16; +use spacepackets::ecss::EcssEnumU16; + #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub use std_mod::*; @@ -10,6 +18,38 @@ pub use std_mod::*; #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub use alloc_mod::*; +#[derive(Clone, Debug)] +pub struct ActionRequestWithId { + pub request_id: RequestId, + pub request: ActionRequest, +} + +#[derive(Clone, Debug)] +pub struct ActionReplyPusWithIds { + pub request_id: RequestId, + pub action_id: ActionId, + pub reply: ActionReplyPus, +} + +/// A reply to an action request, but tailored to the PUS standard verification process. +#[non_exhaustive] +#[derive(Clone, Debug)] +pub enum ActionReplyPus { + Completed, + StepSuccess { + step: EcssEnumU16, + }, + CompletionFailed { + error_code: ResultU16, + params: Params, + }, + StepFailed { + error_code: ResultU16, + step: EcssEnumU16, + params: Params, + }, +} + /// This trait is an abstraction for the routing of PUS service 8 action requests to a dedicated /// recipient using the generic [TargetId]. pub trait PusActionRequestRouter { @@ -61,11 +101,22 @@ pub mod alloc_mod { #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod std_mod { - use crate::pus::{ - get_current_cds_short_timestamp, verification::VerificationReportingProvider, - EcssTcInMemConverter, EcssTcReceiverCore, EcssTmSenderCore, GenericRoutingError, - PusPacketHandlerResult, PusPacketHandlingError, PusRoutingErrorHandler, PusServiceHelper, + use crate::{ + params::WritableToBeBytes, + pus::{ + get_current_cds_short_timestamp, + verification::{ + self, FailParams, FailParamsWithStep, TcStateStarted, VerificationReportingProvider, + }, + EcssTcInMemConverter, EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, + GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, + PusRoutingErrorHandler, PusServiceHelper, + }, + request::RequestId, }; + use hashbrown::HashMap; + use spacepackets::time::UnixTimestamp; + use std::time::SystemTimeError; use super::*; @@ -175,6 +226,130 @@ pub mod std_mod { Ok(PusPacketHandlerResult::RequestHandled) } } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct ActiveRequest { + token: VerificationToken, + start_time: UnixTimestamp, + timeout_seconds: u32, + } + + pub struct PusService8ReplyHandler { + active_requests: HashMap, + verification_reporter: VerificationReporter, + fail_data_buf: alloc::vec::Vec, + current_time: UnixTimestamp, + } + + impl + PusService8ReplyHandler + { + pub fn add_routed_request( + &mut self, + request_id: verification::RequestId, + token: VerificationToken, + timeout_seconds: u32, + ) { + self.active_requests.insert( + request_id.into(), + ActiveRequest { + token, + start_time: self.current_time, + timeout_seconds, + }, + ); + } + + #[cfg(feature = "std")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] + pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> { + self.current_time = UnixTimestamp::from_now()?; + Ok(()) + } + + pub fn check_for_timeouts(&mut self, time_stamp: &[u8]) -> Result<(), EcssTmtcError> { + for (_req_id, active_req) in self.active_requests.iter() { + // TODO: Simplified until spacepackets update. + let diff = + (self.current_time.unix_seconds - active_req.start_time.unix_seconds) as u32; + if diff > active_req.timeout_seconds { + self.handle_timeout(active_req, time_stamp); + } + } + Ok(()) + } + + pub fn handle_timeout(&self, active_request: &ActiveRequest, time_stamp: &[u8]) { + self.verification_reporter + .completion_failure( + active_request.token, + FailParams::new( + time_stamp, + // WTF, what failure code? + &ResultU16::new(0, 0), + &[], + ), + ) + .unwrap(); + } + + pub fn handle_action_reply( + &mut self, + action_reply_with_ids: ActionReplyPusWithIds, + time_stamp: &[u8], + ) -> Result<(), EcssTmtcError> { + let active_req = self.active_requests.get(&action_reply_with_ids.request_id); + if active_req.is_none() { + // TODO: This is an unexpected reply. We need to deal with this somehow. + } + let active_req = active_req.unwrap(); + match action_reply_with_ids.reply { + ActionReplyPus::CompletionFailed { error_code, params } => { + params.write_to_be_bytes(&mut self.fail_data_buf)?; + self.verification_reporter + .completion_failure( + active_req.token, + FailParams::new(time_stamp, &error_code, &self.fail_data_buf), + ) + .map_err(|e| e.0)?; + self.active_requests + .remove(&action_reply_with_ids.request_id); + } + ActionReplyPus::StepFailed { + error_code, + step, + params, + } => { + params.write_to_be_bytes(&mut self.fail_data_buf)?; + self.verification_reporter + .step_failure( + active_req.token, + FailParamsWithStep::new( + time_stamp, + &step, + &error_code, + &self.fail_data_buf, + ), + ) + .map_err(|e| e.0)?; + self.active_requests + .remove(&action_reply_with_ids.request_id); + } + ActionReplyPus::Completed => { + self.verification_reporter + .completion_success(active_req.token, time_stamp) + .map_err(|e| e.0)?; + self.active_requests + .remove(&action_reply_with_ids.request_id); + } + ActionReplyPus::StepSuccess { step } => { + self.verification_reporter + .step_success(&active_req.token, time_stamp, step)?; + } + } + Ok(()) + } + } } #[cfg(test)] diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index ba0ff1d..77a948c 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -4,7 +4,7 @@ //! The satrs-example application contains various usage examples of these components. use crate::pool::{StoreAddr, StoreError}; use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken}; -use crate::queue::{GenericRecvError, GenericSendError}; +use crate::queue::{GenericReceiveError, GenericSendError}; use crate::ChannelId; use core::fmt::{Display, Formatter}; #[cfg(feature = "alloc")] @@ -59,26 +59,26 @@ impl<'tm> From> for PusTmWrapper<'tm> { #[derive(Debug, Clone)] pub enum EcssTmtcError { - StoreLock, Store(StoreError), + ByteConversion(ByteConversionError), Pus(PusError), CantSendAddr(StoreAddr), CantSendDirectTm, Send(GenericSendError), - Recv(GenericRecvError), + Receive(GenericReceiveError), } impl Display for EcssTmtcError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { match self { - EcssTmtcError::StoreLock => { - write!(f, "store lock error") - } EcssTmtcError::Store(store) => { - write!(f, "store error: {store}") + write!(f, "ecss tmtc error: {store}") } - EcssTmtcError::Pus(pus_e) => { - write!(f, "PUS error: {pus_e}") + EcssTmtcError::ByteConversion(e) => { + write!(f, "ecss tmtc error: {e}") + } + EcssTmtcError::Pus(e) => { + write!(f, "ecss tmtc error: {e}") } EcssTmtcError::CantSendAddr(addr) => { write!(f, "can not send address {addr}") @@ -86,11 +86,11 @@ impl Display for EcssTmtcError { EcssTmtcError::CantSendDirectTm => { write!(f, "can not send TM directly") } - EcssTmtcError::Send(send_e) => { - write!(f, "send error {send_e}") + EcssTmtcError::Send(e) => { + write!(f, "ecss tmtc error: {e}") } - EcssTmtcError::Recv(recv_e) => { - write!(f, "recv error {recv_e}") + EcssTmtcError::Receive(e) => { + write!(f, "ecss tmtc error {e}") } } } @@ -114,9 +114,15 @@ impl From for EcssTmtcError { } } -impl From for EcssTmtcError { - fn from(value: GenericRecvError) -> Self { - Self::Recv(value) +impl From for EcssTmtcError { + fn from(value: ByteConversionError) -> Self { + Self::ByteConversion(value) + } +} + +impl From for EcssTmtcError { + fn from(value: GenericReceiveError) -> Self { + Self::Receive(value) } } @@ -125,9 +131,10 @@ impl Error for EcssTmtcError { fn source(&self) -> Option<&(dyn Error + 'static)> { match self { EcssTmtcError::Store(e) => Some(e), + EcssTmtcError::ByteConversion(e) => Some(e), EcssTmtcError::Pus(e) => Some(e), EcssTmtcError::Send(e) => Some(e), - EcssTmtcError::Recv(e) => Some(e), + EcssTmtcError::Receive(e) => Some(e), _ => None, } } @@ -368,11 +375,13 @@ mod alloc_mod { #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub mod std_mod { - use crate::pool::{PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr}; + use crate::pool::{ + PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr, StoreError, + }; use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::{ EcssChannel, EcssTcAndToken, EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, - GenericRecvError, GenericSendError, PusTmWrapper, TryRecvTmtcError, + GenericReceiveError, GenericSendError, PusTmWrapper, TryRecvTmtcError, }; use crate::tmtc::tm_helper::SharedTmPool; use crate::{ChannelId, TargetId}; @@ -563,9 +572,9 @@ pub mod std_mod { fn recv_tc(&self) -> Result { self.receiver.try_recv().map_err(|e| match e { TryRecvError::Empty => TryRecvTmtcError::Empty, - TryRecvError::Disconnected => { - TryRecvTmtcError::Tmtc(EcssTmtcError::from(GenericRecvError::TxDisconnected)) - } + TryRecvError::Disconnected => TryRecvTmtcError::Tmtc(EcssTmtcError::from( + GenericReceiveError::TxDisconnected(Some(self.channel_id())), + )), }) } } @@ -659,7 +668,7 @@ pub mod std_mod { self.receiver.try_recv().map_err(|e| match e { cb::TryRecvError::Empty => TryRecvTmtcError::Empty, cb::TryRecvError::Disconnected => TryRecvTmtcError::Tmtc(EcssTmtcError::from( - GenericRecvError::TxDisconnected, + GenericReceiveError::TxDisconnected(Some(self.id())), )), }) } @@ -805,10 +814,9 @@ pub mod std_mod { pub fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> { // Keep locked section as short as possible. - let mut tc_pool = self - .shared_tc_store - .write() - .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?; + let mut tc_pool = self.shared_tc_store.write().map_err(|_| { + PusPacketHandlingError::EcssTmtc(EcssTmtcError::Store(StoreError::LockError)) + })?; let tc_size = tc_pool .len_of_data(&addr) .map_err(|e| PusPacketHandlingError::EcssTmtc(EcssTmtcError::Store(e)))?; diff --git a/satrs/src/pus/verification.rs b/satrs/src/pus/verification.rs index 89ce9b9..2c05482 100644 --- a/satrs/src/pus/verification.rs +++ b/satrs/src/pus/verification.rs @@ -173,6 +173,22 @@ impl RequestId { } } +impl From for RequestId { + fn from(value: u32) -> Self { + Self { + version_number: ((value >> 29) & 0b111) as u8, + packet_id: PacketId::from(((value >> 16) & 0xffff) as u16), + psc: PacketSequenceCtrl::from((value & 0xffff) as u16), + } + } +} + +impl From for u32 { + fn from(value: RequestId) -> Self { + value.raw() + } +} + /// If a verification operation fails, the passed token will be returned as well. This allows /// re-trying the operation at a later point. #[derive(Debug, Clone)] diff --git a/satrs/src/queue.rs b/satrs/src/queue.rs index 5ba4bdc..7be4551 100644 --- a/satrs/src/queue.rs +++ b/satrs/src/queue.rs @@ -4,11 +4,15 @@ use std::error::Error; #[cfg(feature = "std")] use std::sync::mpsc; +/// Generic channel ID type. +pub type ChannelId = u32; + /// Generic error type for sending something via a message queue. #[derive(Debug, Copy, Clone)] pub enum GenericSendError { RxDisconnected, QueueFull(Option), + TargetDoesNotExist(ChannelId), } impl Display for GenericSendError { @@ -20,6 +24,9 @@ impl Display for GenericSendError { GenericSendError::QueueFull(max_cap) => { write!(f, "queue with max capacity of {max_cap:?} is full") } + GenericSendError::TargetDoesNotExist(target) => { + write!(f, "target queue with ID {target} does not exist") + } } } } @@ -29,16 +36,16 @@ impl Error for GenericSendError {} /// Generic error type for sending something via a message queue. #[derive(Debug, Copy, Clone)] -pub enum GenericRecvError { +pub enum GenericReceiveError { Empty, - TxDisconnected, + TxDisconnected(Option), } -impl Display for GenericRecvError { +impl Display for GenericReceiveError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { match self { - Self::TxDisconnected => { - write!(f, "tx side has disconnected") + Self::TxDisconnected(channel_id) => { + write!(f, "tx side with id {channel_id:?} has disconnected") } Self::Empty => { write!(f, "nothing to receive") @@ -48,7 +55,24 @@ impl Display for GenericRecvError { } #[cfg(feature = "std")] -impl Error for GenericRecvError {} +impl Error for GenericReceiveError {} + +#[derive(Debug, Clone)] +pub enum GenericTargetedMessagingError { + Send(GenericSendError), + Receive(GenericReceiveError), +} +impl From for GenericTargetedMessagingError { + fn from(value: GenericSendError) -> Self { + Self::Send(value) + } +} + +impl From for GenericTargetedMessagingError { + fn from(value: GenericReceiveError) -> Self { + Self::Receive(value) + } +} #[cfg(feature = "std")] impl From> for GenericSendError { diff --git a/satrs/src/request.rs b/satrs/src/request.rs index 24ca497..438cc25 100644 --- a/satrs/src/request.rs +++ b/satrs/src/request.rs @@ -2,13 +2,22 @@ use core::fmt; #[cfg(feature = "std")] use std::error::Error; +#[cfg(feature = "std")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] +pub use std_mod::*; + use spacepackets::{ ecss::{tc::IsPusTelecommand, PusPacket}, ByteConversionError, CcsdsPacket, }; -use crate::TargetId; +use crate::{queue::GenericTargetedMessagingError, ChannelId, TargetId}; +/// Generic request ID type. Requests can be associated with an ID to have a unique identifier +/// for them. This can be useful for tasks like tracking their progress. +pub type RequestId = u32; + +/// CCSDS APID type definition. Please note that the APID is a 14 bit value. pub type Apid = u16; #[derive(Debug, Clone, PartialEq, Eq)] @@ -108,3 +117,269 @@ impl fmt::Display for TargetAndApidId { write!(f, "{}, {}", self.apid, self.target) } } + +/// Generic message type which is associated with a sender using a [ChannelId] and associated +/// with a request using a [RequestId]. +pub struct GenericMessage { + pub sender_id: ChannelId, + pub request_id: RequestId, + pub message: MSG, +} + +impl GenericMessage { + pub fn new(request_id: RequestId, sender_id: ChannelId, message: MSG) -> Self { + Self { + request_id, + sender_id, + message, + } + } +} + +/// Generic trait for objects which can send targeted messages. +pub trait MessageSender: Send { + fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError>; +} + +// Generic trait for objects which can receive targeted messages. +pub trait MessageReceiver { + fn try_recv(&self) -> Result>, GenericTargetedMessagingError>; +} + +#[cfg(feature = "std")] +#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] +mod std_mod { + use core::marker::PhantomData; + use std::sync::mpsc; + + use hashbrown::HashMap; + + use crate::{ + queue::{GenericReceiveError, GenericSendError, GenericTargetedMessagingError}, + ChannelId, + }; + + use super::{GenericMessage, MessageReceiver, MessageSender, RequestId}; + + impl MessageSender for mpsc::Sender> { + fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError> { + self.send(message) + .map_err(|_| GenericSendError::RxDisconnected)?; + Ok(()) + } + } + impl MessageSender for mpsc::SyncSender> { + fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError> { + if let Err(e) = self.try_send(message) { + match e { + mpsc::TrySendError::Full(_) => { + return Err(GenericSendError::QueueFull(None).into()); + } + mpsc::TrySendError::Disconnected(_) => todo!(), + } + } + Ok(()) + } + } + + pub struct MessageSenderMap>( + pub HashMap, + PhantomData, + ); + + pub type MpscSenderMap = MessageReceiverWithId>; + pub type MpscBoundedSenderMap = MessageReceiverWithId>; + + impl> Default for MessageSenderMap { + fn default() -> Self { + Self(Default::default(), PhantomData) + } + } + + impl> MessageSenderMap { + pub fn add_message_target(&mut self, target_id: ChannelId, message_sender: S) { + self.0.insert(target_id, message_sender); + } + + pub fn send_message( + &self, + request_id: RequestId, + local_channel_id: ChannelId, + target_channel_id: ChannelId, + message: MSG, + ) -> Result<(), GenericTargetedMessagingError> { + if self.0.contains_key(&target_channel_id) { + self.0 + .get(&target_channel_id) + .unwrap() + .send(GenericMessage::new(request_id, local_channel_id, message)) + .map_err(|_| GenericSendError::RxDisconnected)?; + return Ok(()); + } + Err(GenericSendError::TargetDoesNotExist(target_channel_id).into()) + } + } + + pub struct MessageSenderMapWithId> { + pub local_channel_id: ChannelId, + pub message_sender_map: MessageSenderMap, + } + + impl> MessageSenderMapWithId { + pub fn new(local_channel_id: ChannelId) -> Self { + Self { + local_channel_id, + message_sender_map: Default::default(), + } + } + + pub fn send_message( + &self, + request_id: RequestId, + target_channel_id: ChannelId, + message: MSG, + ) -> Result<(), GenericTargetedMessagingError> { + self.message_sender_map.send_message( + request_id, + self.local_channel_id, + target_channel_id, + message, + ) + } + + pub fn add_message_target(&mut self, target_id: ChannelId, message_sender: S) { + self.message_sender_map + .add_message_target(target_id, message_sender) + } + } + + impl MessageReceiver for mpsc::Receiver> { + fn try_recv(&self) -> Result>, GenericTargetedMessagingError> { + match self.try_recv() { + Ok(msg) => Ok(Some(msg)), + Err(e) => match e { + mpsc::TryRecvError::Empty => Ok(None), + mpsc::TryRecvError::Disconnected => { + Err(GenericReceiveError::TxDisconnected(None).into()) + } + }, + } + } + } + + pub struct MessageWithSenderIdReceiver>(pub R, PhantomData); + + impl> From for MessageWithSenderIdReceiver { + fn from(receiver: R) -> Self { + MessageWithSenderIdReceiver(receiver, PhantomData) + } + } + + impl> MessageWithSenderIdReceiver { + pub fn try_recv_message( + &self, + _local_id: ChannelId, + ) -> Result>, GenericTargetedMessagingError> { + self.0.try_recv() + } + } + + pub struct MessageReceiverWithId> { + local_channel_id: ChannelId, + reply_receiver: MessageWithSenderIdReceiver, + } + + pub type MpscMessageReceiverWithId = MessageReceiverWithId>; + + impl> MessageReceiverWithId { + pub fn new( + local_channel_id: ChannelId, + reply_receiver: MessageWithSenderIdReceiver, + ) -> Self { + Self { + local_channel_id, + reply_receiver, + } + } + + pub fn local_channel_id(&self) -> ChannelId { + self.local_channel_id + } + } + + impl> MessageReceiverWithId { + pub fn try_recv_message( + &self, + ) -> Result>, GenericTargetedMessagingError> { + self.reply_receiver.0.try_recv() + } + } + + pub struct MessageSenderAndReceiver, R: MessageReceiver> { + pub local_channel_id: ChannelId, + pub message_sender_map: MessageSenderMap, + pub message_receiver: MessageWithSenderIdReceiver, + } + + impl, R: MessageReceiver> + MessageSenderAndReceiver + { + pub fn new(local_channel_id: ChannelId, message_receiver: R) -> Self { + Self { + local_channel_id, + message_sender_map: Default::default(), + message_receiver: MessageWithSenderIdReceiver::from(message_receiver), + } + } + + pub fn add_message_target(&mut self, target_id: ChannelId, message_sender: S) { + self.message_sender_map + .add_message_target(target_id, message_sender) + } + + pub fn local_channel_id_generic(&self) -> ChannelId { + self.local_channel_id + } + } + + pub struct RequestAndReplySenderAndReceiver< + REQUEST, + REPLY, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > { + pub local_channel_id: ChannelId, + // These 2 are a functional group. + pub request_sender_map: MessageSenderMap, + pub reply_receiver: MessageWithSenderIdReceiver, + // These 2 are a functional group. + pub request_receiver: MessageWithSenderIdReceiver, + pub reply_sender_map: MessageSenderMap, + } + + impl< + REQUEST, + REPLY, + S0: MessageSender, + R0: MessageReceiver, + S1: MessageSender, + R1: MessageReceiver, + > RequestAndReplySenderAndReceiver + { + pub fn new(local_channel_id: ChannelId, request_receiver: R1, reply_receiver: R0) -> Self { + Self { + local_channel_id, + request_receiver: request_receiver.into(), + reply_receiver: reply_receiver.into(), + request_sender_map: Default::default(), + reply_sender_map: Default::default(), + } + } + + pub fn local_channel_id_generic(&self) -> ChannelId { + self.local_channel_id + } + } +} diff --git a/satrs/src/tmtc/tm_helper.rs b/satrs/src/tmtc/tm_helper.rs index 192d574..9de359b 100644 --- a/satrs/src/tmtc/tm_helper.rs +++ b/satrs/src/tmtc/tm_helper.rs @@ -8,7 +8,9 @@ pub use std_mod::*; #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr}; + use crate::pool::{ + PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StoreAddr, StoreError, + }; use crate::pus::EcssTmtcError; use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::WritablePusPacket; @@ -34,7 +36,7 @@ pub mod std_mod { } pub fn add_pus_tm(&self, pus_tm: &PusTmCreator) -> Result { - let mut pg = self.0.write().map_err(|_| EcssTmtcError::StoreLock)?; + let mut pg = self.0.write().map_err(|_| StoreError::LockError)?; let addr = pg.free_element(pus_tm.len_written(), |buf| { pus_tm .write_to_bytes(buf) diff --git a/satrs/tests/mode_tree.rs b/satrs/tests/mode_tree.rs new file mode 100644 index 0000000..a66b774 --- /dev/null +++ b/satrs/tests/mode_tree.rs @@ -0,0 +1,322 @@ +use core::cell::Cell; +use std::{println, sync::mpsc}; + +use satrs::mode_tree::ModeRequestSender; +use satrs::request::RequestId; +use satrs::{ + mode::{ModeAndSubmode, ModeReply, ModeRequest}, + mode_tree::{ + ModeError, ModeProvider, ModeReplyReceiver, ModeReplySender, ModeRequestHandler, + ModeRequestReceiver, MpscBoundedModeConnector, MpscBoundedModeRequestHandlerConnector, + MpscBoundedModeRequestorConnector, + }, + queue::GenericTargetedMessagingError, + request::GenericMessage, + ChannelId, +}; +use std::string::{String, ToString}; + +pub enum TestChannelId { + Device1 = 1, + Device2 = 2, + Assembly = 3, + PusModeService = 4, +} + +struct PusModeService { + pub request_id_counter: Cell, + pub mode_node: MpscBoundedModeRequestorConnector, +} + +impl PusModeService { + pub fn send_announce_mode_cmd_to_assy(&self) { + self.mode_node + .send_mode_request( + self.request_id_counter.get(), + TestChannelId::Assembly as u32, + ModeRequest::AnnounceModeRecursive, + ) + .unwrap(); + self.request_id_counter + .replace(self.request_id_counter.get() + 1); + } +} + +struct TestDevice { + pub name: String, + pub mode_node: MpscBoundedModeRequestHandlerConnector, + pub mode_and_submode: ModeAndSubmode, + pub mode_requestor_info: Option<(RequestId, ChannelId)>, +} + +impl TestDevice { + pub fn run(&mut self) { + self.check_mode_requests().expect("mode messaging error"); + } + + pub fn check_mode_requests(&mut self) -> Result<(), GenericTargetedMessagingError> { + if let Some(request) = self.mode_node.try_recv_mode_request()? { + match request.message { + ModeRequest::SetMode(mode_and_submode) => { + self.start_transition(request.request_id, request.sender_id, mode_and_submode) + .unwrap(); + self.mode_requestor_info = Some((request.request_id, request.sender_id)); + } + ModeRequest::ReadMode => self + .mode_node + .send_mode_reply( + request.request_id, + request.sender_id, + ModeReply::ModeReply(self.mode_and_submode), + ) + .unwrap(), + ModeRequest::AnnounceMode => { + self.announce_mode(request.request_id, request.sender_id, false) + } + ModeRequest::AnnounceModeRecursive => { + self.announce_mode(request.request_id, request.sender_id, true) + } + } + } + Ok(()) + } +} + +impl ModeProvider for TestDevice { + fn mode_and_submode(&self) -> ModeAndSubmode { + self.mode_and_submode + } +} +impl ModeRequestHandler for TestDevice { + fn start_transition( + &mut self, + _request_id: RequestId, + _sender_id: ChannelId, + mode_and_submode: ModeAndSubmode, + ) -> Result<(), ModeError> { + self.mode_and_submode = mode_and_submode; + self.handle_mode_reached()?; + Ok(()) + } + + fn announce_mode(&self, _request_id: RequestId, _sender_id: ChannelId, _recursive: bool) { + println!( + "{}: announcing mode: {:?}", + self.name, self.mode_and_submode + ); + } + + fn handle_mode_reached(&mut self) -> Result<(), GenericTargetedMessagingError> { + let (req_id, sender_id) = self.mode_requestor_info.unwrap(); + self.mode_node.send_mode_reply( + req_id, + sender_id, + ModeReply::ModeReply(self.mode_and_submode), + )?; + Ok(()) + } +} + +struct TestAssembly { + pub mode_node: MpscBoundedModeConnector, + pub mode_requestor_info: Option<(RequestId, ChannelId)>, + pub mode_and_submode: ModeAndSubmode, + pub target_mode_and_submode: Option, +} + +impl ModeProvider for TestAssembly { + fn mode_and_submode(&self) -> ModeAndSubmode { + self.mode_and_submode + } +} + +impl TestAssembly { + pub fn run(&mut self) { + self.check_mode_requests().expect("mode messaging error"); + self.check_mode_replies().expect("mode messaging error"); + } + + pub fn check_mode_requests(&mut self) -> Result<(), GenericTargetedMessagingError> { + if let Some(request) = self.mode_node.try_recv_mode_request()? { + match request.message { + ModeRequest::SetMode(mode_and_submode) => { + self.start_transition(request.request_id, request.sender_id, mode_and_submode) + .unwrap(); + } + ModeRequest::ReadMode => self + .mode_node + .send_mode_reply( + request.request_id, + request.sender_id, + ModeReply::ModeReply(self.mode_and_submode), + ) + .unwrap(), + ModeRequest::AnnounceMode => { + self.announce_mode(request.request_id, request.sender_id, false) + } + ModeRequest::AnnounceModeRecursive => { + self.announce_mode(request.request_id, request.sender_id, true) + } + } + } + Ok(()) + } + + pub fn check_mode_replies(&mut self) -> Result<(), GenericTargetedMessagingError> { + if let Some(reply_and_id) = self.mode_node.try_recv_mode_reply()? { + match reply_and_id.message { + ModeReply::ModeInfo(_) => todo!(), + ModeReply::ModeReply(reply) => { + println!( + "TestAssembly: Received mode reply from {:?}, reached: {:?}", + reply_and_id.sender_id, reply + ); + } + ModeReply::CantReachMode(_) => todo!(), + ModeReply::WrongMode { expected, reached } => { + println!( + "TestAssembly: Wrong mode reply from {:?}, reached {:?}, expected {:?}", + reply_and_id.sender_id, reached, expected + ); + } + } + } + Ok(()) + } +} + +impl ModeRequestHandler for TestAssembly { + fn start_transition( + &mut self, + request_id: RequestId, + sender_id: ChannelId, + mode_and_submode: ModeAndSubmode, + ) -> Result<(), ModeError> { + self.mode_requestor_info = Some((request_id, sender_id)); + self.target_mode_and_submode = Some(mode_and_submode); + Ok(()) + } + + fn announce_mode(&self, request_id: RequestId, _sender_id: ChannelId, recursive: bool) { + println!( + "TestAssembly: Announcing mode (recursively: {}): {:?}", + recursive, self.mode_and_submode + ); + // self.mode_requestor_info = Some((request_id, sender_id)); + let mut mode_request = ModeRequest::AnnounceMode; + if recursive { + mode_request = ModeRequest::AnnounceModeRecursive; + } + self.mode_node + .request_sender_map + .0 + .iter() + .for_each(|(_, sender)| { + sender + .send(GenericMessage::new( + request_id, + self.mode_node.local_channel_id_generic(), + mode_request, + )) + .expect("sending mode request failed"); + }); + } + + fn handle_mode_reached(&mut self) -> Result<(), GenericTargetedMessagingError> { + let (req_id, sender_id) = self.mode_requestor_info.unwrap(); + self.mode_node.send_mode_reply( + req_id, + sender_id, + ModeReply::ModeReply(self.mode_and_submode), + )?; + Ok(()) + } +} + +fn main() { + // All request channel handles. + let (request_sender_to_dev1, request_receiver_dev1) = mpsc::sync_channel(10); + let (request_sender_to_dev2, request_receiver_dev2) = mpsc::sync_channel(10); + let (request_sender_to_assy, request_receiver_assy) = mpsc::sync_channel(10); + + // All reply channel handles. + let (reply_sender_to_assy, reply_receiver_assy) = mpsc::sync_channel(10); + let (reply_sender_to_pus, reply_receiver_pus) = mpsc::sync_channel(10); + + // Mode requestors and handlers. + let mut mode_node_assy = MpscBoundedModeConnector::new( + TestChannelId::Assembly as u32, + request_receiver_assy, + reply_receiver_assy, + ); + // Mode requestors only. + let mut mode_node_pus = MpscBoundedModeRequestorConnector::new( + TestChannelId::PusModeService as u32, + reply_receiver_pus, + ); + + // Request handlers only. + let mut mode_node_dev1 = MpscBoundedModeRequestHandlerConnector::new( + TestChannelId::Device1 as u32, + request_receiver_dev1, + ); + let mut mode_node_dev2 = MpscBoundedModeRequestHandlerConnector::new( + TestChannelId::Device2 as u32, + request_receiver_dev2, + ); + + // Set up mode request senders first. + mode_node_pus.add_message_target(TestChannelId::Assembly as u32, request_sender_to_assy); + mode_node_pus.add_message_target( + TestChannelId::Device1 as u32, + request_sender_to_dev1.clone(), + ); + mode_node_pus.add_message_target( + TestChannelId::Device2 as u32, + request_sender_to_dev2.clone(), + ); + mode_node_assy.add_request_target(TestChannelId::Device1 as u32, request_sender_to_dev1); + mode_node_assy.add_request_target(TestChannelId::Device2 as u32, request_sender_to_dev2); + + // Set up mode reply senders. + mode_node_dev1.add_message_target(TestChannelId::Assembly as u32, reply_sender_to_assy.clone()); + mode_node_dev1.add_message_target( + TestChannelId::PusModeService as u32, + reply_sender_to_pus.clone(), + ); + mode_node_dev2.add_message_target(TestChannelId::Assembly as u32, reply_sender_to_assy); + mode_node_dev2.add_message_target( + TestChannelId::PusModeService as u32, + reply_sender_to_pus.clone(), + ); + mode_node_assy.add_reply_target(TestChannelId::PusModeService as u32, reply_sender_to_pus); + + let mut device1 = TestDevice { + name: "Test Device 1".to_string(), + mode_node: mode_node_dev1, + mode_requestor_info: None, + mode_and_submode: ModeAndSubmode::new(0, 0), + }; + let mut device2 = TestDevice { + name: "Test Device 2".to_string(), + mode_node: mode_node_dev2, + mode_requestor_info: None, + mode_and_submode: ModeAndSubmode::new(0, 0), + }; + let mut assy = TestAssembly { + mode_node: mode_node_assy, + mode_requestor_info: None, + mode_and_submode: ModeAndSubmode::new(0, 0), + target_mode_and_submode: None, + }; + let pus_service = PusModeService { + request_id_counter: Cell::new(0), + mode_node: mode_node_pus, + }; + + pus_service.send_announce_mode_cmd_to_assy(); + assy.run(); + device1.run(); + device2.run(); + assy.run(); +}