From 3de5954898294024ab7dd73ea0c5dbd22be3a357 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 4 Mar 2024 16:26:34 +0100 Subject: [PATCH] Refactor TMTC distribution modules --- satrs-example/src/main.rs | 8 +- satrs-example/src/tcp.rs | 40 ++++-- satrs/CHANGELOG.md | 5 + satrs/src/pus/mod.rs | 2 +- satrs/src/tmtc/ccsds_distrib.rs | 151 ++++++++++++---------- satrs/src/tmtc/mod.rs | 2 +- satrs/src/tmtc/pus_distrib.rs | 220 +++++++++++++++++++------------- 7 files changed, 256 insertions(+), 172 deletions(-) diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 9844424..0cdd462 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -182,7 +182,7 @@ fn static_tmtc_pool_main() { ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { @@ -193,7 +193,7 @@ fn static_tmtc_pool_main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( @@ -396,7 +396,7 @@ fn dyn_tmtc_pool_main() { ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { @@ -406,7 +406,7 @@ fn dyn_tmtc_pool_main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs index 566b445..014f300 100644 --- a/satrs-example/src/tcp.rs +++ b/satrs-example/src/tcp.rs @@ -6,11 +6,14 @@ use std::{ use log::{info, warn}; use satrs::{ hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + pus::ReceivesEcssPusTc, spacepackets::PacketId, - tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, + tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, }; use satrs_example::config::PUS_APID; +use crate::ccsds::CcsdsReceiver; + pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; #[derive(Default, Clone)] @@ -69,20 +72,37 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } -pub struct TcpTask { - server: TcpSpacepacketsServer< - (), - CcsdsError, - SyncTcpTmSource, - CcsdsDistributor, - >, +pub type TcpServerType = TcpSpacepacketsServer< + (), + CcsdsError, + SyncTcpTmSource, + CcsdsDistributor, MpscErrorType>, +>; + +pub struct TcpTask< + TcSource: ReceivesCcsdsTc + + ReceivesEcssPusTc + + Clone + + Send + + 'static, + MpscErrorType: 'static, +> { + server: TcpServerType, } -impl TcpTask { +impl< + TcSource: ReceivesCcsdsTc + + ReceivesEcssPusTc + + Clone + + Send + + 'static, + MpscErrorType: 'static + core::fmt::Debug, + > TcpTask +{ pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, + tc_receiver: CcsdsDistributor, MpscErrorType>, ) -> Result { Ok(Self { server: TcpSpacepacketsServer::new( diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index b640167..b0bc493 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -18,6 +18,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Refactored ECSS TM sender abstractions to be generic over different message queue backends. - Refactored Verification Reporter abstractions and implementation to be generic over the sender instead of using trait objects. +- `PusServiceProvider` renamed to `PusServiceDistributor` to make the purpose of the object + more clear +- `PusServiceProvider::handle_pus_tc_packet` renamed to `PusServiceDistributor::distribute_packet`. +- `PusServiceDistibutor` and `CcsdsDistributor` now use generics instead of trait objects. + This makes accessing the concrete trait implementations more easy as well. ## Fixed diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index ba0ff1d..60509ef 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -920,7 +920,7 @@ pub mod std_mod { } } - /// This function can be used to poll the internal [EcssTcReceiver] object for the next + /// This function can be used to poll the internal [EcssTcReceiverCore] object for the next /// telecommand packet. It will return `Ok(None)` if there are not packets available. /// In any other case, it will perform the acceptance of the ECSS TC packet using the /// internal [VerificationReportingProvider] object. It will then return the telecommand diff --git a/satrs/src/tmtc/ccsds_distrib.rs b/satrs/src/tmtc/ccsds_distrib.rs index 74707e4..a0d01b0 100644 --- a/satrs/src/tmtc/ccsds_distrib.rs +++ b/satrs/src/tmtc/ccsds_distrib.rs @@ -52,7 +52,7 @@ //! } //! //! let apid_handler = ConcreteApidHandler::default(); -//! let mut ccsds_distributor = CcsdsDistributor::new(Box::new(apid_handler)); +//! let mut ccsds_distributor = CcsdsDistributor::new(apid_handler); //! //! // Create and pass PUS telecommand with a valid APID //! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); @@ -72,23 +72,17 @@ //! let tc_slice = &test_buf[0..size]; //! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed"); //! -//! // User helper function to retrieve concrete class -//! let concrete_handler_ref: &ConcreteApidHandler = ccsds_distributor -//! .apid_handler_ref() -//! .expect("Casting back to concrete type failed"); -//! assert_eq!(concrete_handler_ref.known_call_count, 1); -//! assert_eq!(concrete_handler_ref.unknown_call_count, 1); +//! // Retrieve the APID handler. +//! let handler_ref = ccsds_distributor.packet_handler(); +//! assert_eq!(handler_ref.known_call_count, 1); +//! assert_eq!(handler_ref.unknown_call_count, 1); //! -//! // It's also possible to retrieve a mutable reference -//! let mutable_ref: &mut ConcreteApidHandler = ccsds_distributor -//! .apid_handler_mut() -//! .expect("Casting back to concrete type failed"); -//! mutable_ref.mutable_foo(); +//! // Mutable access to the handler. +//! let mutable_handler_ref = ccsds_distributor.packet_handler_mut(); +//! mutable_handler_ref.mutable_foo(); //! ``` use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; -use alloc::boxed::Box; use core::fmt::{Display, Formatter}; -use downcast_rs::Downcast; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; #[cfg(feature = "std")] use std::error::Error; @@ -99,11 +93,7 @@ use std::error::Error; /// instance of this handler to the [CcsdsDistributor]. The distributor will use the trait /// interface to dispatch received packets to the user based on the Application Process Identifier /// (APID) field of the CCSDS packet. -/// -/// This trait automatically implements the [downcast_rs::Downcast] to allow a more convenient API -/// to cast trait objects back to their concrete type after the handler was passed to the -/// distributor. -pub trait CcsdsPacketHandler: Downcast { +pub trait CcsdsPacketHandler { type Error; fn valid_apids(&self) -> &'static [u16]; @@ -116,23 +106,12 @@ pub trait CcsdsPacketHandler: Downcast { ) -> Result<(), Self::Error>; } -downcast_rs::impl_downcast!(CcsdsPacketHandler assoc Error); - -pub trait SendableCcsdsPacketHandler: CcsdsPacketHandler + Send {} - -impl SendableCcsdsPacketHandler for T {} - -downcast_rs::impl_downcast!(SendableCcsdsPacketHandler assoc Error); - /// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler. -/// -/// The passed APID handler is required to be [Send]able to allow more ergonomic usage with -/// threads. -pub struct CcsdsDistributor { +pub struct CcsdsDistributor, E> { /// User provided APID handler stored as a generic trait object. - /// It can be cast back to the original concrete type using the [Self::apid_handler_ref] or - /// the [Self::apid_handler_mut] method. - pub apid_handler: Box>, + /// It can be cast back to the original concrete type using [Self::packet_handler] or + /// the [Self::packet_handler_mut] method. + packet_handler: PacketHandler, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -160,7 +139,9 @@ impl Error for CcsdsError { } } -impl ReceivesCcsdsTc for CcsdsDistributor { +impl, E: 'static> ReceivesCcsdsTc + for CcsdsDistributor +{ type Error = CcsdsError; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -168,7 +149,9 @@ impl ReceivesCcsdsTc for CcsdsDistributor { } } -impl ReceivesTcCore for CcsdsDistributor { +impl, E: 'static> ReceivesTcCore + for CcsdsDistributor +{ type Error = CcsdsError; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -186,36 +169,31 @@ impl ReceivesTcCore for CcsdsDistributor { } } -impl CcsdsDistributor { - pub fn new(apid_handler: Box>) -> Self { - CcsdsDistributor { apid_handler } +impl, E: 'static> CcsdsDistributor { + pub fn new(packet_handler: PacketHandler) -> Self { + CcsdsDistributor { packet_handler } } - /// This function can be used to retrieve a reference to the concrete instance of the APID - /// handler after it was passed to the distributor. See the - /// [module documentation][crate::tmtc::ccsds_distrib] for an fsrc-example. - pub fn apid_handler_ref>(&self) -> Option<&T> { - self.apid_handler.downcast_ref::() + pub fn packet_handler(&self) -> &PacketHandler { + &self.packet_handler } - /// This function can be used to retrieve a mutable reference to the concrete instance of the - /// APID handler after it was passed to the distributor. - pub fn apid_handler_mut>(&mut self) -> Option<&mut T> { - self.apid_handler.downcast_mut::() + pub fn packet_handler_mut(&mut self) -> &mut PacketHandler { + &mut self.packet_handler } fn dispatch_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), CcsdsError> { let apid = sp_header.apid(); - let valid_apids = self.apid_handler.valid_apids(); + let valid_apids = self.packet_handler.valid_apids(); for &valid_apid in valid_apids { if valid_apid == apid { return self - .apid_handler + .packet_handler .handle_known_apid(sp_header, tc_raw) .map_err(|e| CcsdsError::CustomError(e)); } } - self.apid_handler + self.packet_handler .handle_unknown_apid(sp_header, tc_raw) .map_err(|e| CcsdsError::CustomError(e)) } @@ -244,6 +222,13 @@ pub(crate) mod tests { &buf[0..size] } + pub fn generate_ping_tc_as_vec() -> Vec { + let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); + PusTcCreator::new_simple(&mut sph, 17, 1, None, true) + .to_vec() + .unwrap() + } + type SharedPacketQueue = Arc)>>>; pub struct BasicApidHandlerSharedQueue { pub known_packet_queue: SharedPacketQueue, @@ -305,7 +290,8 @@ pub(crate) mod tests { ) -> Result<(), Self::Error> { let mut vec = Vec::new(); vec.extend_from_slice(tc_raw); - Ok(self.known_packet_queue.push_back((sp_header.apid(), vec))) + self.known_packet_queue.push_back((sp_header.apid(), vec)); + Ok(()) } fn handle_unknown_apid( @@ -315,7 +301,8 @@ pub(crate) mod tests { ) -> Result<(), Self::Error> { let mut vec = Vec::new(); vec.extend_from_slice(tc_raw); - Ok(self.unknown_packet_queue.push_back((sp_header.apid(), vec))) + self.unknown_packet_queue.push_back((sp_header.apid(), vec)); + Ok(()) } } @@ -327,7 +314,7 @@ pub(crate) mod tests { known_packet_queue: known_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(), }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); is_send(&ccsds_distrib); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); @@ -342,14 +329,9 @@ pub(crate) mod tests { } #[test] - fn test_distribs_unknown_apid() { - let known_packet_queue = Arc::new(Mutex::default()); - let unknown_packet_queue = Arc::new(Mutex::default()); - let apid_handler = BasicApidHandlerSharedQueue { - known_packet_queue: known_packet_queue.clone(), - unknown_packet_queue: unknown_packet_queue.clone(), - }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + fn test_unknown_apid_handling() { + let apid_handler = BasicApidHandlerOwnedQueue::default(); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); let mut sph = SpHeader::tc_unseg(0x004, 0x34, 0).unwrap(); let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); let mut test_buf: [u8; 32] = [0; 32]; @@ -357,11 +339,52 @@ pub(crate) mod tests { .write_to_bytes(test_buf.as_mut_slice()) .expect("Error writing TC to buffer"); ccsds_distrib.pass_tc(&test_buf).expect("Passing TC failed"); - let recvd = unknown_packet_queue.lock().unwrap().pop_front(); - assert!(known_packet_queue.lock().unwrap().is_empty()); + assert!(ccsds_distrib.packet_handler().known_packet_queue.is_empty()); + let apid_handler = ccsds_distrib.packet_handler_mut(); + let recvd = apid_handler.unknown_packet_queue.pop_front(); assert!(recvd.is_some()); let (apid, packet) = recvd.unwrap(); assert_eq!(apid, 0x004); assert_eq!(packet.as_slice(), test_buf); } + + #[test] + fn test_ccsds_distribution() { + let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default()); + let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); + let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let tc_vec = pus_tc.to_vec().unwrap(); + ccsds_distrib + .pass_ccsds(&sph, &tc_vec) + .expect("passing CCSDS TC failed"); + let recvd = ccsds_distrib + .packet_handler_mut() + .known_packet_queue + .pop_front(); + assert!(recvd.is_some()); + let recvd = recvd.unwrap(); + assert_eq!(recvd.0, 0x002); + assert_eq!(recvd.1, tc_vec); + } + + #[test] + fn test_distribution_short_packet_fails() { + let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default()); + let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); + let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let tc_vec = pus_tc.to_vec().unwrap(); + let result = ccsds_distrib.pass_tc(&tc_vec[0..6]); + assert!(result.is_err()); + let error = result.unwrap_err(); + if let CcsdsError::ByteConversionError(ByteConversionError::FromSliceTooSmall { + found, + expected, + }) = error + { + assert_eq!(found, 6); + assert_eq!(expected, 7); + } else { + panic!("Unexpected error variant"); + } + } } diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index b930359..d4f3333 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -18,7 +18,7 @@ pub mod tm_helper; #[cfg(feature = "alloc")] pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler}; #[cfg(feature = "alloc")] -pub use pus_distrib::{PusDistributor, PusServiceProvider}; +pub use pus_distrib::{PusDistributor, PusServiceDistributor}; /// Generic trait for object which can receive any telecommands in form of a raw bytestream, with /// no assumptions about the received protocol. diff --git a/satrs/src/tmtc/pus_distrib.rs b/satrs/src/tmtc/pus_distrib.rs index 9f3dc4f..f5d6c8d 100644 --- a/satrs/src/tmtc/pus_distrib.rs +++ b/satrs/src/tmtc/pus_distrib.rs @@ -2,7 +2,7 @@ //! //! The routing components consist of two core components: //! 1. [PusDistributor] component which dispatches received packets to a user-provided handler. -//! 2. [PusServiceProvider] trait which should be implemented by the user-provided PUS packet +//! 2. [PusServiceDistributor] trait which should be implemented by the user-provided PUS packet //! handler. //! //! The [PusDistributor] implements the [ReceivesEcssPusTc], [ReceivesCcsdsTc] and the @@ -13,25 +13,26 @@ //! the raw bytestream. If this process fails, a [PusDistribError::PusError] is returned to the //! user. //! 2. If it was possible to extract both components, the packet will be passed to the -//! [PusServiceProvider::handle_pus_tc_packet] method provided by the user. +//! [PusServiceDistributor::distribute_packet] method provided by the user. //! //! # Example //! //! ```rust //! use spacepackets::ecss::WritablePusPacket; -//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceProvider}; +//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceDistributor}; //! use satrs::tmtc::{ReceivesTc, ReceivesTcCore}; //! use spacepackets::SpHeader; //! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader}; +//! //! struct ConcretePusHandler { //! handler_call_count: u32 //! } //! //! // This is a very simple possible service provider. It increments an internal call count field, //! // which is used to verify the handler was called -//! impl PusServiceProvider for ConcretePusHandler { +//! impl PusServiceDistributor for ConcretePusHandler { //! type Error = (); -//! fn handle_pus_tc_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { +//! fn distribute_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { //! assert_eq!(service, 17); //! assert_eq!(pus_tc.len_packed(), 13); //! self.handler_call_count += 1; @@ -42,7 +43,7 @@ //! let service_handler = ConcretePusHandler { //! handler_call_count: 0 //! }; -//! let mut pus_distributor = PusDistributor::new(Box::new(service_handler)); +//! let mut pus_distributor = PusDistributor::new(service_handler); //! //! // Create and pass PUS ping telecommand with a valid APID //! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); @@ -57,50 +58,42 @@ //! //! // User helper function to retrieve concrete class. We check the call count here to verify //! // that the PUS ping telecommand was routed successfully. -//! let concrete_handler_ref: &ConcretePusHandler = pus_distributor -//! .service_provider_ref() -//! .expect("Casting back to concrete type failed"); -//! assert_eq!(concrete_handler_ref.handler_call_count, 1); +//! let concrete_handler = pus_distributor.service_distributor(); +//! assert_eq!(concrete_handler.handler_call_count, 1); //! ``` use crate::pus::ReceivesEcssPusTc; use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; -use alloc::boxed::Box; use core::fmt::{Display, Formatter}; -use downcast_rs::Downcast; use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::SpHeader; #[cfg(feature = "std")] use std::error::Error; -pub trait PusServiceProvider: Downcast { +/// Trait for a generic distributor object which can distribute PUS packets based on packet +/// properties like the PUS service, space packet header or any other content of the PUS packet. +pub trait PusServiceDistributor { type Error; - fn handle_pus_tc_packet( + fn distribute_packet( &mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader, ) -> Result<(), Self::Error>; } -downcast_rs::impl_downcast!(PusServiceProvider assoc Error); - -pub trait SendablePusServiceProvider: PusServiceProvider + Send {} - -impl SendablePusServiceProvider for T {} - -downcast_rs::impl_downcast!(SendablePusServiceProvider assoc Error); /// Generic distributor object which dispatches received packets to a user provided handler. -/// -/// This distributor expects the passed trait object to be [Send]able to allow more ergonomic -/// usage with threads. -pub struct PusDistributor { - pub service_provider: Box>, +pub struct PusDistributor, E> { + service_distributor: ServiceDistributor, } -impl PusDistributor { - pub fn new(service_provider: Box>) -> Self { - PusDistributor { service_provider } +impl, E> + PusDistributor +{ + pub fn new(service_provider: ServiceDistributor) -> Self { + PusDistributor { + service_distributor: service_provider, + } } } @@ -113,8 +106,8 @@ pub enum PusDistribError { impl Display for PusDistribError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { match self { - PusDistribError::CustomError(e) => write!(f, "{e}"), - PusDistribError::PusError(e) => write!(f, "{e}"), + PusDistribError::CustomError(e) => write!(f, "pus distribution error: {e}"), + PusDistribError::PusError(e) => write!(f, "pus distribution error: {e}"), } } } @@ -129,7 +122,9 @@ impl Error for PusDistribError { } } -impl ReceivesTcCore for PusDistributor { +impl, E: 'static> ReceivesTcCore + for PusDistributor +{ type Error = PusDistribError; fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> { // Convert to ccsds and call pass_ccsds @@ -139,7 +134,9 @@ impl ReceivesTcCore for PusDistributor { } } -impl ReceivesCcsdsTc for PusDistributor { +impl, E: 'static> ReceivesCcsdsTc + for PusDistributor +{ type Error = PusDistribError; fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> { let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?; @@ -147,34 +144,39 @@ impl ReceivesCcsdsTc for PusDistributor { } } -impl ReceivesEcssPusTc for PusDistributor { +impl, E: 'static> ReceivesEcssPusTc + for PusDistributor +{ type Error = PusDistribError; fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - self.service_provider - .handle_pus_tc_packet(pus_tc.service(), header, pus_tc) + self.service_distributor + .distribute_packet(pus_tc.service(), header, pus_tc) .map_err(|e| PusDistribError::CustomError(e)) } } -impl PusDistributor { - pub fn service_provider_ref>(&self) -> Option<&T> { - self.service_provider.downcast_ref::() +impl, E: 'static> + PusDistributor +{ + pub fn service_distributor(&self) -> &ServiceDistributor { + &self.service_distributor } - pub fn service_provider_mut>( - &mut self, - ) -> Option<&mut T> { - self.service_provider.downcast_mut::() + pub fn service_distributor_mut(&mut self) -> &mut ServiceDistributor { + &mut self.service_distributor } } #[cfg(test)] mod tests { use super::*; + use crate::queue::GenericSendError; use crate::tmtc::ccsds_distrib::tests::{ - generate_ping_tc, BasicApidHandlerOwnedQueue, BasicApidHandlerSharedQueue, + generate_ping_tc, generate_ping_tc_as_vec, BasicApidHandlerOwnedQueue, + BasicApidHandlerSharedQueue, }; use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; + use alloc::format; use alloc::vec::Vec; use spacepackets::ecss::PusError; use spacepackets::CcsdsPacket; @@ -185,54 +187,65 @@ mod tests { fn is_send(_: &T) {} - struct PusHandlerSharedQueue { - pub pus_queue: Arc)>>>, + pub struct PacketInfo { + pub service: u8, + pub apid: u16, + pub packet: Vec, } + struct PusHandlerSharedQueue(Arc>>); + #[derive(Default)] - struct PusHandlerOwnedQueue { - pub pus_queue: VecDeque<(u8, u16, Vec)>, - } + struct PusHandlerOwnedQueue(VecDeque); - impl PusServiceProvider for PusHandlerSharedQueue { + impl PusServiceDistributor for PusHandlerSharedQueue { type Error = PusError; - fn handle_pus_tc_packet( + fn distribute_packet( &mut self, service: u8, sp_header: &SpHeader, pus_tc: &PusTcReader, ) -> Result<(), Self::Error> { - let mut vec: Vec = Vec::new(); - vec.extend_from_slice(pus_tc.raw_data()); - Ok(self - .pus_queue + let mut packet: Vec = Vec::new(); + packet.extend_from_slice(pus_tc.raw_data()); + self.0 .lock() .expect("Mutex lock failed") - .push_back((service, sp_header.apid(), vec))) + .push_back(PacketInfo { + service, + apid: sp_header.apid(), + packet, + }); + Ok(()) } } - impl PusServiceProvider for PusHandlerOwnedQueue { + impl PusServiceDistributor for PusHandlerOwnedQueue { type Error = PusError; - fn handle_pus_tc_packet( + fn distribute_packet( &mut self, service: u8, sp_header: &SpHeader, pus_tc: &PusTcReader, ) -> Result<(), Self::Error> { - let mut vec: Vec = Vec::new(); - vec.extend_from_slice(pus_tc.raw_data()); - Ok(self.pus_queue.push_back((service, sp_header.apid(), vec))) + let mut packet: Vec = Vec::new(); + packet.extend_from_slice(pus_tc.raw_data()); + self.0.push_back(PacketInfo { + service, + apid: sp_header.apid(), + packet, + }); + Ok(()) } } struct ApidHandlerShared { - pub pus_distrib: PusDistributor, + pub pus_distrib: PusDistributor, pub handler_base: BasicApidHandlerSharedQueue, } struct ApidHandlerOwned { - pub pus_distrib: PusDistributor, + pub pus_distrib: PusDistributor, handler_base: BasicApidHandlerOwnedQueue, } @@ -285,28 +298,36 @@ mod tests { } #[test] - #[cfg(feature = "std")] - fn test_pus_distribution() { + fn test_pus_distribution_as_raw_packet() { + let mut pus_distrib = PusDistributor::new(PusHandlerOwnedQueue::default()); + let tc = generate_ping_tc_as_vec(); + let result = pus_distrib.pass_tc(&tc); + assert!(result.is_ok()); + assert_eq!(pus_distrib.service_distributor_mut().0.len(), 1); + let packet_info = pus_distrib.service_distributor_mut().0.pop_front().unwrap(); + assert_eq!(packet_info.service, 17); + assert_eq!(packet_info.apid, 0x002); + assert_eq!(packet_info.packet, tc); + } + + #[test] + fn test_pus_distribution_combined_handler() { let known_packet_queue = Arc::new(Mutex::default()); let unknown_packet_queue = Arc::new(Mutex::default()); let pus_queue = Arc::new(Mutex::default()); - let pus_handler = PusHandlerSharedQueue { - pus_queue: pus_queue.clone(), - }; + let pus_handler = PusHandlerSharedQueue(pus_queue.clone()); let handler_base = BasicApidHandlerSharedQueue { known_packet_queue: known_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(), }; - let pus_distrib = PusDistributor { - service_provider: Box::new(pus_handler), - }; + let pus_distrib = PusDistributor::new(pus_handler); is_send(&pus_distrib); let apid_handler = ApidHandlerShared { pus_distrib, handler_base, }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); @@ -322,25 +343,23 @@ mod tests { assert_eq!(packet.as_slice(), tc_slice); let recvd_pus = pus_queue.lock().unwrap().pop_front(); assert!(recvd_pus.is_some()); - let (service, apid, tc_raw) = recvd_pus.unwrap(); - assert_eq!(service, 17); - assert_eq!(apid, 0x002); - assert_eq!(tc_raw, tc_slice); + let packet_info = recvd_pus.unwrap(); + assert_eq!(packet_info.service, 17); + assert_eq!(packet_info.apid, 0x002); + assert_eq!(packet_info.packet, tc_slice); } #[test] - fn test_as_any_cast() { + fn test_accessing_combined_distributor() { let pus_handler = PusHandlerOwnedQueue::default(); let handler_base = BasicApidHandlerOwnedQueue::default(); - let pus_distrib = PusDistributor { - service_provider: Box::new(pus_handler), - }; + let pus_distrib = PusDistributor::new(pus_handler); let apid_handler = ApidHandlerOwned { pus_distrib, handler_base, }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); @@ -349,21 +368,38 @@ mod tests { .pass_tc(tc_slice) .expect("Passing TC slice failed"); - let apid_handler_casted_back: &mut ApidHandlerOwned = ccsds_distrib - .apid_handler_mut() - .expect("Cast to concrete type ApidHandler failed"); + let apid_handler_casted_back = ccsds_distrib.packet_handler_mut(); assert!(!apid_handler_casted_back .handler_base .known_packet_queue .is_empty()); - let handler_casted_back: &mut PusHandlerOwnedQueue = apid_handler_casted_back + let handler_owned_queue = apid_handler_casted_back .pus_distrib - .service_provider_mut() - .expect("Cast to concrete type PusHandlerOwnedQueue failed"); - assert!(!handler_casted_back.pus_queue.is_empty()); - let (service, apid, packet_raw) = handler_casted_back.pus_queue.pop_front().unwrap(); - assert_eq!(service, 17); - assert_eq!(apid, 0x002); - assert_eq!(packet_raw.as_slice(), tc_slice); + .service_distributor_mut(); + assert!(!handler_owned_queue.0.is_empty()); + let packet_info = handler_owned_queue.0.pop_front().unwrap(); + assert_eq!(packet_info.service, 17); + assert_eq!(packet_info.apid, 0x002); + assert_eq!(packet_info.packet, tc_slice); + } + + #[test] + fn test_pus_distrib_error_custom_error() { + let error = PusDistribError::CustomError(GenericSendError::RxDisconnected); + let error_string = format!("{}", error); + assert_eq!( + error_string, + "pus distribution error: rx side has disconnected" + ); + } + + #[test] + fn test_pus_distrib_error_pus_error() { + let error = PusDistribError::::PusError(PusError::CrcCalculationMissing); + let error_string = format!("{}", error); + assert_eq!( + error_string, + "pus distribution error: crc16 was not calculated" + ); } }