Refactor TMTC distribution modules
Rust/sat-rs/pipeline/head This commit looks good Details

This commit is contained in:
Robin Müller 2024-03-04 16:26:34 +01:00
parent 5600aa576c
commit 3de5954898
Signed by: muellerr
GPG Key ID: A649FB78196E3849
7 changed files with 256 additions and 172 deletions

View File

@ -182,7 +182,7 @@ fn static_tmtc_pool_main() {
); );
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed"); .expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer { let mut udp_tmtc_server = UdpTmtcServer {
@ -193,7 +193,7 @@ fn static_tmtc_pool_main() {
}, },
}; };
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let sync_tm_tcp_source = SyncTcpTmSource::new(200); let sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new( let mut tcp_server = TcpTask::new(
@ -396,7 +396,7 @@ fn dyn_tmtc_pool_main() {
); );
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed"); .expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer { let mut udp_tmtc_server = UdpTmtcServer {
@ -406,7 +406,7 @@ fn dyn_tmtc_pool_main() {
}, },
}; };
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let sync_tm_tcp_source = SyncTcpTmSource::new(200); let sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new( let mut tcp_server = TcpTask::new(

View File

@ -6,11 +6,14 @@ use std::{
use log::{info, warn}; use log::{info, warn};
use satrs::{ use satrs::{
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer},
pus::ReceivesEcssPusTc,
spacepackets::PacketId, spacepackets::PacketId,
tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
}; };
use satrs_example::config::PUS_APID; use satrs_example::config::PUS_APID;
use crate::ccsds::CcsdsReceiver;
pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)];
#[derive(Default, Clone)] #[derive(Default, Clone)]
@ -69,20 +72,37 @@ impl TmPacketSourceCore for SyncTcpTmSource {
} }
} }
pub struct TcpTask<MpscErrorType: 'static> { pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
server: TcpSpacepacketsServer< (),
(), CcsdsError<MpscErrorType>,
CcsdsError<MpscErrorType>, SyncTcpTmSource,
SyncTcpTmSource, CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
CcsdsDistributor<MpscErrorType>, >;
>,
pub struct TcpTask<
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
+ ReceivesEcssPusTc<Error = MpscErrorType>
+ Clone
+ Send
+ 'static,
MpscErrorType: 'static,
> {
server: TcpServerType<TcSource, MpscErrorType>,
} }
impl<MpscErrorType: 'static + core::fmt::Debug> TcpTask<MpscErrorType> { impl<
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
+ ReceivesEcssPusTc<Error = MpscErrorType>
+ Clone
+ Send
+ 'static,
MpscErrorType: 'static + core::fmt::Debug,
> TcpTask<TcSource, MpscErrorType>
{
pub fn new( pub fn new(
cfg: ServerConfig, cfg: ServerConfig,
tm_source: SyncTcpTmSource, tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<MpscErrorType>, tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
server: TcpSpacepacketsServer::new( server: TcpSpacepacketsServer::new(

View File

@ -18,6 +18,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Refactored ECSS TM sender abstractions to be generic over different message queue backends. - Refactored ECSS TM sender abstractions to be generic over different message queue backends.
- Refactored Verification Reporter abstractions and implementation to be generic over the sender - Refactored Verification Reporter abstractions and implementation to be generic over the sender
instead of using trait objects. instead of using trait objects.
- `PusServiceProvider` renamed to `PusServiceDistributor` to make the purpose of the object
more clear
- `PusServiceProvider::handle_pus_tc_packet` renamed to `PusServiceDistributor::distribute_packet`.
- `PusServiceDistibutor` and `CcsdsDistributor` now use generics instead of trait objects.
This makes accessing the concrete trait implementations more easy as well.
## Fixed ## Fixed

View File

@ -920,7 +920,7 @@ pub mod std_mod {
} }
} }
/// This function can be used to poll the internal [EcssTcReceiver] object for the next /// This function can be used to poll the internal [EcssTcReceiverCore] object for the next
/// telecommand packet. It will return `Ok(None)` if there are not packets available. /// telecommand packet. It will return `Ok(None)` if there are not packets available.
/// In any other case, it will perform the acceptance of the ECSS TC packet using the /// In any other case, it will perform the acceptance of the ECSS TC packet using the
/// internal [VerificationReportingProvider] object. It will then return the telecommand /// internal [VerificationReportingProvider] object. It will then return the telecommand

View File

@ -52,7 +52,7 @@
//! } //! }
//! //!
//! let apid_handler = ConcreteApidHandler::default(); //! let apid_handler = ConcreteApidHandler::default();
//! let mut ccsds_distributor = CcsdsDistributor::new(Box::new(apid_handler)); //! let mut ccsds_distributor = CcsdsDistributor::new(apid_handler);
//! //!
//! // Create and pass PUS telecommand with a valid APID //! // Create and pass PUS telecommand with a valid APID
//! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); //! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap();
@ -72,23 +72,17 @@
//! let tc_slice = &test_buf[0..size]; //! let tc_slice = &test_buf[0..size];
//! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed"); //! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed");
//! //!
//! // User helper function to retrieve concrete class //! // Retrieve the APID handler.
//! let concrete_handler_ref: &ConcreteApidHandler = ccsds_distributor //! let handler_ref = ccsds_distributor.packet_handler();
//! .apid_handler_ref() //! assert_eq!(handler_ref.known_call_count, 1);
//! .expect("Casting back to concrete type failed"); //! assert_eq!(handler_ref.unknown_call_count, 1);
//! assert_eq!(concrete_handler_ref.known_call_count, 1);
//! assert_eq!(concrete_handler_ref.unknown_call_count, 1);
//! //!
//! // It's also possible to retrieve a mutable reference //! // Mutable access to the handler.
//! let mutable_ref: &mut ConcreteApidHandler = ccsds_distributor //! let mutable_handler_ref = ccsds_distributor.packet_handler_mut();
//! .apid_handler_mut() //! mutable_handler_ref.mutable_foo();
//! .expect("Casting back to concrete type failed");
//! mutable_ref.mutable_foo();
//! ``` //! ```
use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore};
use alloc::boxed::Box;
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
use downcast_rs::Downcast;
use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::error::Error; use std::error::Error;
@ -99,11 +93,7 @@ use std::error::Error;
/// instance of this handler to the [CcsdsDistributor]. The distributor will use the trait /// instance of this handler to the [CcsdsDistributor]. The distributor will use the trait
/// interface to dispatch received packets to the user based on the Application Process Identifier /// interface to dispatch received packets to the user based on the Application Process Identifier
/// (APID) field of the CCSDS packet. /// (APID) field of the CCSDS packet.
/// pub trait CcsdsPacketHandler {
/// This trait automatically implements the [downcast_rs::Downcast] to allow a more convenient API
/// to cast trait objects back to their concrete type after the handler was passed to the
/// distributor.
pub trait CcsdsPacketHandler: Downcast {
type Error; type Error;
fn valid_apids(&self) -> &'static [u16]; fn valid_apids(&self) -> &'static [u16];
@ -116,23 +106,12 @@ pub trait CcsdsPacketHandler: Downcast {
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
} }
downcast_rs::impl_downcast!(CcsdsPacketHandler assoc Error);
pub trait SendableCcsdsPacketHandler: CcsdsPacketHandler + Send {}
impl<T: CcsdsPacketHandler + Send> SendableCcsdsPacketHandler for T {}
downcast_rs::impl_downcast!(SendableCcsdsPacketHandler assoc Error);
/// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler. /// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler.
/// pub struct CcsdsDistributor<PacketHandler: CcsdsPacketHandler<Error = E>, E> {
/// The passed APID handler is required to be [Send]able to allow more ergonomic usage with
/// threads.
pub struct CcsdsDistributor<E> {
/// User provided APID handler stored as a generic trait object. /// User provided APID handler stored as a generic trait object.
/// It can be cast back to the original concrete type using the [Self::apid_handler_ref] or /// It can be cast back to the original concrete type using [Self::packet_handler] or
/// the [Self::apid_handler_mut] method. /// the [Self::packet_handler_mut] method.
pub apid_handler: Box<dyn SendableCcsdsPacketHandler<Error = E>>, packet_handler: PacketHandler,
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -160,7 +139,9 @@ impl<E: Error> Error for CcsdsError<E> {
} }
} }
impl<E: 'static> ReceivesCcsdsTc for CcsdsDistributor<E> { impl<PacketHandler: CcsdsPacketHandler<Error = E>, E: 'static> ReceivesCcsdsTc
for CcsdsDistributor<PacketHandler, E>
{
type Error = CcsdsError<E>; type Error = CcsdsError<E>;
fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
@ -168,7 +149,9 @@ impl<E: 'static> ReceivesCcsdsTc for CcsdsDistributor<E> {
} }
} }
impl<E: 'static> ReceivesTcCore for CcsdsDistributor<E> { impl<PacketHandler: CcsdsPacketHandler<Error = E>, E: 'static> ReceivesTcCore
for CcsdsDistributor<PacketHandler, E>
{
type Error = CcsdsError<E>; type Error = CcsdsError<E>;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
@ -186,36 +169,31 @@ impl<E: 'static> ReceivesTcCore for CcsdsDistributor<E> {
} }
} }
impl<E: 'static> CcsdsDistributor<E> { impl<PacketHandler: CcsdsPacketHandler<Error = E>, E: 'static> CcsdsDistributor<PacketHandler, E> {
pub fn new(apid_handler: Box<dyn SendableCcsdsPacketHandler<Error = E>>) -> Self { pub fn new(packet_handler: PacketHandler) -> Self {
CcsdsDistributor { apid_handler } CcsdsDistributor { packet_handler }
} }
/// This function can be used to retrieve a reference to the concrete instance of the APID pub fn packet_handler(&self) -> &PacketHandler {
/// handler after it was passed to the distributor. See the &self.packet_handler
/// [module documentation][crate::tmtc::ccsds_distrib] for an fsrc-example.
pub fn apid_handler_ref<T: SendableCcsdsPacketHandler<Error = E>>(&self) -> Option<&T> {
self.apid_handler.downcast_ref::<T>()
} }
/// This function can be used to retrieve a mutable reference to the concrete instance of the pub fn packet_handler_mut(&mut self) -> &mut PacketHandler {
/// APID handler after it was passed to the distributor. &mut self.packet_handler
pub fn apid_handler_mut<T: SendableCcsdsPacketHandler<Error = E>>(&mut self) -> Option<&mut T> {
self.apid_handler.downcast_mut::<T>()
} }
fn dispatch_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), CcsdsError<E>> { fn dispatch_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), CcsdsError<E>> {
let apid = sp_header.apid(); let apid = sp_header.apid();
let valid_apids = self.apid_handler.valid_apids(); let valid_apids = self.packet_handler.valid_apids();
for &valid_apid in valid_apids { for &valid_apid in valid_apids {
if valid_apid == apid { if valid_apid == apid {
return self return self
.apid_handler .packet_handler
.handle_known_apid(sp_header, tc_raw) .handle_known_apid(sp_header, tc_raw)
.map_err(|e| CcsdsError::CustomError(e)); .map_err(|e| CcsdsError::CustomError(e));
} }
} }
self.apid_handler self.packet_handler
.handle_unknown_apid(sp_header, tc_raw) .handle_unknown_apid(sp_header, tc_raw)
.map_err(|e| CcsdsError::CustomError(e)) .map_err(|e| CcsdsError::CustomError(e))
} }
@ -244,6 +222,13 @@ pub(crate) mod tests {
&buf[0..size] &buf[0..size]
} }
pub fn generate_ping_tc_as_vec() -> Vec<u8> {
let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap();
PusTcCreator::new_simple(&mut sph, 17, 1, None, true)
.to_vec()
.unwrap()
}
type SharedPacketQueue = Arc<Mutex<VecDeque<(u16, Vec<u8>)>>>; type SharedPacketQueue = Arc<Mutex<VecDeque<(u16, Vec<u8>)>>>;
pub struct BasicApidHandlerSharedQueue { pub struct BasicApidHandlerSharedQueue {
pub known_packet_queue: SharedPacketQueue, pub known_packet_queue: SharedPacketQueue,
@ -305,7 +290,8 @@ pub(crate) mod tests {
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let mut vec = Vec::new(); let mut vec = Vec::new();
vec.extend_from_slice(tc_raw); vec.extend_from_slice(tc_raw);
Ok(self.known_packet_queue.push_back((sp_header.apid(), vec))) self.known_packet_queue.push_back((sp_header.apid(), vec));
Ok(())
} }
fn handle_unknown_apid( fn handle_unknown_apid(
@ -315,7 +301,8 @@ pub(crate) mod tests {
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let mut vec = Vec::new(); let mut vec = Vec::new();
vec.extend_from_slice(tc_raw); vec.extend_from_slice(tc_raw);
Ok(self.unknown_packet_queue.push_back((sp_header.apid(), vec))) self.unknown_packet_queue.push_back((sp_header.apid(), vec));
Ok(())
} }
} }
@ -327,7 +314,7 @@ pub(crate) mod tests {
known_packet_queue: known_packet_queue.clone(), known_packet_queue: known_packet_queue.clone(),
unknown_packet_queue: unknown_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(),
}; };
let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
is_send(&ccsds_distrib); is_send(&ccsds_distrib);
let mut test_buf: [u8; 32] = [0; 32]; let mut test_buf: [u8; 32] = [0; 32];
let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
@ -342,14 +329,9 @@ pub(crate) mod tests {
} }
#[test] #[test]
fn test_distribs_unknown_apid() { fn test_unknown_apid_handling() {
let known_packet_queue = Arc::new(Mutex::default()); let apid_handler = BasicApidHandlerOwnedQueue::default();
let unknown_packet_queue = Arc::new(Mutex::default()); let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
let apid_handler = BasicApidHandlerSharedQueue {
known_packet_queue: known_packet_queue.clone(),
unknown_packet_queue: unknown_packet_queue.clone(),
};
let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler));
let mut sph = SpHeader::tc_unseg(0x004, 0x34, 0).unwrap(); let mut sph = SpHeader::tc_unseg(0x004, 0x34, 0).unwrap();
let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let mut test_buf: [u8; 32] = [0; 32]; let mut test_buf: [u8; 32] = [0; 32];
@ -357,11 +339,52 @@ pub(crate) mod tests {
.write_to_bytes(test_buf.as_mut_slice()) .write_to_bytes(test_buf.as_mut_slice())
.expect("Error writing TC to buffer"); .expect("Error writing TC to buffer");
ccsds_distrib.pass_tc(&test_buf).expect("Passing TC failed"); ccsds_distrib.pass_tc(&test_buf).expect("Passing TC failed");
let recvd = unknown_packet_queue.lock().unwrap().pop_front(); assert!(ccsds_distrib.packet_handler().known_packet_queue.is_empty());
assert!(known_packet_queue.lock().unwrap().is_empty()); let apid_handler = ccsds_distrib.packet_handler_mut();
let recvd = apid_handler.unknown_packet_queue.pop_front();
assert!(recvd.is_some()); assert!(recvd.is_some());
let (apid, packet) = recvd.unwrap(); let (apid, packet) = recvd.unwrap();
assert_eq!(apid, 0x004); assert_eq!(apid, 0x004);
assert_eq!(packet.as_slice(), test_buf); assert_eq!(packet.as_slice(), test_buf);
} }
#[test]
fn test_ccsds_distribution() {
let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default());
let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap();
let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let tc_vec = pus_tc.to_vec().unwrap();
ccsds_distrib
.pass_ccsds(&sph, &tc_vec)
.expect("passing CCSDS TC failed");
let recvd = ccsds_distrib
.packet_handler_mut()
.known_packet_queue
.pop_front();
assert!(recvd.is_some());
let recvd = recvd.unwrap();
assert_eq!(recvd.0, 0x002);
assert_eq!(recvd.1, tc_vec);
}
#[test]
fn test_distribution_short_packet_fails() {
let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default());
let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap();
let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let tc_vec = pus_tc.to_vec().unwrap();
let result = ccsds_distrib.pass_tc(&tc_vec[0..6]);
assert!(result.is_err());
let error = result.unwrap_err();
if let CcsdsError::ByteConversionError(ByteConversionError::FromSliceTooSmall {
found,
expected,
}) = error
{
assert_eq!(found, 6);
assert_eq!(expected, 7);
} else {
panic!("Unexpected error variant");
}
}
} }

View File

@ -18,7 +18,7 @@ pub mod tm_helper;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler}; pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler};
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub use pus_distrib::{PusDistributor, PusServiceProvider}; pub use pus_distrib::{PusDistributor, PusServiceDistributor};
/// Generic trait for object which can receive any telecommands in form of a raw bytestream, with /// Generic trait for object which can receive any telecommands in form of a raw bytestream, with
/// no assumptions about the received protocol. /// no assumptions about the received protocol.

View File

@ -2,7 +2,7 @@
//! //!
//! The routing components consist of two core components: //! The routing components consist of two core components:
//! 1. [PusDistributor] component which dispatches received packets to a user-provided handler. //! 1. [PusDistributor] component which dispatches received packets to a user-provided handler.
//! 2. [PusServiceProvider] trait which should be implemented by the user-provided PUS packet //! 2. [PusServiceDistributor] trait which should be implemented by the user-provided PUS packet
//! handler. //! handler.
//! //!
//! The [PusDistributor] implements the [ReceivesEcssPusTc], [ReceivesCcsdsTc] and the //! The [PusDistributor] implements the [ReceivesEcssPusTc], [ReceivesCcsdsTc] and the
@ -13,25 +13,26 @@
//! the raw bytestream. If this process fails, a [PusDistribError::PusError] is returned to the //! the raw bytestream. If this process fails, a [PusDistribError::PusError] is returned to the
//! user. //! user.
//! 2. If it was possible to extract both components, the packet will be passed to the //! 2. If it was possible to extract both components, the packet will be passed to the
//! [PusServiceProvider::handle_pus_tc_packet] method provided by the user. //! [PusServiceDistributor::distribute_packet] method provided by the user.
//! //!
//! # Example //! # Example
//! //!
//! ```rust //! ```rust
//! use spacepackets::ecss::WritablePusPacket; //! use spacepackets::ecss::WritablePusPacket;
//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceProvider}; //! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceDistributor};
//! use satrs::tmtc::{ReceivesTc, ReceivesTcCore}; //! use satrs::tmtc::{ReceivesTc, ReceivesTcCore};
//! use spacepackets::SpHeader; //! use spacepackets::SpHeader;
//! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader}; //! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader};
//!
//! struct ConcretePusHandler { //! struct ConcretePusHandler {
//! handler_call_count: u32 //! handler_call_count: u32
//! } //! }
//! //!
//! // This is a very simple possible service provider. It increments an internal call count field, //! // This is a very simple possible service provider. It increments an internal call count field,
//! // which is used to verify the handler was called //! // which is used to verify the handler was called
//! impl PusServiceProvider for ConcretePusHandler { //! impl PusServiceDistributor for ConcretePusHandler {
//! type Error = (); //! type Error = ();
//! fn handle_pus_tc_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { //! fn distribute_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
//! assert_eq!(service, 17); //! assert_eq!(service, 17);
//! assert_eq!(pus_tc.len_packed(), 13); //! assert_eq!(pus_tc.len_packed(), 13);
//! self.handler_call_count += 1; //! self.handler_call_count += 1;
@ -42,7 +43,7 @@
//! let service_handler = ConcretePusHandler { //! let service_handler = ConcretePusHandler {
//! handler_call_count: 0 //! handler_call_count: 0
//! }; //! };
//! let mut pus_distributor = PusDistributor::new(Box::new(service_handler)); //! let mut pus_distributor = PusDistributor::new(service_handler);
//! //!
//! // Create and pass PUS ping telecommand with a valid APID //! // Create and pass PUS ping telecommand with a valid APID
//! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); //! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap();
@ -57,50 +58,42 @@
//! //!
//! // User helper function to retrieve concrete class. We check the call count here to verify //! // User helper function to retrieve concrete class. We check the call count here to verify
//! // that the PUS ping telecommand was routed successfully. //! // that the PUS ping telecommand was routed successfully.
//! let concrete_handler_ref: &ConcretePusHandler = pus_distributor //! let concrete_handler = pus_distributor.service_distributor();
//! .service_provider_ref() //! assert_eq!(concrete_handler.handler_call_count, 1);
//! .expect("Casting back to concrete type failed");
//! assert_eq!(concrete_handler_ref.handler_call_count, 1);
//! ``` //! ```
use crate::pus::ReceivesEcssPusTc; use crate::pus::ReceivesEcssPusTc;
use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore};
use alloc::boxed::Box;
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
use downcast_rs::Downcast;
use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::tc::PusTcReader;
use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::SpHeader; use spacepackets::SpHeader;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::error::Error; use std::error::Error;
pub trait PusServiceProvider: Downcast { /// Trait for a generic distributor object which can distribute PUS packets based on packet
/// properties like the PUS service, space packet header or any other content of the PUS packet.
pub trait PusServiceDistributor {
type Error; type Error;
fn handle_pus_tc_packet( fn distribute_packet(
&mut self, &mut self,
service: u8, service: u8,
header: &SpHeader, header: &SpHeader,
pus_tc: &PusTcReader, pus_tc: &PusTcReader,
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
} }
downcast_rs::impl_downcast!(PusServiceProvider assoc Error);
pub trait SendablePusServiceProvider: PusServiceProvider + Send {}
impl<T: Send + PusServiceProvider> SendablePusServiceProvider for T {}
downcast_rs::impl_downcast!(SendablePusServiceProvider assoc Error);
/// Generic distributor object which dispatches received packets to a user provided handler. /// Generic distributor object which dispatches received packets to a user provided handler.
/// pub struct PusDistributor<ServiceDistributor: PusServiceDistributor<Error = E>, E> {
/// This distributor expects the passed trait object to be [Send]able to allow more ergonomic service_distributor: ServiceDistributor,
/// usage with threads.
pub struct PusDistributor<E> {
pub service_provider: Box<dyn SendablePusServiceProvider<Error = E>>,
} }
impl<E> PusDistributor<E> { impl<ServiceDistributor: PusServiceDistributor<Error = E>, E>
pub fn new(service_provider: Box<dyn SendablePusServiceProvider<Error = E>>) -> Self { PusDistributor<ServiceDistributor, E>
PusDistributor { service_provider } {
pub fn new(service_provider: ServiceDistributor) -> Self {
PusDistributor {
service_distributor: service_provider,
}
} }
} }
@ -113,8 +106,8 @@ pub enum PusDistribError<E> {
impl<E: Display> Display for PusDistribError<E> { impl<E: Display> Display for PusDistribError<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self { match self {
PusDistribError::CustomError(e) => write!(f, "{e}"), PusDistribError::CustomError(e) => write!(f, "pus distribution error: {e}"),
PusDistribError::PusError(e) => write!(f, "{e}"), PusDistribError::PusError(e) => write!(f, "pus distribution error: {e}"),
} }
} }
} }
@ -129,7 +122,9 @@ impl<E: Error> Error for PusDistribError<E> {
} }
} }
impl<E: 'static> ReceivesTcCore for PusDistributor<E> { impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static> ReceivesTcCore
for PusDistributor<ServiceDistributor, E>
{
type Error = PusDistribError<E>; type Error = PusDistribError<E>;
fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> { fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> {
// Convert to ccsds and call pass_ccsds // Convert to ccsds and call pass_ccsds
@ -139,7 +134,9 @@ impl<E: 'static> ReceivesTcCore for PusDistributor<E> {
} }
} }
impl<E: 'static> ReceivesCcsdsTc for PusDistributor<E> { impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static> ReceivesCcsdsTc
for PusDistributor<ServiceDistributor, E>
{
type Error = PusDistribError<E>; type Error = PusDistribError<E>;
fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> { fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> {
let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?; let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?;
@ -147,34 +144,39 @@ impl<E: 'static> ReceivesCcsdsTc for PusDistributor<E> {
} }
} }
impl<E: 'static> ReceivesEcssPusTc for PusDistributor<E> { impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static> ReceivesEcssPusTc
for PusDistributor<ServiceDistributor, E>
{
type Error = PusDistribError<E>; type Error = PusDistribError<E>;
fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
self.service_provider self.service_distributor
.handle_pus_tc_packet(pus_tc.service(), header, pus_tc) .distribute_packet(pus_tc.service(), header, pus_tc)
.map_err(|e| PusDistribError::CustomError(e)) .map_err(|e| PusDistribError::CustomError(e))
} }
} }
impl<E: 'static> PusDistributor<E> { impl<ServiceDistributor: PusServiceDistributor<Error = E>, E: 'static>
pub fn service_provider_ref<T: SendablePusServiceProvider<Error = E>>(&self) -> Option<&T> { PusDistributor<ServiceDistributor, E>
self.service_provider.downcast_ref::<T>() {
pub fn service_distributor(&self) -> &ServiceDistributor {
&self.service_distributor
} }
pub fn service_provider_mut<T: SendablePusServiceProvider<Error = E>>( pub fn service_distributor_mut(&mut self) -> &mut ServiceDistributor {
&mut self, &mut self.service_distributor
) -> Option<&mut T> {
self.service_provider.downcast_mut::<T>()
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::queue::GenericSendError;
use crate::tmtc::ccsds_distrib::tests::{ use crate::tmtc::ccsds_distrib::tests::{
generate_ping_tc, BasicApidHandlerOwnedQueue, BasicApidHandlerSharedQueue, generate_ping_tc, generate_ping_tc_as_vec, BasicApidHandlerOwnedQueue,
BasicApidHandlerSharedQueue,
}; };
use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler};
use alloc::format;
use alloc::vec::Vec; use alloc::vec::Vec;
use spacepackets::ecss::PusError; use spacepackets::ecss::PusError;
use spacepackets::CcsdsPacket; use spacepackets::CcsdsPacket;
@ -185,54 +187,65 @@ mod tests {
fn is_send<T: Send>(_: &T) {} fn is_send<T: Send>(_: &T) {}
struct PusHandlerSharedQueue { pub struct PacketInfo {
pub pus_queue: Arc<Mutex<VecDeque<(u8, u16, Vec<u8>)>>>, pub service: u8,
pub apid: u16,
pub packet: Vec<u8>,
} }
struct PusHandlerSharedQueue(Arc<Mutex<VecDeque<PacketInfo>>>);
#[derive(Default)] #[derive(Default)]
struct PusHandlerOwnedQueue { struct PusHandlerOwnedQueue(VecDeque<PacketInfo>);
pub pus_queue: VecDeque<(u8, u16, Vec<u8>)>,
}
impl PusServiceProvider for PusHandlerSharedQueue { impl PusServiceDistributor for PusHandlerSharedQueue {
type Error = PusError; type Error = PusError;
fn handle_pus_tc_packet( fn distribute_packet(
&mut self, &mut self,
service: u8, service: u8,
sp_header: &SpHeader, sp_header: &SpHeader,
pus_tc: &PusTcReader, pus_tc: &PusTcReader,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let mut vec: Vec<u8> = Vec::new(); let mut packet: Vec<u8> = Vec::new();
vec.extend_from_slice(pus_tc.raw_data()); packet.extend_from_slice(pus_tc.raw_data());
Ok(self self.0
.pus_queue
.lock() .lock()
.expect("Mutex lock failed") .expect("Mutex lock failed")
.push_back((service, sp_header.apid(), vec))) .push_back(PacketInfo {
service,
apid: sp_header.apid(),
packet,
});
Ok(())
} }
} }
impl PusServiceProvider for PusHandlerOwnedQueue { impl PusServiceDistributor for PusHandlerOwnedQueue {
type Error = PusError; type Error = PusError;
fn handle_pus_tc_packet( fn distribute_packet(
&mut self, &mut self,
service: u8, service: u8,
sp_header: &SpHeader, sp_header: &SpHeader,
pus_tc: &PusTcReader, pus_tc: &PusTcReader,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let mut vec: Vec<u8> = Vec::new(); let mut packet: Vec<u8> = Vec::new();
vec.extend_from_slice(pus_tc.raw_data()); packet.extend_from_slice(pus_tc.raw_data());
Ok(self.pus_queue.push_back((service, sp_header.apid(), vec))) self.0.push_back(PacketInfo {
service,
apid: sp_header.apid(),
packet,
});
Ok(())
} }
} }
struct ApidHandlerShared { struct ApidHandlerShared {
pub pus_distrib: PusDistributor<PusError>, pub pus_distrib: PusDistributor<PusHandlerSharedQueue, PusError>,
pub handler_base: BasicApidHandlerSharedQueue, pub handler_base: BasicApidHandlerSharedQueue,
} }
struct ApidHandlerOwned { struct ApidHandlerOwned {
pub pus_distrib: PusDistributor<PusError>, pub pus_distrib: PusDistributor<PusHandlerOwnedQueue, PusError>,
handler_base: BasicApidHandlerOwnedQueue, handler_base: BasicApidHandlerOwnedQueue,
} }
@ -285,28 +298,36 @@ mod tests {
} }
#[test] #[test]
#[cfg(feature = "std")] fn test_pus_distribution_as_raw_packet() {
fn test_pus_distribution() { let mut pus_distrib = PusDistributor::new(PusHandlerOwnedQueue::default());
let tc = generate_ping_tc_as_vec();
let result = pus_distrib.pass_tc(&tc);
assert!(result.is_ok());
assert_eq!(pus_distrib.service_distributor_mut().0.len(), 1);
let packet_info = pus_distrib.service_distributor_mut().0.pop_front().unwrap();
assert_eq!(packet_info.service, 17);
assert_eq!(packet_info.apid, 0x002);
assert_eq!(packet_info.packet, tc);
}
#[test]
fn test_pus_distribution_combined_handler() {
let known_packet_queue = Arc::new(Mutex::default()); let known_packet_queue = Arc::new(Mutex::default());
let unknown_packet_queue = Arc::new(Mutex::default()); let unknown_packet_queue = Arc::new(Mutex::default());
let pus_queue = Arc::new(Mutex::default()); let pus_queue = Arc::new(Mutex::default());
let pus_handler = PusHandlerSharedQueue { let pus_handler = PusHandlerSharedQueue(pus_queue.clone());
pus_queue: pus_queue.clone(),
};
let handler_base = BasicApidHandlerSharedQueue { let handler_base = BasicApidHandlerSharedQueue {
known_packet_queue: known_packet_queue.clone(), known_packet_queue: known_packet_queue.clone(),
unknown_packet_queue: unknown_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(),
}; };
let pus_distrib = PusDistributor { let pus_distrib = PusDistributor::new(pus_handler);
service_provider: Box::new(pus_handler),
};
is_send(&pus_distrib); is_send(&pus_distrib);
let apid_handler = ApidHandlerShared { let apid_handler = ApidHandlerShared {
pus_distrib, pus_distrib,
handler_base, handler_base,
}; };
let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
let mut test_buf: [u8; 32] = [0; 32]; let mut test_buf: [u8; 32] = [0; 32];
let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
@ -322,25 +343,23 @@ mod tests {
assert_eq!(packet.as_slice(), tc_slice); assert_eq!(packet.as_slice(), tc_slice);
let recvd_pus = pus_queue.lock().unwrap().pop_front(); let recvd_pus = pus_queue.lock().unwrap().pop_front();
assert!(recvd_pus.is_some()); assert!(recvd_pus.is_some());
let (service, apid, tc_raw) = recvd_pus.unwrap(); let packet_info = recvd_pus.unwrap();
assert_eq!(service, 17); assert_eq!(packet_info.service, 17);
assert_eq!(apid, 0x002); assert_eq!(packet_info.apid, 0x002);
assert_eq!(tc_raw, tc_slice); assert_eq!(packet_info.packet, tc_slice);
} }
#[test] #[test]
fn test_as_any_cast() { fn test_accessing_combined_distributor() {
let pus_handler = PusHandlerOwnedQueue::default(); let pus_handler = PusHandlerOwnedQueue::default();
let handler_base = BasicApidHandlerOwnedQueue::default(); let handler_base = BasicApidHandlerOwnedQueue::default();
let pus_distrib = PusDistributor { let pus_distrib = PusDistributor::new(pus_handler);
service_provider: Box::new(pus_handler),
};
let apid_handler = ApidHandlerOwned { let apid_handler = ApidHandlerOwned {
pus_distrib, pus_distrib,
handler_base, handler_base,
}; };
let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); let mut ccsds_distrib = CcsdsDistributor::new(apid_handler);
let mut test_buf: [u8; 32] = [0; 32]; let mut test_buf: [u8; 32] = [0; 32];
let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); let tc_slice = generate_ping_tc(test_buf.as_mut_slice());
@ -349,21 +368,38 @@ mod tests {
.pass_tc(tc_slice) .pass_tc(tc_slice)
.expect("Passing TC slice failed"); .expect("Passing TC slice failed");
let apid_handler_casted_back: &mut ApidHandlerOwned = ccsds_distrib let apid_handler_casted_back = ccsds_distrib.packet_handler_mut();
.apid_handler_mut()
.expect("Cast to concrete type ApidHandler failed");
assert!(!apid_handler_casted_back assert!(!apid_handler_casted_back
.handler_base .handler_base
.known_packet_queue .known_packet_queue
.is_empty()); .is_empty());
let handler_casted_back: &mut PusHandlerOwnedQueue = apid_handler_casted_back let handler_owned_queue = apid_handler_casted_back
.pus_distrib .pus_distrib
.service_provider_mut() .service_distributor_mut();
.expect("Cast to concrete type PusHandlerOwnedQueue failed"); assert!(!handler_owned_queue.0.is_empty());
assert!(!handler_casted_back.pus_queue.is_empty()); let packet_info = handler_owned_queue.0.pop_front().unwrap();
let (service, apid, packet_raw) = handler_casted_back.pus_queue.pop_front().unwrap(); assert_eq!(packet_info.service, 17);
assert_eq!(service, 17); assert_eq!(packet_info.apid, 0x002);
assert_eq!(apid, 0x002); assert_eq!(packet_info.packet, tc_slice);
assert_eq!(packet_raw.as_slice(), tc_slice); }
#[test]
fn test_pus_distrib_error_custom_error() {
let error = PusDistribError::CustomError(GenericSendError::RxDisconnected);
let error_string = format!("{}", error);
assert_eq!(
error_string,
"pus distribution error: rx side has disconnected"
);
}
#[test]
fn test_pus_distrib_error_pus_error() {
let error = PusDistribError::<GenericSendError>::PusError(PusError::CrcCalculationMissing);
let error_string = format!("{}", error);
assert_eq!(
error_string,
"pus distribution error: crc16 was not calculated"
);
} }
} }