diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 9844424..0cdd462 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -182,7 +182,7 @@ fn static_tmtc_pool_main() { ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { @@ -193,7 +193,7 @@ fn static_tmtc_pool_main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( @@ -396,7 +396,7 @@ fn dyn_tmtc_pool_main() { ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { @@ -406,7 +406,7 @@ fn dyn_tmtc_pool_main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs index 566b445..014f300 100644 --- a/satrs-example/src/tcp.rs +++ b/satrs-example/src/tcp.rs @@ -6,11 +6,14 @@ use std::{ use log::{info, warn}; use satrs::{ hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + pus::ReceivesEcssPusTc, spacepackets::PacketId, - tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, + tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, }; use satrs_example::config::PUS_APID; +use crate::ccsds::CcsdsReceiver; + pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; #[derive(Default, Clone)] @@ -69,20 +72,37 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } -pub struct TcpTask { - server: TcpSpacepacketsServer< - (), - CcsdsError, - SyncTcpTmSource, - CcsdsDistributor, - >, +pub type TcpServerType = TcpSpacepacketsServer< + (), + CcsdsError, + SyncTcpTmSource, + CcsdsDistributor, MpscErrorType>, +>; + +pub struct TcpTask< + TcSource: ReceivesCcsdsTc + + ReceivesEcssPusTc + + Clone + + Send + + 'static, + MpscErrorType: 'static, +> { + server: TcpServerType, } -impl TcpTask { +impl< + TcSource: ReceivesCcsdsTc + + ReceivesEcssPusTc + + Clone + + Send + + 'static, + MpscErrorType: 'static + core::fmt::Debug, + > TcpTask +{ pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, + tc_receiver: CcsdsDistributor, MpscErrorType>, ) -> Result { Ok(Self { server: TcpSpacepacketsServer::new( diff --git a/satrs/src/tmtc/ccsds_distrib.rs b/satrs/src/tmtc/ccsds_distrib.rs index 74707e4..14fbfa5 100644 --- a/satrs/src/tmtc/ccsds_distrib.rs +++ b/satrs/src/tmtc/ccsds_distrib.rs @@ -52,7 +52,7 @@ //! } //! //! let apid_handler = ConcreteApidHandler::default(); -//! let mut ccsds_distributor = CcsdsDistributor::new(Box::new(apid_handler)); +//! let mut ccsds_distributor = CcsdsDistributor::new(apid_handler); //! //! // Create and pass PUS telecommand with a valid APID //! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); @@ -72,23 +72,17 @@ //! let tc_slice = &test_buf[0..size]; //! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed"); //! -//! // User helper function to retrieve concrete class -//! let concrete_handler_ref: &ConcreteApidHandler = ccsds_distributor -//! .apid_handler_ref() -//! .expect("Casting back to concrete type failed"); -//! assert_eq!(concrete_handler_ref.known_call_count, 1); -//! assert_eq!(concrete_handler_ref.unknown_call_count, 1); +//! // Retrieve the APID handler. +//! let handler_ref = ccsds_distributor.packet_handler(); +//! assert_eq!(handler_ref.known_call_count, 1); +//! assert_eq!(handler_ref.unknown_call_count, 1); //! -//! // It's also possible to retrieve a mutable reference -//! let mutable_ref: &mut ConcreteApidHandler = ccsds_distributor -//! .apid_handler_mut() -//! .expect("Casting back to concrete type failed"); -//! mutable_ref.mutable_foo(); +//! // Mutable access to the handler. +//! let mutable_handler_ref = ccsds_distributor.packet_handler_mut(); +//! mutable_handler_ref.mutable_foo(); //! ``` use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; -use alloc::boxed::Box; use core::fmt::{Display, Formatter}; -use downcast_rs::Downcast; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; #[cfg(feature = "std")] use std::error::Error; @@ -99,11 +93,7 @@ use std::error::Error; /// instance of this handler to the [CcsdsDistributor]. The distributor will use the trait /// interface to dispatch received packets to the user based on the Application Process Identifier /// (APID) field of the CCSDS packet. -/// -/// This trait automatically implements the [downcast_rs::Downcast] to allow a more convenient API -/// to cast trait objects back to their concrete type after the handler was passed to the -/// distributor. -pub trait CcsdsPacketHandler: Downcast { +pub trait CcsdsPacketHandler { type Error; fn valid_apids(&self) -> &'static [u16]; @@ -116,23 +106,15 @@ pub trait CcsdsPacketHandler: Downcast { ) -> Result<(), Self::Error>; } -downcast_rs::impl_downcast!(CcsdsPacketHandler assoc Error); - -pub trait SendableCcsdsPacketHandler: CcsdsPacketHandler + Send {} - -impl SendableCcsdsPacketHandler for T {} - -downcast_rs::impl_downcast!(SendableCcsdsPacketHandler assoc Error); - /// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler. /// /// The passed APID handler is required to be [Send]able to allow more ergonomic usage with /// threads. -pub struct CcsdsDistributor { +pub struct CcsdsDistributor, E> { /// User provided APID handler stored as a generic trait object. /// It can be cast back to the original concrete type using the [Self::apid_handler_ref] or /// the [Self::apid_handler_mut] method. - pub apid_handler: Box>, + pub apid_handler: PacketHandler, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -160,7 +142,9 @@ impl Error for CcsdsError { } } -impl ReceivesCcsdsTc for CcsdsDistributor { +impl, E: 'static> ReceivesCcsdsTc + for CcsdsDistributor +{ type Error = CcsdsError; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -168,7 +152,9 @@ impl ReceivesCcsdsTc for CcsdsDistributor { } } -impl ReceivesTcCore for CcsdsDistributor { +impl, E: 'static> ReceivesTcCore + for CcsdsDistributor +{ type Error = CcsdsError; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -186,22 +172,16 @@ impl ReceivesTcCore for CcsdsDistributor { } } -impl CcsdsDistributor { - pub fn new(apid_handler: Box>) -> Self { +impl, E: 'static> CcsdsDistributor { + pub fn new(apid_handler: PacketHandler) -> Self { CcsdsDistributor { apid_handler } } - /// This function can be used to retrieve a reference to the concrete instance of the APID - /// handler after it was passed to the distributor. See the - /// [module documentation][crate::tmtc::ccsds_distrib] for an fsrc-example. - pub fn apid_handler_ref>(&self) -> Option<&T> { - self.apid_handler.downcast_ref::() + pub fn packet_handler(&self) -> &PacketHandler { + &self.apid_handler } - - /// This function can be used to retrieve a mutable reference to the concrete instance of the - /// APID handler after it was passed to the distributor. - pub fn apid_handler_mut>(&mut self) -> Option<&mut T> { - self.apid_handler.downcast_mut::() + pub fn packet_handler_mut(&mut self) -> &mut PacketHandler { + &mut self.apid_handler } fn dispatch_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), CcsdsError> { @@ -305,7 +285,8 @@ pub(crate) mod tests { ) -> Result<(), Self::Error> { let mut vec = Vec::new(); vec.extend_from_slice(tc_raw); - Ok(self.known_packet_queue.push_back((sp_header.apid(), vec))) + self.known_packet_queue.push_back((sp_header.apid(), vec)); + Ok(()) } fn handle_unknown_apid( @@ -315,7 +296,8 @@ pub(crate) mod tests { ) -> Result<(), Self::Error> { let mut vec = Vec::new(); vec.extend_from_slice(tc_raw); - Ok(self.unknown_packet_queue.push_back((sp_header.apid(), vec))) + self.unknown_packet_queue.push_back((sp_header.apid(), vec)); + Ok(()) } } @@ -327,7 +309,7 @@ pub(crate) mod tests { known_packet_queue: known_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(), }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); is_send(&ccsds_distrib); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); @@ -349,7 +331,7 @@ pub(crate) mod tests { known_packet_queue: known_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(), }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); let mut sph = SpHeader::tc_unseg(0x004, 0x34, 0).unwrap(); let pus_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); let mut test_buf: [u8; 32] = [0; 32]; diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index b930359..d4f3333 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -18,7 +18,7 @@ pub mod tm_helper; #[cfg(feature = "alloc")] pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler}; #[cfg(feature = "alloc")] -pub use pus_distrib::{PusDistributor, PusServiceProvider}; +pub use pus_distrib::{PusDistributor, PusServiceDistributor}; /// Generic trait for object which can receive any telecommands in form of a raw bytestream, with /// no assumptions about the received protocol. diff --git a/satrs/src/tmtc/pus_distrib.rs b/satrs/src/tmtc/pus_distrib.rs index 9f3dc4f..ad12937 100644 --- a/satrs/src/tmtc/pus_distrib.rs +++ b/satrs/src/tmtc/pus_distrib.rs @@ -19,19 +19,20 @@ //! //! ```rust //! use spacepackets::ecss::WritablePusPacket; -//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceProvider}; +//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceDistributor}; //! use satrs::tmtc::{ReceivesTc, ReceivesTcCore}; //! use spacepackets::SpHeader; //! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader}; +//! //! struct ConcretePusHandler { //! handler_call_count: u32 //! } //! //! // This is a very simple possible service provider. It increments an internal call count field, //! // which is used to verify the handler was called -//! impl PusServiceProvider for ConcretePusHandler { +//! impl PusServiceDistributor for ConcretePusHandler { //! type Error = (); -//! fn handle_pus_tc_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { +//! fn distribute_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { //! assert_eq!(service, 17); //! assert_eq!(pus_tc.len_packed(), 13); //! self.handler_call_count += 1; @@ -42,7 +43,7 @@ //! let service_handler = ConcretePusHandler { //! handler_call_count: 0 //! }; -//! let mut pus_distributor = PusDistributor::new(Box::new(service_handler)); +//! let mut pus_distributor = PusDistributor::new(service_handler); //! //! // Create and pass PUS ping telecommand with a valid APID //! let mut space_packet_header = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); @@ -57,49 +58,42 @@ //! //! // User helper function to retrieve concrete class. We check the call count here to verify //! // that the PUS ping telecommand was routed successfully. -//! let concrete_handler_ref: &ConcretePusHandler = pus_distributor -//! .service_provider_ref() -//! .expect("Casting back to concrete type failed"); -//! assert_eq!(concrete_handler_ref.handler_call_count, 1); +//! let concrete_handler = pus_distributor.service_distributor(); +//! assert_eq!(concrete_handler.handler_call_count, 1); //! ``` use crate::pus::ReceivesEcssPusTc; use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; -use alloc::boxed::Box; use core::fmt::{Display, Formatter}; -use downcast_rs::Downcast; use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::SpHeader; #[cfg(feature = "std")] use std::error::Error; -pub trait PusServiceProvider: Downcast { +/// Trait for a generic distributor object which can distribute PUS packets based on packet +/// properties like the PUS service, space packet header or any other content of the PUS packet. +pub trait PusServiceDistributor { type Error; - fn handle_pus_tc_packet( + fn distribute_packet( &mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader, ) -> Result<(), Self::Error>; } -downcast_rs::impl_downcast!(PusServiceProvider assoc Error); - -pub trait SendablePusServiceProvider: PusServiceProvider + Send {} - -impl SendablePusServiceProvider for T {} - -downcast_rs::impl_downcast!(SendablePusServiceProvider assoc Error); /// Generic distributor object which dispatches received packets to a user provided handler. /// /// This distributor expects the passed trait object to be [Send]able to allow more ergonomic /// usage with threads. -pub struct PusDistributor { - pub service_provider: Box>, +pub struct PusDistributor, E> { + service_provider: ServiceDistributor, } -impl PusDistributor { - pub fn new(service_provider: Box>) -> Self { +impl, E> + PusDistributor +{ + pub fn new(service_provider: ServiceDistributor) -> Self { PusDistributor { service_provider } } } @@ -129,7 +123,9 @@ impl Error for PusDistribError { } } -impl ReceivesTcCore for PusDistributor { +impl, E: 'static> ReceivesTcCore + for PusDistributor +{ type Error = PusDistribError; fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> { // Convert to ccsds and call pass_ccsds @@ -139,7 +135,9 @@ impl ReceivesTcCore for PusDistributor { } } -impl ReceivesCcsdsTc for PusDistributor { +impl, E: 'static> ReceivesCcsdsTc + for PusDistributor +{ type Error = PusDistribError; fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> { let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?; @@ -147,24 +145,26 @@ impl ReceivesCcsdsTc for PusDistributor { } } -impl ReceivesEcssPusTc for PusDistributor { +impl, E: 'static> ReceivesEcssPusTc + for PusDistributor +{ type Error = PusDistribError; fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { self.service_provider - .handle_pus_tc_packet(pus_tc.service(), header, pus_tc) + .distribute_packet(pus_tc.service(), header, pus_tc) .map_err(|e| PusDistribError::CustomError(e)) } } -impl PusDistributor { - pub fn service_provider_ref>(&self) -> Option<&T> { - self.service_provider.downcast_ref::() +impl, E: 'static> + PusDistributor +{ + pub fn service_distributor(&self) -> &ServiceDistributor { + &self.service_provider } - pub fn service_provider_mut>( - &mut self, - ) -> Option<&mut T> { - self.service_provider.downcast_mut::() + pub fn service_distributor_mut(&mut self) -> &mut ServiceDistributor { + &mut self.service_provider } } @@ -185,54 +185,64 @@ mod tests { fn is_send(_: &T) {} - struct PusHandlerSharedQueue { - pub pus_queue: Arc)>>>, + pub struct PacketInfo { + pub service: u8, + pub apid: u16, + pub packet: Vec, } + struct PusHandlerSharedQueue(Arc>>); #[derive(Default)] - struct PusHandlerOwnedQueue { - pub pus_queue: VecDeque<(u8, u16, Vec)>, - } + struct PusHandlerOwnedQueue(VecDeque); - impl PusServiceProvider for PusHandlerSharedQueue { + impl PusServiceDistributor for PusHandlerSharedQueue { type Error = PusError; - fn handle_pus_tc_packet( + fn distribute_packet( &mut self, service: u8, sp_header: &SpHeader, pus_tc: &PusTcReader, ) -> Result<(), Self::Error> { - let mut vec: Vec = Vec::new(); - vec.extend_from_slice(pus_tc.raw_data()); - Ok(self - .pus_queue + let mut packet: Vec = Vec::new(); + packet.extend_from_slice(pus_tc.raw_data()); + self.0 .lock() .expect("Mutex lock failed") - .push_back((service, sp_header.apid(), vec))) + .push_back(PacketInfo { + service, + apid: sp_header.apid(), + packet, + }); + Ok(()) } } - impl PusServiceProvider for PusHandlerOwnedQueue { + impl PusServiceDistributor for PusHandlerOwnedQueue { type Error = PusError; - fn handle_pus_tc_packet( + fn distribute_packet( &mut self, service: u8, sp_header: &SpHeader, pus_tc: &PusTcReader, ) -> Result<(), Self::Error> { - let mut vec: Vec = Vec::new(); - vec.extend_from_slice(pus_tc.raw_data()); - Ok(self.pus_queue.push_back((service, sp_header.apid(), vec))) + let mut packet: Vec = Vec::new(); + packet.extend_from_slice(pus_tc.raw_data()); + self.0.push_back(PacketInfo { + service, + apid: sp_header.apid(), + packet, + }); + Ok(()) } } struct ApidHandlerShared { - pub pus_distrib: PusDistributor, + pub pus_distrib: PusDistributor, pub handler_base: BasicApidHandlerSharedQueue, } struct ApidHandlerOwned { - pub pus_distrib: PusDistributor, + pub pus_distrib: PusDistributor, handler_base: BasicApidHandlerOwnedQueue, } @@ -290,23 +300,21 @@ mod tests { let known_packet_queue = Arc::new(Mutex::default()); let unknown_packet_queue = Arc::new(Mutex::default()); let pus_queue = Arc::new(Mutex::default()); - let pus_handler = PusHandlerSharedQueue { - pus_queue: pus_queue.clone(), - }; + let pus_handler = PusHandlerSharedQueue(pus_queue.clone()); let handler_base = BasicApidHandlerSharedQueue { known_packet_queue: known_packet_queue.clone(), unknown_packet_queue: unknown_packet_queue.clone(), }; let pus_distrib = PusDistributor { - service_provider: Box::new(pus_handler), + service_provider: pus_handler, }; is_send(&pus_distrib); let apid_handler = ApidHandlerShared { pus_distrib, handler_base, }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); @@ -322,25 +330,23 @@ mod tests { assert_eq!(packet.as_slice(), tc_slice); let recvd_pus = pus_queue.lock().unwrap().pop_front(); assert!(recvd_pus.is_some()); - let (service, apid, tc_raw) = recvd_pus.unwrap(); - assert_eq!(service, 17); - assert_eq!(apid, 0x002); - assert_eq!(tc_raw, tc_slice); + let packet_info = recvd_pus.unwrap(); + assert_eq!(packet_info.service, 17); + assert_eq!(packet_info.apid, 0x002); + assert_eq!(packet_info.packet, tc_slice); } #[test] - fn test_as_any_cast() { + fn test_accessing_distributor() { let pus_handler = PusHandlerOwnedQueue::default(); let handler_base = BasicApidHandlerOwnedQueue::default(); - let pus_distrib = PusDistributor { - service_provider: Box::new(pus_handler), - }; + let pus_distrib = PusDistributor::new(pus_handler); let apid_handler = ApidHandlerOwned { pus_distrib, handler_base, }; - let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); @@ -349,21 +355,18 @@ mod tests { .pass_tc(tc_slice) .expect("Passing TC slice failed"); - let apid_handler_casted_back: &mut ApidHandlerOwned = ccsds_distrib - .apid_handler_mut() - .expect("Cast to concrete type ApidHandler failed"); + let apid_handler_casted_back = ccsds_distrib.packet_handler_mut(); assert!(!apid_handler_casted_back .handler_base .known_packet_queue .is_empty()); - let handler_casted_back: &mut PusHandlerOwnedQueue = apid_handler_casted_back + let handler_owned_queue = apid_handler_casted_back .pus_distrib - .service_provider_mut() - .expect("Cast to concrete type PusHandlerOwnedQueue failed"); - assert!(!handler_casted_back.pus_queue.is_empty()); - let (service, apid, packet_raw) = handler_casted_back.pus_queue.pop_front().unwrap(); - assert_eq!(service, 17); - assert_eq!(apid, 0x002); - assert_eq!(packet_raw.as_slice(), tc_slice); + .service_distributor_mut(); + assert!(!handler_owned_queue.0.is_empty()); + let packet_info = handler_owned_queue.0.pop_front().unwrap(); + assert_eq!(packet_info.service, 17); + assert_eq!(packet_info.apid, 0x002); + assert_eq!(packet_info.packet, tc_slice); } }