use core::{fmt, marker::PhantomData}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; #[cfg(feature = "alloc")] pub use alloc_mod::*; #[cfg(feature = "std")] pub use std_mod::*; use spacepackets::{ ecss::{tc::IsPusTelecommand, PusPacket}, ByteConversionError, }; use crate::{queue::GenericTargetedMessagingError, ComponentId}; /// Generic request ID type. Requests can be associated with an ID to have a unique identifier /// for them. This can be useful for tasks like tracking their progress. pub type RequestId = u32; /// CCSDS APID type definition. Please note that the APID is a 14 bit value. pub type Apid = u16; #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] pub struct UniqueApidTargetId { pub apid: Apid, pub unique_id: u32, } impl UniqueApidTargetId { pub const fn new(apid: Apid, target: u32) -> Self { Self { apid, unique_id: target, } } pub fn raw(&self) -> ComponentId { ((self.apid as u64) << 32) | (self.unique_id as u64) } pub fn id(&self) -> ComponentId { self.raw() } /// This function attempts to build the ID from a PUS telecommand by extracting the APID /// and the first four bytes of the application data field as the target field. pub fn from_pus_tc( tc: &(impl PusPacket + IsPusTelecommand), ) -> Result { if tc.user_data().len() < 4 { return Err(ByteConversionError::FromSliceTooSmall { found: tc.user_data().len(), expected: 4, }); } Ok(Self::new( tc.apid(), u32::from_be_bytes(tc.user_data()[0..4].try_into().unwrap()), )) } } impl From for UniqueApidTargetId { fn from(raw: u64) -> Self { Self { apid: (raw >> 32) as u16, unique_id: raw as u32, } } } impl From for u64 { fn from(target_and_apid_id: UniqueApidTargetId) -> Self { target_and_apid_id.raw() } } impl fmt::Display for UniqueApidTargetId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "Target and APID ID with APID {:#03x} and target {}", self.apid, self.unique_id ) } } /// This contains metadata information which might be useful when used together with a /// generic message tpye. /// /// This could for example be used to build request/reply patterns or state tracking for request. #[derive(Debug, Copy, PartialEq, Eq, Clone)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct MessageMetadata { request_id: RequestId, sender_id: ComponentId, } impl MessageMetadata { pub const fn new(request_id: RequestId, sender_id: ComponentId) -> Self { Self { request_id, sender_id, } } pub fn request_id(&self) -> RequestId { self.request_id } pub fn sender_id(&self) -> ComponentId { self.sender_id } } /// Generic message type which adds [metadata][MessageMetadata] to a generic message typ. #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct GenericMessage { pub requestor_info: MessageMetadata, pub message: Message, } impl GenericMessage { pub fn new(requestor_info: MessageMetadata, message: Message) -> Self { Self { requestor_info, message, } } delegate::delegate! { to self.requestor_info { pub fn request_id(&self) -> RequestId; pub fn sender_id(&self) -> ComponentId; } } } /// Generic trait for objects which can send targeted messages. pub trait MessageSenderProvider: Send { fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError>; } // Generic trait for objects which can receive targeted messages. pub trait MessageReceiverProvider { fn try_recv(&self) -> Result>, GenericTargetedMessagingError>; } pub struct MessageWithSenderIdReceiver>( pub Receiver, PhantomData, ); impl> From for MessageWithSenderIdReceiver { fn from(receiver: R) -> Self { MessageWithSenderIdReceiver(receiver, PhantomData) } } impl> MessageWithSenderIdReceiver { pub fn try_recv_message( &self, ) -> Result>, GenericTargetedMessagingError> { self.0.try_recv() } } pub struct MessageReceiverWithId> { local_channel_id: ComponentId, reply_receiver: MessageWithSenderIdReceiver, } impl> MessageReceiverWithId { pub fn new(local_channel_id: ComponentId, reply_receiver: R) -> Self { Self { local_channel_id, reply_receiver: MessageWithSenderIdReceiver::from(reply_receiver), } } pub fn local_channel_id(&self) -> ComponentId { self.local_channel_id } } impl> MessageReceiverWithId { pub fn try_recv_message( &self, ) -> Result>, GenericTargetedMessagingError> { self.reply_receiver.0.try_recv() } } pub trait MessageSenderStoreProvider: Default { fn add_message_target(&mut self, target_id: ComponentId, message_sender: Sender); fn send_message( &self, requestor_info: MessageMetadata, target_channel_id: ComponentId, message: Message, ) -> Result<(), GenericTargetedMessagingError>; } #[cfg(feature = "alloc")] pub mod alloc_mod { use crate::queue::GenericSendError; use std::convert::From; use super::*; use hashbrown::HashMap; pub struct OneMessageSender> { pub id_and_sender: Option<(ComponentId, S)>, pub(crate) phantom: PhantomData, } impl> Default for OneMessageSender { fn default() -> Self { Self { id_and_sender: Default::default(), phantom: Default::default(), } } } impl> MessageSenderStoreProvider for OneMessageSender { fn add_message_target(&mut self, target_id: ComponentId, message_sender: Sender) { if self.id_and_sender.is_some() { return; } self.id_and_sender = Some((target_id, message_sender)); } fn send_message( &self, requestor_info: MessageMetadata, target_channel_id: ComponentId, message: Msg, ) -> Result<(), GenericTargetedMessagingError> { if let Some((current_id, sender)) = &self.id_and_sender { if *current_id == target_channel_id { sender.send(GenericMessage::new(requestor_info, message))?; return Ok(()); } } Err(GenericSendError::TargetDoesNotExist(target_channel_id).into()) } } pub struct MessageSenderList>( pub alloc::vec::Vec<(ComponentId, S)>, pub(crate) PhantomData, ); impl> Default for MessageSenderList { fn default() -> Self { Self(Default::default(), PhantomData) } } impl> MessageSenderStoreProvider for MessageSenderList { fn add_message_target(&mut self, target_id: ComponentId, message_sender: Sender) { self.0.push((target_id, message_sender)); } fn send_message( &self, requestor_info: MessageMetadata, target_channel_id: ComponentId, message: Msg, ) -> Result<(), GenericTargetedMessagingError> { for (current_id, sender) in &self.0 { if *current_id == target_channel_id { sender.send(GenericMessage::new(requestor_info, message))?; return Ok(()); } } Err(GenericSendError::TargetDoesNotExist(target_channel_id).into()) } } pub struct MessageSenderMap>( pub HashMap, pub(crate) PhantomData, ); impl> Default for MessageSenderMap { fn default() -> Self { Self(Default::default(), PhantomData) } } impl> MessageSenderStoreProvider for MessageSenderMap { fn add_message_target(&mut self, target_id: ComponentId, message_sender: Sender) { self.0.insert(target_id, message_sender); } fn send_message( &self, requestor_info: MessageMetadata, target_channel_id: ComponentId, message: Msg, ) -> Result<(), GenericTargetedMessagingError> { if self.0.contains_key(&target_channel_id) { return self .0 .get(&target_channel_id) .unwrap() .send(GenericMessage::new(requestor_info, message)); } Err(GenericSendError::TargetDoesNotExist(target_channel_id).into()) } } pub struct MessageSenderAndReceiver< To, From, Sender: MessageSenderProvider, Receiver: MessageReceiverProvider, SenderStore: MessageSenderStoreProvider, > { pub local_channel_id: ComponentId, pub message_sender_store: SenderStore, pub message_receiver: MessageWithSenderIdReceiver, pub(crate) phantom: PhantomData<(To, Sender)>, } impl< To, From, Sender: MessageSenderProvider, Receiver: MessageReceiverProvider, SenderStore: MessageSenderStoreProvider, > MessageSenderAndReceiver { pub fn new(local_channel_id: ComponentId, message_receiver: Receiver) -> Self { Self { local_channel_id, message_sender_store: Default::default(), message_receiver: MessageWithSenderIdReceiver::from(message_receiver), phantom: PhantomData, } } pub fn add_message_target(&mut self, target_id: ComponentId, message_sender: Sender) { self.message_sender_store .add_message_target(target_id, message_sender) } pub fn local_channel_id_generic(&self) -> ComponentId { self.local_channel_id } /// Try to send a message, which can be a reply or a request, depending on the generics. pub fn send_message( &self, request_id: RequestId, target_id: ComponentId, message: To, ) -> Result<(), GenericTargetedMessagingError> { self.message_sender_store.send_message( MessageMetadata::new(request_id, self.local_channel_id_generic()), target_id, message, ) } /// Try to receive a message, which can be a reply or a request, depending on the generics. pub fn try_recv_message( &self, ) -> Result>, GenericTargetedMessagingError> { self.message_receiver.try_recv_message() } } pub struct RequestAndReplySenderAndReceiver< Request, ReqSender: MessageSenderProvider, ReqReceiver: MessageReceiverProvider, ReqSenderStore: MessageSenderStoreProvider, Reply, ReplySender: MessageSenderProvider, ReplyReceiver: MessageReceiverProvider, ReplySenderStore: MessageSenderStoreProvider, > { pub local_channel_id: ComponentId, // These 2 are a functional group. pub request_sender_store: ReqSenderStore, pub reply_receiver: MessageWithSenderIdReceiver, // These 2 are a functional group. pub request_receiver: MessageWithSenderIdReceiver, pub reply_sender_store: ReplySenderStore, phantom: PhantomData<(ReqSender, ReplySender)>, } impl< Request, ReqSender: MessageSenderProvider, ReqReceiver: MessageReceiverProvider, ReqSenderStore: MessageSenderStoreProvider, Reply, ReplySender: MessageSenderProvider, ReplyReceiver: MessageReceiverProvider, ReplySenderStore: MessageSenderStoreProvider, > RequestAndReplySenderAndReceiver< Request, ReqSender, ReqReceiver, ReqSenderStore, Reply, ReplySender, ReplyReceiver, ReplySenderStore, > { pub fn new( local_channel_id: ComponentId, request_receiver: ReqReceiver, reply_receiver: ReplyReceiver, ) -> Self { Self { local_channel_id, request_receiver: request_receiver.into(), reply_receiver: reply_receiver.into(), request_sender_store: Default::default(), reply_sender_store: Default::default(), phantom: PhantomData, } } pub fn local_channel_id_generic(&self) -> ComponentId { self.local_channel_id } } } #[cfg(feature = "std")] pub mod std_mod { use super::*; use std::sync::mpsc; use crate::queue::{GenericReceiveError, GenericSendError}; impl MessageSenderProvider for mpsc::Sender> { fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError> { self.send(message) .map_err(|_| GenericSendError::RxDisconnected)?; Ok(()) } } impl MessageSenderProvider for mpsc::SyncSender> { fn send(&self, message: GenericMessage) -> Result<(), GenericTargetedMessagingError> { if let Err(e) = self.try_send(message) { return match e { mpsc::TrySendError::Full(_) => Err(GenericSendError::QueueFull(None).into()), mpsc::TrySendError::Disconnected(_) => { Err(GenericSendError::RxDisconnected.into()) } }; } Ok(()) } } pub type MessageSenderMapMpsc = MessageReceiverWithId>; pub type MessageSenderMapBoundedMpsc = MessageReceiverWithId>; impl MessageReceiverProvider for mpsc::Receiver> { fn try_recv(&self) -> Result>, GenericTargetedMessagingError> { match self.try_recv() { Ok(msg) => Ok(Some(msg)), Err(e) => match e { mpsc::TryRecvError::Empty => Ok(None), mpsc::TryRecvError::Disconnected => { Err(GenericReceiveError::TxDisconnected(None).into()) } }, } } } pub type MessageReceiverWithIdMpsc = MessageReceiverWithId>; } #[cfg(test)] mod tests { use std::sync::mpsc; use alloc::string::ToString; use spacepackets::{ ecss::tc::{PusTcCreator, PusTcSecondaryHeader}, ByteConversionError, SpHeader, }; use crate::{ queue::{GenericReceiveError, GenericSendError, GenericTargetedMessagingError}, request::{MessageMetadata, MessageSenderMap, MessageSenderStoreProvider}, }; use super::{GenericMessage, MessageReceiverWithId, UniqueApidTargetId}; const TEST_CHANNEL_ID_0: u64 = 1; const TEST_CHANNEL_ID_1: u64 = 2; const TEST_CHANNEL_ID_2: u64 = 3; #[test] fn test_basic_target_id_with_apid() { let id = UniqueApidTargetId::new(0x111, 0x01); assert_eq!(id.apid, 0x111); assert_eq!(id.unique_id, 0x01); assert_eq!(id.id(), id.raw()); assert_eq!(u64::from(id), id.raw()); let id_raw = id.raw(); let id_from_raw = UniqueApidTargetId::from(id_raw); assert_eq!(id_from_raw, id); assert_eq!(id.id(), (0x111 << 32) | 0x01); let string = id.to_string(); assert_eq!( string, "Target and APID ID with APID 0x111 and target 1".to_string() ); } #[test] fn test_basic_target_id_with_apid_from_pus_tc() { let sp_header = SpHeader::new_for_unseg_tc(0x111, 5, 0); let app_data = 1_u32.to_be_bytes(); let pus_tc = PusTcCreator::new_simple(sp_header, 17, 1, &app_data, true); let id = UniqueApidTargetId::from_pus_tc(&pus_tc).unwrap(); assert_eq!(id.apid, 0x111); assert_eq!(id.unique_id, 1); } #[test] fn test_basic_target_id_with_apid_from_pus_tc_invalid_app_data() { let sp_header = SpHeader::new_for_unseg_tc(0x111, 5, 0); let sec_header = PusTcSecondaryHeader::new_simple(17, 1); let pus_tc = PusTcCreator::new_no_app_data(sp_header, sec_header, true); let error = UniqueApidTargetId::from_pus_tc(&pus_tc); assert!(error.is_err()); let error = error.unwrap_err(); if let ByteConversionError::FromSliceTooSmall { found, expected } = error { assert_eq!(found, 0); assert_eq!(expected, 4); } else { panic!("Unexpected error type"); } } #[test] fn test_receiver_only() { let (sender, receiver) = mpsc::channel(); // Test structure with only a receiver which has a channel ID. let receiver = MessageReceiverWithId::new(TEST_CHANNEL_ID_0, receiver); let request_id = 5; sender .send(GenericMessage::new( MessageMetadata::new(request_id, TEST_CHANNEL_ID_1), 5, )) .unwrap(); let reply = receiver.try_recv_message().unwrap(); assert!(reply.is_some()); assert_eq!(receiver.local_channel_id(), TEST_CHANNEL_ID_0); let reply = reply.unwrap(); assert_eq!(reply.requestor_info.request_id, request_id); assert_eq!(reply.requestor_info.sender_id, TEST_CHANNEL_ID_1); assert_eq!(reply.message, 5); } #[test] fn test_receiver_empty() { let (_sender, receiver) = mpsc::sync_channel::>(2); // Test structure with only a receiver which has a channel ID. let receiver = MessageReceiverWithId::new(TEST_CHANNEL_ID_0, receiver); let reply = receiver.try_recv_message().unwrap(); assert!(reply.is_none()); } #[test] fn test_all_tx_disconnected() { let (sender, receiver) = mpsc::sync_channel::>(2); // Test structure with only a receiver which has a channel ID. let receiver = MessageReceiverWithId::new(TEST_CHANNEL_ID_0, receiver); drop(sender); let reply = receiver.try_recv_message(); assert!(reply.is_err()); let error = reply.unwrap_err(); if let GenericTargetedMessagingError::Receive(GenericReceiveError::TxDisconnected(None)) = error { } else { panic!("unexpected error type"); } } #[test] fn test_sender_map() { let (sender0, receiver0) = mpsc::channel(); let (sender1, receiver1) = mpsc::channel(); let mut sender_map = MessageSenderMap::default(); sender_map.add_message_target(TEST_CHANNEL_ID_1, sender0); sender_map.add_message_target(TEST_CHANNEL_ID_2, sender1); sender_map .send_message( MessageMetadata::new(1, TEST_CHANNEL_ID_0), TEST_CHANNEL_ID_1, 5, ) .expect("sending message failed"); let mut reply = receiver0.recv().expect("receiving message failed"); assert_eq!(reply.request_id(), 1); assert_eq!(reply.sender_id(), TEST_CHANNEL_ID_0); assert_eq!(reply.message, 5); sender_map .send_message( MessageMetadata::new(2, TEST_CHANNEL_ID_0), TEST_CHANNEL_ID_2, 10, ) .expect("sending message failed"); reply = receiver1.recv().expect("receiving message failed"); assert_eq!(reply.request_id(), 2); assert_eq!(reply.sender_id(), TEST_CHANNEL_ID_0); assert_eq!(reply.message, 10); } #[test] fn test_sender_map_target_does_not_exist() { let (sender0, _) = mpsc::channel(); let mut sender_map_with_id = MessageSenderMap::default(); sender_map_with_id.add_message_target(TEST_CHANNEL_ID_1, sender0); let result = sender_map_with_id.send_message( MessageMetadata::new(1, TEST_CHANNEL_ID_0), TEST_CHANNEL_ID_2, 5, ); assert!(result.is_err()); let error = result.unwrap_err(); if let GenericTargetedMessagingError::Send(GenericSendError::TargetDoesNotExist(target)) = error { assert_eq!(target, TEST_CHANNEL_ID_2); } else { panic!("Unexpected error type"); } } #[test] fn test_sender_map_queue_full() { let (sender0, _receiver0) = mpsc::sync_channel(1); let mut sender_map_with_id = MessageSenderMap::default(); sender_map_with_id.add_message_target(TEST_CHANNEL_ID_1, sender0); sender_map_with_id .send_message( MessageMetadata::new(1, TEST_CHANNEL_ID_0), TEST_CHANNEL_ID_1, 5, ) .expect("sending message failed"); let result = sender_map_with_id.send_message( MessageMetadata::new(1, TEST_CHANNEL_ID_0), TEST_CHANNEL_ID_1, 5, ); assert!(result.is_err()); let error = result.unwrap_err(); if let GenericTargetedMessagingError::Send(GenericSendError::QueueFull(capacity)) = error { assert!(capacity.is_none()); } else { panic!("Unexpected error type {}", error); } } #[test] fn test_sender_map_queue_receiver_disconnected() { let (sender0, receiver0) = mpsc::sync_channel(1); let mut sender_map_with_id = MessageSenderMap::default(); sender_map_with_id.add_message_target(TEST_CHANNEL_ID_1, sender0); drop(receiver0); let result = sender_map_with_id.send_message( MessageMetadata::new(1, TEST_CHANNEL_ID_0), TEST_CHANNEL_ID_1, 5, ); assert!(result.is_err()); let error = result.unwrap_err(); if let GenericTargetedMessagingError::Send(GenericSendError::RxDisconnected) = error { } else { panic!("Unexpected error type {}", error); } } }