From cecf9c263e3c289f58754fc5a018fb52789c70ca Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 13 Apr 2024 15:08:08 +0200 Subject: [PATCH] re-worked TMTC modules --- satrs-example/src/interface/tcp.rs | 52 +-- satrs-example/src/interface/udp.rs | 75 ++-- satrs-example/src/main.rs | 58 +-- satrs-example/src/pus/mod.rs | 64 +-- satrs-example/src/pus/scheduler.rs | 11 +- satrs-example/src/tmtc/ccsds.rs | 53 --- satrs-example/src/tmtc/mod.rs | 227 +++------- satrs-example/src/tmtc/tc_source.rs | 129 ++++++ .../src/tmtc/{tm_funnel.rs => tm_sink.rs} | 0 satrs/CHANGELOG.md | 5 + satrs/src/encoding/ccsds.rs | 10 +- satrs/src/encoding/cobs.rs | 4 +- satrs/src/encoding/mod.rs | 4 +- satrs/src/hal/std/tcp_cobs_server.rs | 2 +- satrs/src/hal/std/tcp_server.rs | 10 +- satrs/src/hal/std/tcp_spacepackets_server.rs | 10 +- satrs/src/hal/std/udp_server.rs | 92 ++-- satrs/src/pus/mod.rs | 8 +- satrs/src/tmtc/ccsds_distrib.rs | 403 ----------------- satrs/src/tmtc/mod.rs | 109 +++-- satrs/src/tmtc/pus_distrib.rs | 414 ------------------ satrs/src/tmtc/tc_helper.rs | 109 +++++ satrs/tests/tcp_servers.rs | 6 +- 23 files changed, 558 insertions(+), 1297 deletions(-) delete mode 100644 satrs-example/src/tmtc/ccsds.rs create mode 100644 satrs-example/src/tmtc/tc_source.rs rename satrs-example/src/tmtc/{tm_funnel.rs => tm_sink.rs} (100%) delete mode 100644 satrs/src/tmtc/ccsds_distrib.rs delete mode 100644 satrs/src/tmtc/pus_distrib.rs create mode 100644 satrs/src/tmtc/tc_helper.rs diff --git a/satrs-example/src/interface/tcp.rs b/satrs-example/src/interface/tcp.rs index b12d4b4..fc13493 100644 --- a/satrs-example/src/interface/tcp.rs +++ b/satrs-example/src/interface/tcp.rs @@ -1,18 +1,17 @@ use std::{ collections::{HashSet, VecDeque}, + fmt::Debug, + marker::PhantomData, sync::{Arc, Mutex}, }; use log::{info, warn}; use satrs::{ hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, - pus::ReceivesEcssPusTc, spacepackets::PacketId, - tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, + tmtc::{ReceivesTc, TmPacketSource}, }; -use crate::tmtc::ccsds::CcsdsReceiver; - #[derive(Default)] pub struct ConnectionFinishedHandler {} @@ -53,7 +52,7 @@ impl SyncTcpTmSource { } } -impl TmPacketSourceCore for SyncTcpTmSource { +impl TmPacketSource for SyncTcpTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { @@ -81,56 +80,45 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } -pub type TcpServerType = TcpSpacepacketsServer< +pub type TcpServer = TcpSpacepacketsServer< SyncTcpTmSource, - CcsdsDistributor, MpscErrorType>, + ReceivesTc, HashSet, ConnectionFinishedHandler, (), - CcsdsError, + SendError, >; -pub struct TcpTask< - TcSource: ReceivesCcsdsTc - + ReceivesEcssPusTc - + Clone - + Send - + 'static, - MpscErrorType: 'static, -> { - server: TcpServerType, -} +pub struct TcpTask, SendError: Debug + 'static>( + pub TcpServer, + PhantomData, +); -impl< - TcSource: ReceivesCcsdsTc - + ReceivesEcssPusTc - + Clone - + Send - + 'static, - MpscErrorType: 'static + core::fmt::Debug, - > TcpTask +impl, SendError: Debug + 'static> + TcpTask { pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, MpscErrorType>, + tc_sender: TcSender, packet_id_lookup: HashSet, ) -> Result { - Ok(Self { - server: TcpSpacepacketsServer::new( + Ok(Self( + TcpSpacepacketsServer::new( cfg, tm_source, - tc_receiver, + tc_sender, packet_id_lookup, ConnectionFinishedHandler::default(), None, )?, - }) + PhantomData, + )) } pub fn periodic_operation(&mut self) { loop { - let result = self.server.handle_next_connection(None); + let result = self.0.handle_next_connection(None); match result { Ok(_conn_result) => (), Err(e) => { diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index 2cb4823..269a526 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -1,12 +1,13 @@ +use core::fmt::Debug; use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; use satrs::pus::{PusTmAsVec, PusTmInPool}; +use satrs::tmtc::ReceivesTc; use satrs::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, pool::{PoolProviderWithGuards, SharedStaticMemoryPool}, - tmtc::CcsdsError, }; pub trait UdpTmHandler { @@ -64,13 +65,20 @@ impl UdpTmHandler for DynamicUdpTmHandler { } } -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer>, +pub struct UdpTmtcServer< + TcSender: ReceivesTc, + TmHandler: UdpTmHandler, + SendError, +> { + pub udp_tc_server: UdpTcServer, pub tm_handler: TmHandler, } -impl - UdpTmtcServer +impl< + TcSender: ReceivesTc, + TmHandler: UdpTmHandler, + SendError: Debug + 'static, + > UdpTmtcServer { pub fn periodic_operation(&mut self) { while self.poll_tc_server() {} @@ -84,21 +92,15 @@ impl match self.udp_tc_server.try_recv_tc() { Ok(_) => true, Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true - } - CcsdsError::CustomError(e) => { - warn!("mpsc custom error {e:?}"); - true - } - }, - ReceiveResult::IoError(e) => { + ReceiveResult::Io(e) => { warn!("IO error {e}"); false } ReceiveResult::NothingReceived => false, + ReceiveResult::Send(send_error) => { + warn!("send error {send_error:?}"); + false + } }, } } @@ -117,21 +119,21 @@ mod tests { ecss::{tc::PusTcCreator, WritablePusPacket}, SpHeader, }, - tmtc::ReceivesTcCore, + tmtc::ReceivesTc, }; use satrs_example::config::{components, OBSW_SERVER_ADDR}; use super::*; - #[derive(Default, Debug, Clone)] - pub struct TestReceiver { - tc_vec: Arc>>>, + #[derive(Default, Debug)] + pub struct TestSender { + tc_vec: VecDeque>, } - impl ReceivesTcCore for TestReceiver { - type Error = CcsdsError<()>; + impl ReceivesTc for TestSender { + type Error = (); fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec()); + self.tc_vec.push_back(tc_raw.to_vec()); Ok(()) } } @@ -150,9 +152,9 @@ mod tests { #[test] fn test_basic() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); - let test_receiver = TestReceiver::default(); - let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let test_receiver = TestSender::default(); + // let tc_queue = test_receiver.tc_vec.clone(); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, test_receiver).unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { @@ -160,16 +162,16 @@ mod tests { tm_handler, }; udp_dyn_server.periodic_operation(); - assert!(tc_queue.lock().unwrap().is_empty()); + assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); assert!(tm_handler_calls.lock().unwrap().is_empty()); } #[test] fn test_transactions() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); - let test_receiver = TestReceiver::default(); - let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let test_receiver = TestSender::default(); + // let tc_queue = test_receiver.tc_vec.clone(); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, test_receiver).unwrap(); let server_addr = udp_tc_server.socket.local_addr().unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); @@ -187,9 +189,14 @@ mod tests { client.send(&ping_tc).unwrap(); udp_dyn_server.periodic_operation(); { - let mut tc_queue = tc_queue.lock().unwrap(); - assert!(!tc_queue.is_empty()); - let received_tc = tc_queue.pop_front().unwrap(); + //let mut tc_queue = tc_queue.lock().unwrap(); + assert!(!udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); + let received_tc = udp_dyn_server + .udp_tc_server + .tc_sender + .tc_vec + .pop_front() + .unwrap(); assert_eq!(received_tc, ping_tc); } @@ -201,7 +208,7 @@ mod tests { assert_eq!(received_addr, client_addr); } udp_dyn_server.periodic_operation(); - assert!(tc_queue.lock().unwrap().is_empty()); + assert!(udp_dyn_server.udp_tc_server.tc_sender.tc_vec.is_empty()); // Still tries to send to the same client. { let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 2ecc23a..8acf832 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -10,19 +10,20 @@ mod tmtc; use crate::events::EventHandler; use crate::interface::udp::DynamicUdpTmHandler; use crate::pus::stack::PusStack; -use crate::tmtc::tm_funnel::{TmFunnelDynamic, TmFunnelStatic}; +use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic}; +use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic}; use log::info; use pus::test::create_test_service_dynamic; use satrs::hal::std::tcp_server::ServerConfig; use satrs::hal::std::udp_server::UdpTcServer; use satrs::request::GenericMessage; +use satrs::tmtc::tc_helper::{SharedTcPool, TcSenderSharedPool}; use satrs::tmtc::tm_helper::SharedTmPool; use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; use satrs_example::config::tasks::{ FREQ_MS_AOCS, FREQ_MS_EVENT_HANDLING, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, }; use satrs_example::config::{OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT}; -use tmtc::PusTcSourceProviderDynamic; use crate::acs::mgm::{MgmHandlerLis3Mdl, MpscModeLeafInterface, SpiDummyInterface}; use crate::interface::tcp::{SyncTcpTmSource, TcpTask}; @@ -34,17 +35,12 @@ use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static}; use crate::pus::mode::{create_mode_service_dynamic, create_mode_service_static}; use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static}; use crate::pus::test::create_test_service_static; -use crate::pus::{PusReceiver, PusTcMpscRouter}; +use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::requests::{CompositeRequest, GenericRequestRouter}; -use crate::tmtc::ccsds::CcsdsReceiver; -use crate::tmtc::{ - PusTcSourceProviderSharedPool, SharedTcPool, TcSourceTaskDynamic, TcSourceTaskStatic, -}; use satrs::mode::ModeRequest; use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::TmInSharedPoolSender; use satrs::spacepackets::{time::cds::CdsTime, time::TimeWriter}; -use satrs::tmtc::CcsdsDistributor; use satrs_example::config::components::MGM_HANDLER_0; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc; @@ -56,9 +52,7 @@ use std::time::Duration; fn static_tmtc_pool_main() { let (tm_pool, tc_pool) = create_static_pools(); let shared_tm_pool = SharedTmPool::new(tm_pool); - let shared_tc_pool = SharedTcPool { - pool: Arc::new(RwLock::new(tc_pool)), - }; + let shared_tc_pool = SharedTcPool(Arc::new(RwLock::new(tc_pool))); let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); @@ -81,10 +75,7 @@ fn static_tmtc_pool_main() { // This helper structure is used by all telecommand providers which need to send telecommands // to the TC source. - let tc_source = PusTcSourceProviderSharedPool { - shared_pool: shared_tc_pool.clone(), - tc_source: tc_source_tx, - }; + let tc_source = TcSenderSharedPool::new(tc_source_tx, shared_tc_pool.clone()); // Create event handling components // These sender handles are used to send event requests, for example to enable or disable @@ -116,7 +107,7 @@ fn static_tmtc_pool_main() { }; let pus_test_service = create_test_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.pool.clone(), + shared_tc_pool.0.clone(), event_handler.clone_event_sender(), pus_test_rx, ); @@ -128,27 +119,27 @@ fn static_tmtc_pool_main() { ); let pus_event_service = create_event_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.pool.clone(), + shared_tc_pool.0.clone(), pus_event_rx, event_request_tx, ); let pus_action_service = create_action_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.pool.clone(), + shared_tc_pool.0.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); let pus_hk_service = create_hk_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.pool.clone(), + shared_tc_pool.0.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); let pus_mode_service = create_mode_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.pool.clone(), + shared_tc_pool.0.clone(), pus_mode_rx, request_map, pus_mode_reply_rx, @@ -162,16 +153,14 @@ fn static_tmtc_pool_main() { pus_mode_service, ); - let ccsds_receiver = CcsdsReceiver { tc_source }; let mut tmtc_task = TcSourceTaskStatic::new( shared_tc_pool.clone(), tc_source_rx, - PusReceiver::new(tm_funnel_tx_sender, pus_router), + PusTcDistributor::new(tm_funnel_tx_sender, pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, tc_source.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, @@ -181,13 +170,12 @@ fn static_tmtc_pool_main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), - tcp_ccsds_distributor, + tc_source.clone(), PACKET_ID_VALIDATOR.clone(), ) .expect("tcp server creation failed"); @@ -317,8 +305,6 @@ fn dyn_tmtc_pool_main() { .mode_router_map .insert(MGM_HANDLER_0.raw(), mgm_handler_mode_tx); - let tc_source = PusTcSourceProviderDynamic(tc_source_tx); - // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. @@ -354,7 +340,7 @@ fn dyn_tmtc_pool_main() { ); let pus_scheduler_service = create_scheduler_service_dynamic( tm_funnel_tx.clone(), - tc_source.0.clone(), + tc_source_tx.clone(), pus_sched_rx, create_sched_tc_pool(), ); @@ -388,16 +374,18 @@ fn dyn_tmtc_pool_main() { pus_mode_service, ); - let ccsds_receiver = CcsdsReceiver { tc_source }; + // let ccsds_receiver = CcsdsDistributor { + //pus_sender: tc_source_tx, + //}; let mut tmtc_task = TcSourceTaskDynamic::new( tc_source_rx, - PusReceiver::new(tm_funnel_tx.clone(), pus_router), + PusTcDistributor::new(tm_funnel_tx.clone(), pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + //let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, tc_source_tx.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, @@ -406,13 +394,13 @@ fn dyn_tmtc_pool_main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); + //let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), - tcp_ccsds_distributor, + tc_source_tx.clone(), PACKET_ID_VALIDATOR.clone(), ) .expect("tcp server creation failed"); diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 28c645a..16b1f07 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,5 +1,5 @@ use crate::requests::GenericRequestRouter; -use crate::tmtc::MpscStoreAndSendError; +use crate::tmtc::StoreAndSendError; use log::warn; use satrs::pus::verification::{ self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, @@ -11,7 +11,7 @@ use satrs::pus::{ GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; -use satrs::queue::GenericReceiveError; +use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusServiceId; @@ -53,7 +53,7 @@ pub struct PusTcMpscRouter { pub mode_tc_sender: Sender, } -pub struct PusReceiver { +pub struct PusTcDistributor { pub id: ComponentId, pub tm_sender: TmSender, pub verif_reporter: VerificationReporter, @@ -61,7 +61,7 @@ pub struct PusReceiver { stamp_helper: TimeStampHelper, } -impl PusReceiver { +impl PusTcDistributor { pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { Self { id: PUS_ROUTING_SERVICE.raw(), @@ -80,7 +80,7 @@ impl PusReceiver { tc_in_memory: TcInMemory, service: u8, pus_tc: &PusTcReader, - ) -> Result { + ) -> Result { let init_token = self.verif_reporter.add_tc(pus_tc); self.stamp_helper.update_from_now(); let accepted_token = self @@ -90,26 +90,38 @@ impl PusReceiver { let service = PusServiceId::try_from(service); match service { Ok(standard_service) => match standard_service { - PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken { - tc_in_memory, - token: Some(accepted_token.into()), - })?, - PusServiceId::Housekeeping => { - self.pus_router.hk_tc_sender.send(EcssTcAndToken { + PusServiceId::Test => self + .pus_router + .test_tc_sender + .send(EcssTcAndToken { tc_in_memory, token: Some(accepted_token.into()), - })? - } - PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { - tc_in_memory, - token: Some(accepted_token.into()), - })?, - PusServiceId::Scheduling => { - self.pus_router.sched_tc_sender.send(EcssTcAndToken { + }) + .map_err(|_| GenericSendError::RxDisconnected)?, + PusServiceId::Housekeeping => self + .pus_router + .hk_tc_sender + .send(EcssTcAndToken { tc_in_memory, token: Some(accepted_token.into()), - })? - } + }) + .map_err(|_| GenericSendError::RxDisconnected)?, + PusServiceId::Event => self + .pus_router + .event_tc_sender + .send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + }) + .map_err(|_| GenericSendError::RxDisconnected)?, + PusServiceId::Scheduling => self + .pus_router + .sched_tc_sender + .send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + }) + .map_err(|_| GenericSendError::RxDisconnected)?, _ => { let result = self.verif_reporter.start_failure( &self.tm_sender, @@ -128,12 +140,14 @@ impl PusReceiver { Err(e) => { if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) { match custom_service { - CustomPusServiceId::Mode => { - self.pus_router.mode_tc_sender.send(EcssTcAndToken { + CustomPusServiceId::Mode => self + .pus_router + .mode_tc_sender + .send(EcssTcAndToken { tc_in_memory, token: Some(accepted_token.into()), - })? - } + }) + .map_err(|_| GenericSendError::RxDisconnected)?, CustomPusServiceId::Health => {} } } else { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index a774577..3b73036 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -12,23 +12,22 @@ use satrs::pus::{ EcssTmSenderCore, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusTmInPool, TmInSharedPoolSender, }; +use satrs::tmtc::tc_helper::TcSenderSharedPool; use satrs_example::config::components::PUS_SCHED_SERVICE; -use crate::tmtc::PusTcSourceProviderSharedPool; - use super::HandlingStatus; pub trait TcReleaser { fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; } -impl TcReleaser for PusTcSourceProviderSharedPool { +impl TcReleaser for TcSenderSharedPool { fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool { if enabled { // Transfer TC from scheduler TC pool to shared TC pool. let released_tc_addr = self .shared_pool - .pool + .0 .write() .expect("locking pool failed") .add(tc) @@ -122,7 +121,7 @@ impl pub fn create_scheduler_service_static( tm_sender: TmInSharedPoolSender>, - tc_releaser: PusTcSourceProviderSharedPool, + tc_releaser: TcSenderSharedPool, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, ) -> SchedulingServiceWrapper { @@ -134,7 +133,7 @@ pub fn create_scheduler_service_static( pus_sched_rx, tm_sender, create_verification_reporter(PUS_SCHED_SERVICE.id(), PUS_SCHED_SERVICE.apid), - EcssTcInSharedStoreConverter::new(tc_releaser.clone_backing_pool(), 2048), + EcssTcInSharedStoreConverter::new(tc_releaser.shared_pool(), 2048), ), scheduler, ); diff --git a/satrs-example/src/tmtc/ccsds.rs b/satrs-example/src/tmtc/ccsds.rs deleted file mode 100644 index 1841d17..0000000 --- a/satrs-example/src/tmtc/ccsds.rs +++ /dev/null @@ -1,53 +0,0 @@ -use satrs::pus::ReceivesEcssPusTc; -use satrs::spacepackets::{CcsdsPacket, SpHeader}; -use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; -use satrs::ValidatorU16Id; -use satrs_example::config::components::Apid; -use satrs_example::config::APID_VALIDATOR; - -#[derive(Clone)] -pub struct CcsdsReceiver< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, - E, -> { - pub tc_source: TcSource, -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > ValidatorU16Id for CcsdsReceiver -{ - fn validate(&self, apid: u16) -> bool { - APID_VALIDATOR.contains(&apid) - } -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > CcsdsPacketHandler for CcsdsReceiver -{ - type Error = E; - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - if sp_header.apid() == Apid::Cfdp as u16 { - } else { - return self.tc_source.pass_ccsds(sp_header, tc_raw); - } - Ok(()) - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - _tc_raw: &[u8], - ) -> Result<(), Self::Error> { - log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); - Ok(()) - } -} diff --git a/satrs-example/src/tmtc/mod.rs b/satrs-example/src/tmtc/mod.rs index 93d2927..f3ebfbd 100644 --- a/satrs-example/src/tmtc/mod.rs +++ b/satrs-example/src/tmtc/mod.rs @@ -1,215 +1,96 @@ -use log::warn; -use satrs::pus::{ - EcssTcAndToken, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, ReceivesEcssPusTc, -}; +use satrs::pus::ReceivesEcssPusTc; +use satrs::queue::GenericSendError; use satrs::spacepackets::SpHeader; -use std::sync::mpsc::{self, Receiver, SendError, Sender, SyncSender, TryRecvError}; +use std::sync::mpsc::{self}; use thiserror::Error; -use crate::pus::PusReceiver; use satrs::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError}; use satrs::spacepackets::ecss::tc::PusTcReader; -use satrs::spacepackets::ecss::PusPacket; -use satrs::tmtc::ReceivesCcsdsTc; +use satrs::tmtc::{ReceivesCcsdsTc, ReceivesTc}; -pub mod ccsds; -pub mod tm_funnel; +pub mod tc_source; +pub mod tm_sink; #[derive(Debug, Clone, PartialEq, Eq, Error)] -pub enum MpscStoreAndSendError { +pub enum StoreAndSendError { #[error("Store error: {0}")] Store(#[from] StoreError), - #[error("TC send error: {0}")] - TcSend(#[from] SendError), - #[error("TMTC send error: {0}")] - TmTcSend(#[from] SendError), + #[error("Genreric send error: {0}")] + Send(#[from] GenericSendError), } #[derive(Clone)] -pub struct SharedTcPool { - pub pool: SharedStaticMemoryPool, -} +pub struct SharedTcPool(pub SharedStaticMemoryPool); impl SharedTcPool { pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result { - let mut pg = self.pool.write().expect("error locking TC store"); + let mut pg = self.0.write().expect("error locking TC store"); let addr = pg.free_element(pus_tc.len_packed(), |buf| { buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data()); })?; Ok(addr) } + + pub fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result { + self.add_raw_tc(tc_raw) + } + + pub fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result { + let mut pg = self.0.write().expect("error locking TC store"); + let addr = pg.free_element(tc_raw.len(), |buf| { + buf[0..tc_raw.len()].copy_from_slice(tc_raw); + })?; + Ok(addr) + } } #[derive(Clone)] -pub struct PusTcSourceProviderSharedPool { - pub tc_source: SyncSender, +pub struct TcSenderSharedPool { + pub tc_source: mpsc::SyncSender, pub shared_pool: SharedTcPool, } -impl PusTcSourceProviderSharedPool { +impl TcSenderSharedPool { #[allow(dead_code)] - pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { - self.shared_pool.pool.clone() + pub fn shared_pool(&self) -> SharedStaticMemoryPool { + self.shared_pool.0.clone() } } -impl ReceivesEcssPusTc for PusTcSourceProviderSharedPool { - type Error = MpscStoreAndSendError; +impl ReceivesTc for TcSenderSharedPool { + type Error = StoreAndSendError; + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let addr = self.shared_pool.add_raw_tc(tc_raw)?; + self.tc_source.try_send(addr).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + })?; + Ok(()) + } +} +impl ReceivesEcssPusTc for TcSenderSharedPool { + type Error = StoreAndSendError; fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { let addr = self.shared_pool.add_pus_tc(pus_tc)?; - self.tc_source.send(addr)?; + self.tc_source.try_send(addr).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + })?; Ok(()) } } -impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool { - type Error = MpscStoreAndSendError; +impl ReceivesCcsdsTc for TcSenderSharedPool { + type Error = StoreAndSendError; - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut pool = self.shared_pool.pool.write().expect("locking pool failed"); - let addr = pool.add(tc_raw)?; - drop(pool); - self.tc_source.send(addr)?; + fn pass_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + let addr = self.shared_pool.add_ccsds_tc(sp_header, tc_raw)?; + self.tc_source.try_send(addr).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + })?; Ok(()) } } - -// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. -#[derive(Clone)] -pub struct PusTcSourceProviderDynamic(pub Sender>); - -impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { - type Error = SendError>; - - fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - self.0.send(pus_tc.raw_data().to_vec())?; - Ok(()) - } -} - -impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { - type Error = mpsc::SendError>; - - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.0.send(tc_raw.to_vec())?; - Ok(()) - } -} - -// TC source components where static pools are the backing memory of the received telecommands. -pub struct TcSourceTaskStatic { - shared_tc_pool: SharedTcPool, - tc_receiver: Receiver, - tc_buf: [u8; 4096], - pus_receiver: PusReceiver, -} - -impl TcSourceTaskStatic { - pub fn new( - shared_tc_pool: SharedTcPool, - tc_receiver: Receiver, - pus_receiver: PusReceiver, - ) -> Self { - Self { - shared_tc_pool, - tc_receiver, - tc_buf: [0; 4096], - pus_receiver, - } - } - - pub fn periodic_operation(&mut self) { - self.poll_tc(); - } - - pub fn poll_tc(&mut self) -> bool { - match self.tc_receiver.try_recv() { - Ok(addr) => { - let pool = self - .shared_tc_pool - .pool - .read() - .expect("locking tc pool failed"); - pool.read(&addr, &mut self.tc_buf) - .expect("reading pool failed"); - drop(pool); - match PusTcReader::new(&self.tc_buf) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::StoreAddr(addr), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - warn!("error creating PUS TC from raw data: {e}"); - warn!("raw data: {:x?}", self.tc_buf); - true - } - } - } - Err(e) => match e { - TryRecvError::Empty => false, - TryRecvError::Disconnected => { - warn!("tmtc thread: sender disconnected"); - false - } - }, - } - } -} - -// TC source components where the heap is the backing memory of the received telecommands. -pub struct TcSourceTaskDynamic { - pub tc_receiver: Receiver>, - pus_receiver: PusReceiver, -} - -impl TcSourceTaskDynamic { - pub fn new( - tc_receiver: Receiver>, - pus_receiver: PusReceiver, - ) -> Self { - Self { - tc_receiver, - pus_receiver, - } - } - - pub fn periodic_operation(&mut self) { - self.poll_tc(); - } - - pub fn poll_tc(&mut self) -> bool { - match self.tc_receiver.try_recv() { - Ok(tc) => match PusTcReader::new(&tc) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::Vec(tc.clone()), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - warn!("error creating PUS TC from raw data: {e}"); - warn!("raw data: {:x?}", tc); - true - } - }, - Err(e) => match e { - TryRecvError::Empty => false, - TryRecvError::Disconnected => { - warn!("tmtc thread: sender disconnected"); - false - } - }, - } - } -} diff --git a/satrs-example/src/tmtc/tc_source.rs b/satrs-example/src/tmtc/tc_source.rs new file mode 100644 index 0000000..f40e559 --- /dev/null +++ b/satrs-example/src/tmtc/tc_source.rs @@ -0,0 +1,129 @@ +use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedTcPool}; +use std::sync::mpsc::{self, TryRecvError}; + +use satrs::{ + pool::StoreAddr, + pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded}, + spacepackets::ecss::{tc::PusTcReader, PusPacket}, +}; + +use crate::pus::PusTcDistributor; + + +// TC source components where static pools are the backing memory of the received telecommands. +pub struct TcSourceTaskStatic { + shared_tc_pool: SharedTcPool, + tc_receiver: mpsc::Receiver, + tc_buf: [u8; 4096], + pus_receiver: PusTcDistributor, +} + +impl TcSourceTaskStatic { + pub fn new( + shared_tc_pool: SharedTcPool, + tc_receiver: mpsc::Receiver, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + shared_tc_pool, + tc_receiver, + tc_buf: [0; 4096], + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_receiver.try_recv() { + Ok(addr) => { + let pool = self + .shared_tc_pool + .0 + .read() + .expect("locking tc pool failed"); + pool.read(&addr, &mut self.tc_buf) + .expect("reading pool failed"); + drop(pool); + match PusTcReader::new(&self.tc_buf) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::StoreAddr(addr), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", self.tc_buf); + true + } + } + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} + +// TC source components where the heap is the backing memory of the received telecommands. +pub struct TcSourceTaskDynamic { + pub tc_receiver: mpsc::Receiver>, + pus_receiver: PusTcDistributor, +} + +impl TcSourceTaskDynamic { + pub fn new( + tc_receiver: mpsc::Receiver>, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + tc_receiver, + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + // Right now, we only expect PUS packets. + match self.tc_receiver.try_recv() { + Ok(tc) => match PusTcReader::new(&tc) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::Vec(tc.clone()), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", tc); + true + } + }, + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} diff --git a/satrs-example/src/tmtc/tm_funnel.rs b/satrs-example/src/tmtc/tm_sink.rs similarity index 100% rename from satrs-example/src/tmtc/tm_funnel.rs rename to satrs-example/src/tmtc/tm_sink.rs diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index 508c69b..f1c8a2e 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## Changed +- Renamed `ReceivesTcCore` to `ReceivesTc`. +- Renamed `TmPacketSourceCore` to `TmPacketSource`. - TCP server generics order. The error generics come last now. - `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root. It can be used for both CCSDS packet ID and CCSDS APID validation. @@ -76,6 +78,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## Removed - Remove `objects` module. +- Removed CCSDS and PUS distributor modules. Their worth is questionable in an architecture + where routing traits are sufficient and the core logic to demultiplex and distribute packets + is simple enough to be application code. # [v0.2.0-rc.0] 2024-02-21 diff --git a/satrs/src/encoding/ccsds.rs b/satrs/src/encoding/ccsds.rs index 30adccf..f7a2529 100644 --- a/satrs/src/encoding/ccsds.rs +++ b/satrs/src/encoding/ccsds.rs @@ -1,8 +1,8 @@ -use crate::{tmtc::ReceivesTcCore, ValidatorU16Id}; +use crate::{tmtc::ReceivesTc, ValidatorU16Id}; /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the -/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then -/// uses the length field of the packet to extract CCSDS packets. +/// [spacepackets::PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet +/// and then uses the length field of the packet to extract CCSDS packets. /// /// This function is also able to deal with broken tail packets at the end as long a the parser /// can read the full 7 bytes which constitue a space packet header plus one byte minimal size. @@ -10,12 +10,12 @@ use crate::{tmtc::ReceivesTcCore, ValidatorU16Id}; /// index for future write operations will be written to the `next_write_idx` argument. /// /// The parser will write all packets which were decoded successfully to the given `tc_receiver` -/// and return the number of packets found. If the [ReceivesTcCore::pass_tc] calls fails, the +/// and return the number of packets found. If the [ReceivesTc::pass_tc] calls fails, the /// error will be returned. pub fn parse_buffer_for_ccsds_space_packets( buf: &mut [u8], packet_id_validator: &(impl ValidatorU16Id + ?Sized), - tc_receiver: &mut (impl ReceivesTcCore + ?Sized), + tc_receiver: &mut (impl ReceivesTc + ?Sized), next_write_idx: &mut usize, ) -> Result { *next_write_idx = 0; diff --git a/satrs/src/encoding/cobs.rs b/satrs/src/encoding/cobs.rs index 6953c3b..9f8d1df 100644 --- a/satrs/src/encoding/cobs.rs +++ b/satrs/src/encoding/cobs.rs @@ -1,4 +1,4 @@ -use crate::tmtc::ReceivesTcCore; +use crate::tmtc::ReceivesTc; use cobs::{decode_in_place, encode, max_encoding_length}; /// This function encodes the given packet with COBS and also wraps the encoded packet with @@ -57,7 +57,7 @@ pub fn encode_packet_with_cobs( /// The parser will write all packets which were decoded successfully to the given `tc_receiver`. pub fn parse_buffer_for_cobs_encoded_packets( buf: &mut [u8], - tc_receiver: &mut dyn ReceivesTcCore, + tc_receiver: &mut (impl ReceivesTc + ?Sized), next_write_idx: &mut usize, ) -> Result { let mut start_index_packet = 0; diff --git a/satrs/src/encoding/mod.rs b/satrs/src/encoding/mod.rs index 94e3dee..71bd169 100644 --- a/satrs/src/encoding/mod.rs +++ b/satrs/src/encoding/mod.rs @@ -8,7 +8,7 @@ pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_e pub(crate) mod tests { use alloc::{collections::VecDeque, vec::Vec}; - use crate::tmtc::ReceivesTcCore; + use crate::tmtc::ReceivesTc; use super::cobs::encode_packet_with_cobs; @@ -20,7 +20,7 @@ pub(crate) mod tests { pub(crate) tc_queue: VecDeque>, } - impl ReceivesTcCore for TcCacher { + impl ReceivesTc for TcCacher { type Error = (); fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { diff --git a/satrs/src/hal/std/tcp_cobs_server.rs b/satrs/src/hal/std/tcp_cobs_server.rs index 8b044f6..6b334fc 100644 --- a/satrs/src/hal/std/tcp_cobs_server.rs +++ b/satrs/src/hal/std/tcp_cobs_server.rs @@ -35,7 +35,7 @@ impl TcpTcParser for CobsTcParser { ) -> Result<(), TcpTmtcError> { conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( &mut tc_buffer[..current_write_idx], - tc_receiver.upcast_mut(), + tc_receiver, next_write_idx, ) .map_err(|e| TcpTmtcError::TcError(e))?; diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index 9b05f43..e6b91ca 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -178,8 +178,8 @@ pub struct TcpTmtcGenericServer< pub(crate) tc_buffer: Vec, poll: Poll, events: Events, - tc_handler: TcParser, - tm_handler: TmSender, + pub tc_handler: TcParser, + pub tm_handler: TmSender, stop_signal: Option>, } @@ -424,7 +424,7 @@ pub(crate) mod tests { use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; - use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore}; + use crate::tmtc::{ReceivesTc, TmPacketSource}; use super::*; @@ -432,7 +432,7 @@ pub(crate) mod tests { pub(crate) struct SyncTcCacher { pub(crate) tc_queue: Arc>>>, } - impl ReceivesTcCore for SyncTcCacher { + impl ReceivesTc for SyncTcCacher { type Error = (); fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -454,7 +454,7 @@ pub(crate) mod tests { } } - impl TmPacketSourceCore for SyncTmSource { + impl TmPacketSource for SyncTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { diff --git a/satrs/src/hal/std/tcp_spacepackets_server.rs b/satrs/src/hal/std/tcp_spacepackets_server.rs index 45f5fd8..17415b5 100644 --- a/satrs/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs/src/hal/std/tcp_spacepackets_server.rs @@ -41,7 +41,7 @@ impl TcpTcParser TcpTmSender for SpacepacketsTmSender { /// also serves as the example application for this module. pub struct TcpSpacepacketsServer< TmSource: TmPacketSource, - TcReceiver: ReceivesTc, + TcSender: ReceivesTc, PacketIdChecker: ValidatorU16Id, HandledConnection: HandledConnectionHandler, TmError, - TcError: 'static, + SendError: 'static, > { pub generic_server: TcpTmtcGenericServer< TmSource, - TcReceiver, + TcSender, SpacepacketsTmSender, SpacepacketsTcParser, HandledConnection, TmError, - TcError, + SendError, >, } diff --git a/satrs/src/hal/std/udp_server.rs b/satrs/src/hal/std/udp_server.rs index 8f77c2b..8bcae13 100644 --- a/satrs/src/hal/std/udp_server.rs +++ b/satrs/src/hal/std/udp_server.rs @@ -1,7 +1,7 @@ //! Generic UDP TC server. -use crate::tmtc::{ReceivesTc, ReceivesTcCore}; -use std::boxed::Box; -use std::io::{Error, ErrorKind}; +use crate::tmtc::ReceivesTc; +use core::fmt::Debug; +use std::io::{self, ErrorKind}; use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; use std::vec; use std::vec::Vec; @@ -11,9 +11,10 @@ use std::vec::Vec; /// /// It caches all received telecomands into a vector. The maximum expected telecommand size should /// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC -/// receiver in form of a special trait object which implements [ReceivesTc]. Please note that the +/// sender in form of a special trait object which implements [ReceivesTc]. For example, this can +/// be used to send the telecommands to a centralized TC source. Please note that the /// receiver should copy out the received data if it the data is required past the -/// [ReceivesTcCore::pass_tc] call. +/// [ReceivesTc::pass_tc] call and no message passing is used to process the packet. /// /// # Examples /// @@ -21,13 +22,13 @@ use std::vec::Vec; /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; /// use spacepackets::ecss::WritablePusPacket; /// use satrs::hal::std::udp_server::UdpTcServer; -/// use satrs::tmtc::{ReceivesTc, ReceivesTcCore}; +/// use satrs::tmtc::ReceivesTc; /// use spacepackets::SpHeader; /// use spacepackets::ecss::tc::PusTcCreator; /// /// #[derive (Default)] /// struct PingReceiver {} -/// impl ReceivesTcCore for PingReceiver { +/// impl ReceivesTc for PingReceiver { /// type Error = (); /// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { /// assert_eq!(tc_raw.len(), 13); @@ -38,7 +39,7 @@ use std::vec::Vec; /// let mut buf = [0; 32]; /// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); /// let ping_receiver = PingReceiver::default(); -/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) +/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver) /// .expect("Creating UDP TMTC server failed"); /// let sph = SpHeader::new_from_apid(0x02); /// let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); @@ -57,65 +58,42 @@ use std::vec::Vec; /// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67) /// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port /// and then forwards them to a generic CCSDS packet receiver. -pub struct UdpTcServer { +pub struct UdpTcServer, SendError> { pub socket: UdpSocket, recv_buf: Vec, sender_addr: Option, - tc_receiver: Box>, + pub tc_sender: TcSender, } -#[derive(Debug)] -pub enum ReceiveResult { +#[derive(Debug, thiserror::Error)] +pub enum ReceiveResult { + #[error("nothing was received")] NothingReceived, - IoError(Error), - ReceiverError(E), + #[error(transparent)] + Io(#[from] io::Error), + #[error(transparent)] + Send(SendError), } -impl From for ReceiveResult { - fn from(e: Error) -> Self { - ReceiveResult::IoError(e) - } -} - -impl PartialEq for ReceiveResult { - fn eq(&self, other: &Self) -> bool { - use ReceiveResult::*; - match (self, other) { - (IoError(ref e), IoError(ref other_e)) => e.kind() == other_e.kind(), - (NothingReceived, NothingReceived) => true, - (ReceiverError(e), ReceiverError(other_e)) => e == other_e, - _ => false, - } - } -} - -impl Eq for ReceiveResult {} - -impl ReceivesTcCore for UdpTcServer { - type Error = E; - - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.tc_receiver.pass_tc(tc_raw) - } -} - -impl UdpTcServer { +impl, SendError: Debug + 'static> + UdpTcServer +{ pub fn new( addr: A, max_recv_size: usize, - tc_receiver: Box>, - ) -> Result { + tc_sender: TcSender, + ) -> Result { let server = Self { socket: UdpSocket::bind(addr)?, recv_buf: vec![0; max_recv_size], sender_addr: None, - tc_receiver, + tc_sender, }; server.socket.set_nonblocking(true)?; Ok(server) } - pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult> { + pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult> { let res = match self.socket.recv_from(&mut self.recv_buf) { Ok(res) => res, Err(e) => { @@ -128,9 +106,9 @@ impl UdpTcServer { }; let (num_bytes, from) = res; self.sender_addr = Some(from); - self.tc_receiver + self.tc_sender .pass_tc(&self.recv_buf[0..num_bytes]) - .map_err(|e| ReceiveResult::ReceiverError(e))?; + .map_err(ReceiveResult::Send)?; Ok(res) } @@ -142,11 +120,11 @@ impl UdpTcServer { #[cfg(test)] mod tests { use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer}; - use crate::tmtc::ReceivesTcCore; + use crate::queue::GenericSendError; + use crate::tmtc::ReceivesTc; use spacepackets::ecss::tc::PusTcCreator; use spacepackets::ecss::WritablePusPacket; use spacepackets::SpHeader; - use std::boxed::Box; use std::collections::VecDeque; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::vec::Vec; @@ -158,8 +136,8 @@ mod tests { pub sent_cmds: VecDeque>, } - impl ReceivesTcCore for PingReceiver { - type Error = (); + impl ReceivesTc for PingReceiver { + type Error = GenericSendError; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { let mut sent_data = Vec::new(); @@ -175,7 +153,7 @@ mod tests { let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); let ping_receiver = PingReceiver::default(); is_send(&ping_receiver); - let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) + let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver) .expect("Creating UDP TMTC server failed"); is_send(&udp_tc_server); let sph = SpHeader::new_from_apid(0x02); @@ -195,7 +173,7 @@ mod tests { udp_tc_server.last_sender().expect("No sender set"), local_addr ); - let ping_receiver: &mut PingReceiver = udp_tc_server.tc_receiver.downcast_mut().unwrap(); + let ping_receiver = &mut udp_tc_server.tc_sender; assert_eq!(ping_receiver.sent_cmds.len(), 1); let sent_cmd = ping_receiver.sent_cmds.pop_front().unwrap(); assert_eq!(sent_cmd, buf[0..len]); @@ -205,11 +183,11 @@ mod tests { fn test_nothing_received() { let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7779); let ping_receiver = PingReceiver::default(); - let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) + let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver) .expect("Creating UDP TMTC server failed"); let res = udp_tc_server.try_recv_tc(); assert!(res.is_err()); let err = res.unwrap_err(); - assert_eq!(err, ReceiveResult::NothingReceived); + matches!(err, ReceiveResult::NothingReceived); } } diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index 64187e4..fca3b77 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -273,12 +273,8 @@ pub trait EcssTcReceiverCore { fn recv_tc(&self) -> Result; } -/// Generic trait for objects which can receive ECSS PUS telecommands. This trait is -/// implemented by the [crate::tmtc::pus_distrib::PusDistributor] objects to allow passing PUS TC -/// packets into it. It is generally assumed that the telecommand is stored in some pool structure, -/// and the store address is passed as well. This allows efficient zero-copy forwarding of -/// telecommands. -pub trait ReceivesEcssPusTc { +/// Generic trait for objects which can receive ECSS PUS telecommands. +pub trait ReceivesEcssPusTc: Send { type Error; fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error>; } diff --git a/satrs/src/tmtc/ccsds_distrib.rs b/satrs/src/tmtc/ccsds_distrib.rs deleted file mode 100644 index 607b461..0000000 --- a/satrs/src/tmtc/ccsds_distrib.rs +++ /dev/null @@ -1,403 +0,0 @@ -//! CCSDS packet routing components. -//! -//! The routing components consist of two core components: -//! 1. [CcsdsDistributor] component which dispatches received packets to a user-provided handler -//! 2. [CcsdsPacketHandler] trait which should be implemented by the user-provided packet handler. -//! -//! The [CcsdsDistributor] implements the [ReceivesCcsdsTc] and [ReceivesTcCore] trait which allows to -//! pass raw or CCSDS packets to it. Upon receiving a packet, it performs the following steps: -//! -//! 1. It tries to identify the target Application Process Identifier (APID) based on the -//! respective CCSDS space packet header field. If that process fails, a [ByteConversionError] is -//! returned to the user -//! 2. If a valid APID is found and matches one of the APIDs provided by -//! [CcsdsPacketHandler::valid_apids], it will pass the packet to the user provided -//! [CcsdsPacketHandler::handle_known_apid] function. If no valid APID is found, the packet -//! will be passed to the [CcsdsPacketHandler::handle_unknown_apid] function. -//! -//! # Example -//! -//! ```rust -//! use satrs::ValidatorU16Id; -//! use satrs::tmtc::ccsds_distrib::{CcsdsPacketHandler, CcsdsDistributor}; -//! use satrs::tmtc::{ReceivesTc, ReceivesTcCore}; -//! use spacepackets::{CcsdsPacket, SpHeader}; -//! use spacepackets::ecss::WritablePusPacket; -//! use spacepackets::ecss::tc::PusTcCreator; -//! -//! #[derive (Default)] -//! struct ConcreteApidHandler { -//! known_call_count: u32, -//! unknown_call_count: u32 -//! } -//! -//! impl ConcreteApidHandler { -//! fn mutable_foo(&mut self) {} -//! } -//! -//! impl ValidatorU16Id for ConcreteApidHandler { -//! fn validate(&self, apid: u16) -> bool { apid == 0x0002 } -//! } -//! -//! impl CcsdsPacketHandler for ConcreteApidHandler { -//! type Error = (); -//! fn handle_packet_with_valid_apid(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { -//! assert_eq!(sp_header.apid(), 0x002); -//! assert_eq!(tc_raw.len(), 13); -//! self.known_call_count += 1; -//! Ok(()) -//! } -//! fn handle_packet_with_unknown_apid(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { -//! assert_eq!(sp_header.apid(), 0x003); -//! assert_eq!(tc_raw.len(), 13); -//! self.unknown_call_count += 1; -//! Ok(()) -//! } -//! } -//! -//! let apid_handler = ConcreteApidHandler::default(); -//! let mut ccsds_distributor = CcsdsDistributor::new(apid_handler); -//! -//! // Create and pass PUS telecommand with a valid APID -//! let sp_header = SpHeader::new_for_unseg_tc(0x002, 0x34, 0); -//! let mut pus_tc = PusTcCreator::new_simple(sp_header, 17, 1, &[], true); -//! let mut test_buf: [u8; 32] = [0; 32]; -//! let mut size = pus_tc -//! .write_to_bytes(test_buf.as_mut_slice()) -//! .expect("Error writing TC to buffer"); -//! let tc_slice = &test_buf[0..size]; -//! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed"); -//! -//! // Now pass a packet with an unknown APID to the distributor -//! pus_tc.set_apid(0x003); -//! size = pus_tc -//! .write_to_bytes(test_buf.as_mut_slice()) -//! .expect("Error writing TC to buffer"); -//! let tc_slice = &test_buf[0..size]; -//! ccsds_distributor.pass_tc(&tc_slice).expect("Passing TC slice failed"); -//! -//! // Retrieve the APID handler. -//! let handler_ref = ccsds_distributor.packet_handler(); -//! assert_eq!(handler_ref.known_call_count, 1); -//! assert_eq!(handler_ref.unknown_call_count, 1); -//! -//! // Mutable access to the handler. -//! let mutable_handler_ref = ccsds_distributor.packet_handler_mut(); -//! mutable_handler_ref.mutable_foo(); -//! ``` -use crate::{ - tmtc::{ReceivesCcsdsTc, ReceivesTcCore}, - ValidatorU16Id, -}; -use core::fmt::{Display, Formatter}; -use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; -#[cfg(feature = "std")] -use std::error::Error; - -/// Generic trait for a handler or dispatcher object handling CCSDS packets. -/// -/// Users should implement this trait on their custom CCSDS packet handler and then pass a boxed -/// instance of this handler to the [CcsdsDistributor]. The distributor will use the trait -/// interface to dispatch received packets to the user based on the Application Process Identifier -/// (APID) field of the CCSDS packet. The APID will be checked using the generic [ValidatorU16Id] -/// trait. -pub trait CcsdsPacketHandler: ValidatorU16Id { - type Error; - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error>; - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error>; -} - -/// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler. -pub struct CcsdsDistributor, E> { - /// User provided APID handler stored as a generic trait object. - /// It can be cast back to the original concrete type using [Self::packet_handler] or - /// the [Self::packet_handler_mut] method. - packet_handler: PacketHandler, -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CcsdsError { - CustomError(E), - ByteConversionError(ByteConversionError), -} - -impl Display for CcsdsError { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - match self { - Self::CustomError(e) => write!(f, "{e}"), - Self::ByteConversionError(e) => write!(f, "{e}"), - } - } -} - -#[cfg(feature = "std")] -impl Error for CcsdsError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - Self::CustomError(e) => e.source(), - Self::ByteConversionError(e) => e.source(), - } - } -} - -impl, E: 'static> ReceivesCcsdsTc - for CcsdsDistributor -{ - type Error = CcsdsError; - - fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.dispatch_ccsds(header, tc_raw) - } -} - -impl, E: 'static> ReceivesTcCore - for CcsdsDistributor -{ - type Error = CcsdsError; - - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - if tc_raw.len() < 7 { - return Err(CcsdsError::ByteConversionError( - ByteConversionError::FromSliceTooSmall { - found: tc_raw.len(), - expected: 7, - }, - )); - } - let (sp_header, _) = - SpHeader::from_be_bytes(tc_raw).map_err(|e| CcsdsError::ByteConversionError(e))?; - self.dispatch_ccsds(&sp_header, tc_raw) - } -} - -impl, E: 'static> CcsdsDistributor { - pub fn new(packet_handler: PacketHandler) -> Self { - CcsdsDistributor { packet_handler } - } - - pub fn packet_handler(&self) -> &PacketHandler { - &self.packet_handler - } - - pub fn packet_handler_mut(&mut self) -> &mut PacketHandler { - &mut self.packet_handler - } - - fn dispatch_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), CcsdsError> { - let valid_apid = self.packet_handler().validate(sp_header.apid()); - if valid_apid { - self.packet_handler - .handle_packet_with_valid_apid(sp_header, tc_raw) - .map_err(|e| CcsdsError::CustomError(e))?; - return Ok(()); - } - self.packet_handler - .handle_packet_with_unknown_apid(sp_header, tc_raw) - .map_err(|e| CcsdsError::CustomError(e)) - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; - use spacepackets::ecss::tc::PusTcCreator; - use spacepackets::ecss::WritablePusPacket; - use spacepackets::CcsdsPacket; - use std::collections::VecDeque; - use std::sync::{Arc, Mutex}; - use std::vec::Vec; - - fn is_send(_: &T) {} - - pub fn generate_ping_tc(buf: &mut [u8]) -> &[u8] { - let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0); - let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); - let size = pus_tc - .write_to_bytes(buf) - .expect("Error writing TC to buffer"); - assert_eq!(size, 13); - &buf[0..size] - } - - pub fn generate_ping_tc_as_vec() -> Vec { - let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0); - PusTcCreator::new_simple(sph, 17, 1, &[], true) - .to_vec() - .unwrap() - } - - type SharedPacketQueue = Arc)>>>; - pub struct BasicApidHandlerSharedQueue { - pub known_packet_queue: SharedPacketQueue, - pub unknown_packet_queue: SharedPacketQueue, - } - - #[derive(Default)] - pub struct BasicApidHandlerOwnedQueue { - pub known_packet_queue: VecDeque<(u16, Vec)>, - pub unknown_packet_queue: VecDeque<(u16, Vec)>, - } - - impl ValidatorU16Id for BasicApidHandlerSharedQueue { - fn validate(&self, packet_id: u16) -> bool { - [0x000, 0x002].contains(&packet_id) - } - } - - impl CcsdsPacketHandler for BasicApidHandlerSharedQueue { - type Error = (); - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - let mut vec = Vec::new(); - vec.extend_from_slice(tc_raw); - self.known_packet_queue - .lock() - .unwrap() - .push_back((sp_header.apid(), vec)); - Ok(()) - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - let mut vec = Vec::new(); - vec.extend_from_slice(tc_raw); - self.unknown_packet_queue - .lock() - .unwrap() - .push_back((sp_header.apid(), vec)); - Ok(()) - } - } - - impl ValidatorU16Id for BasicApidHandlerOwnedQueue { - fn validate(&self, packet_id: u16) -> bool { - [0x000, 0x002].contains(&packet_id) - } - } - - impl CcsdsPacketHandler for BasicApidHandlerOwnedQueue { - type Error = (); - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - let mut vec = Vec::new(); - vec.extend_from_slice(tc_raw); - self.known_packet_queue.push_back((sp_header.apid(), vec)); - Ok(()) - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - let mut vec = Vec::new(); - vec.extend_from_slice(tc_raw); - self.unknown_packet_queue.push_back((sp_header.apid(), vec)); - Ok(()) - } - } - - #[test] - fn test_distribs_known_apid() { - let known_packet_queue = Arc::new(Mutex::default()); - let unknown_packet_queue = Arc::new(Mutex::default()); - let apid_handler = BasicApidHandlerSharedQueue { - known_packet_queue: known_packet_queue.clone(), - unknown_packet_queue: unknown_packet_queue.clone(), - }; - let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); - is_send(&ccsds_distrib); - let mut test_buf: [u8; 32] = [0; 32]; - let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); - - ccsds_distrib.pass_tc(tc_slice).expect("Passing TC failed"); - let recvd = known_packet_queue.lock().unwrap().pop_front(); - assert!(unknown_packet_queue.lock().unwrap().is_empty()); - assert!(recvd.is_some()); - let (apid, packet) = recvd.unwrap(); - assert_eq!(apid, 0x002); - assert_eq!(packet, tc_slice); - } - - #[test] - fn test_unknown_apid_handling() { - let apid_handler = BasicApidHandlerOwnedQueue::default(); - let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); - let sph = SpHeader::new_for_unseg_tc(0x004, 0x34, 0); - let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); - let mut test_buf: [u8; 32] = [0; 32]; - pus_tc - .write_to_bytes(test_buf.as_mut_slice()) - .expect("Error writing TC to buffer"); - ccsds_distrib.pass_tc(&test_buf).expect("Passing TC failed"); - assert!(ccsds_distrib.packet_handler().known_packet_queue.is_empty()); - let apid_handler = ccsds_distrib.packet_handler_mut(); - let recvd = apid_handler.unknown_packet_queue.pop_front(); - assert!(recvd.is_some()); - let (apid, packet) = recvd.unwrap(); - assert_eq!(apid, 0x004); - assert_eq!(packet.as_slice(), test_buf); - } - - #[test] - fn test_ccsds_distribution() { - let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default()); - let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0); - let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); - let tc_vec = pus_tc.to_vec().unwrap(); - ccsds_distrib - .pass_ccsds(&sph, &tc_vec) - .expect("passing CCSDS TC failed"); - let recvd = ccsds_distrib - .packet_handler_mut() - .known_packet_queue - .pop_front(); - assert!(recvd.is_some()); - let recvd = recvd.unwrap(); - assert_eq!(recvd.0, 0x002); - assert_eq!(recvd.1, tc_vec); - } - - #[test] - fn test_distribution_short_packet_fails() { - let mut ccsds_distrib = CcsdsDistributor::new(BasicApidHandlerOwnedQueue::default()); - let sph = SpHeader::new_for_unseg_tc(0x002, 0x34, 0); - let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); - let tc_vec = pus_tc.to_vec().unwrap(); - let result = ccsds_distrib.pass_tc(&tc_vec[0..6]); - assert!(result.is_err()); - let error = result.unwrap_err(); - if let CcsdsError::ByteConversionError(ByteConversionError::FromSliceTooSmall { - found, - expected, - }) = error - { - assert_eq!(found, 6); - assert_eq!(expected, 7); - } else { - panic!("Unexpected error variant"); - } - } -} diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index d4f3333..762aad0 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -5,111 +5,148 @@ //! directly dispatch received packets to packet listeners based on packet fields like the CCSDS //! Application Process ID (APID) or the ECSS PUS service type. This allows for fast packet //! routing without the overhead and complication of using message queues. However, it also requires +use std::sync::mpsc; + #[cfg(feature = "alloc")] use downcast_rs::{impl_downcast, Downcast}; use spacepackets::SpHeader; -#[cfg(feature = "alloc")] -pub mod ccsds_distrib; -#[cfg(feature = "alloc")] -pub mod pus_distrib; +#[cfg(feature = "std")] +pub mod tc_helper; pub mod tm_helper; -#[cfg(feature = "alloc")] -pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler}; -#[cfg(feature = "alloc")] -pub use pus_distrib::{PusDistributor, PusServiceDistributor}; +use crate::queue::GenericSendError; /// Generic trait for object which can receive any telecommands in form of a raw bytestream, with /// no assumptions about the received protocol. /// -/// This trait is implemented by both the [crate::tmtc::pus_distrib::PusDistributor] and the -/// [crate::tmtc::ccsds_distrib::CcsdsDistributor] which allows to pass the respective packets in -/// raw byte format into them. -pub trait ReceivesTcCore { +/// This trait can also be implemented for sender components which forward the packet. +/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender]. +pub trait ReceivesTc: Send { type Error; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error>; } -/// Extension trait of [ReceivesTcCore] which allows downcasting by implementing [Downcast] and -/// is also sendable. +#[cfg(feature = "std")] +impl ReceivesTc for mpsc::Sender> { + type Error = GenericSendError; + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.send(tc_raw.to_vec()) + .map_err(|_| GenericSendError::RxDisconnected) + } +} + +#[cfg(feature = "std")] +impl ReceivesTc for mpsc::SyncSender> { + type Error = GenericSendError; + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.try_send(tc_raw.to_vec()).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + }) + } +} + +/// Extension trait of [ReceivesTc] which allows downcasting by implementing [Downcast]. #[cfg(feature = "alloc")] -pub trait ReceivesTc: ReceivesTcCore + Downcast + Send { +pub trait ReceivesTcExt: ReceivesTc + Downcast { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn ReceivesTcCore; + fn upcast(&self) -> &dyn ReceivesTc; // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore; + fn upcast_mut(&mut self) -> &mut dyn ReceivesTc; } /// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] -impl ReceivesTc for T +impl ReceivesTcExt for T where - T: ReceivesTcCore + Send + 'static, + T: ReceivesTc + Send + 'static, { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn ReceivesTcCore { + fn upcast(&self) -> &dyn ReceivesTc { self } // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore { + fn upcast_mut(&mut self) -> &mut dyn ReceivesTc { self } } #[cfg(feature = "alloc")] -impl_downcast!(ReceivesTc assoc Error); +impl_downcast!(ReceivesTcExt assoc Error); /// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets /// for CCSDS File Delivery Protocol (CFDP) packets. /// -/// This trait is implemented by both the [crate::tmtc::pus_distrib::PusDistributor] and the -/// [crate::tmtc::ccsds_distrib::CcsdsDistributor] which allows -/// to pass the respective packets in raw byte format or in CCSDS format into them. -pub trait ReceivesCcsdsTc { +/// This trait can also be implemented for sender components which forward the packet. +/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender]. +pub trait ReceivesCcsdsTc: Send { type Error; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>; } +#[cfg(feature = "std")] +impl ReceivesCcsdsTc for mpsc::Sender> { + type Error = GenericSendError; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.send(tc_raw.to_vec()) + .map_err(|_| GenericSendError::RxDisconnected) + } +} + +#[cfg(feature = "std")] +impl ReceivesCcsdsTc for mpsc::SyncSender> { + type Error = GenericSendError; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.try_send(tc_raw.to_vec()).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + }) + } +} + /// Generic trait for a TM packet source, with no restrictions on the type of TM. /// Implementors write the telemetry into the provided buffer and return the size of the telemetry. -pub trait TmPacketSourceCore { +pub trait TmPacketSource: Send { type Error; fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result; } -/// Extension trait of [TmPacketSourceCore] which allows downcasting by implementing [Downcast] and -/// is also sendable. +/// Extension trait of [TmPacketSource] which allows downcasting by implementing [Downcast]. #[cfg(feature = "alloc")] -pub trait TmPacketSource: TmPacketSourceCore + Downcast + Send { +pub trait TmPacketSourceExt: TmPacketSource + Downcast { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn TmPacketSourceCore; + fn upcast(&self) -> &dyn TmPacketSource; // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore; + fn upcast_mut(&mut self) -> &mut dyn TmPacketSource; } /// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] -impl TmPacketSource for T +impl TmPacketSourceExt for T where - T: TmPacketSourceCore + Send + 'static, + T: TmPacketSource + 'static, { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn TmPacketSourceCore { + fn upcast(&self) -> &dyn TmPacketSource { self } // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore { + fn upcast_mut(&mut self) -> &mut dyn TmPacketSource { self } } diff --git a/satrs/src/tmtc/pus_distrib.rs b/satrs/src/tmtc/pus_distrib.rs deleted file mode 100644 index 53056bc..0000000 --- a/satrs/src/tmtc/pus_distrib.rs +++ /dev/null @@ -1,414 +0,0 @@ -//! ECSS PUS packet routing components. -//! -//! The routing components consist of two core components: -//! 1. [PusDistributor] component which dispatches received packets to a user-provided handler. -//! 2. [PusServiceDistributor] trait which should be implemented by the user-provided PUS packet -//! handler. -//! -//! The [PusDistributor] implements the [ReceivesEcssPusTc], [ReceivesCcsdsTc] and the -//! [ReceivesTcCore] trait which allows to pass raw packets, CCSDS packets and PUS TC packets into -//! it. Upon receiving a packet, it performs the following steps: -//! -//! 1. It tries to extract the [SpHeader] and [spacepackets::ecss::tc::PusTcReader] objects from -//! the raw bytestream. If this process fails, a [PusDistribError::PusError] is returned to the -//! user. -//! 2. If it was possible to extract both components, the packet will be passed to the -//! [PusServiceDistributor::distribute_packet] method provided by the user. -//! -//! # Example -//! -//! ```rust -//! use spacepackets::ecss::WritablePusPacket; -//! use satrs::tmtc::pus_distrib::{PusDistributor, PusServiceDistributor}; -//! use satrs::tmtc::{ReceivesTc, ReceivesTcCore}; -//! use spacepackets::SpHeader; -//! use spacepackets::ecss::tc::{PusTcCreator, PusTcReader}; -//! -//! struct ConcretePusHandler { -//! handler_call_count: u32 -//! } -//! -//! // This is a very simple possible service provider. It increments an internal call count field, -//! // which is used to verify the handler was called -//! impl PusServiceDistributor for ConcretePusHandler { -//! type Error = (); -//! fn distribute_packet(&mut self, service: u8, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { -//! assert_eq!(service, 17); -//! assert_eq!(pus_tc.len_packed(), 13); -//! self.handler_call_count += 1; -//! Ok(()) -//! } -//! } -//! -//! let service_handler = ConcretePusHandler { -//! handler_call_count: 0 -//! }; -//! let mut pus_distributor = PusDistributor::new(service_handler); -//! -//! // Create and pass PUS ping telecommand with a valid APID -//! let sp_header = SpHeader::new_for_unseg_tc(0x002, 0x34, 0); -//! let mut pus_tc = PusTcCreator::new_simple(sp_header, 17, 1, &[], true); -//! let mut test_buf: [u8; 32] = [0; 32]; -//! let mut size = pus_tc -//! .write_to_bytes(test_buf.as_mut_slice()) -//! .expect("Error writing TC to buffer"); -//! let tc_slice = &test_buf[0..size]; -//! -//! pus_distributor.pass_tc(tc_slice).expect("Passing PUS telecommand failed"); -//! -//! // User helper function to retrieve concrete class. We check the call count here to verify -//! // that the PUS ping telecommand was routed successfully. -//! let concrete_handler = pus_distributor.service_distributor(); -//! assert_eq!(concrete_handler.handler_call_count, 1); -//! ``` -use crate::pus::ReceivesEcssPusTc; -use crate::tmtc::{ReceivesCcsdsTc, ReceivesTcCore}; -use core::fmt::{Display, Formatter}; -use spacepackets::ecss::tc::PusTcReader; -use spacepackets::ecss::{PusError, PusPacket}; -use spacepackets::SpHeader; -#[cfg(feature = "std")] -use std::error::Error; - -/// Trait for a generic distributor object which can distribute PUS packets based on packet -/// properties like the PUS service, space packet header or any other content of the PUS packet. -pub trait PusServiceDistributor { - type Error; - fn distribute_packet( - &mut self, - service: u8, - header: &SpHeader, - pus_tc: &PusTcReader, - ) -> Result<(), Self::Error>; -} - -/// Generic distributor object which dispatches received packets to a user provided handler. -pub struct PusDistributor, E> { - service_distributor: ServiceDistributor, -} - -impl, E> - PusDistributor -{ - pub fn new(service_provider: ServiceDistributor) -> Self { - PusDistributor { - service_distributor: service_provider, - } - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum PusDistribError { - CustomError(E), - PusError(PusError), -} - -impl Display for PusDistribError { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - match self { - PusDistribError::CustomError(e) => write!(f, "pus distribution error: {e}"), - PusDistribError::PusError(e) => write!(f, "pus distribution error: {e}"), - } - } -} - -#[cfg(feature = "std")] -impl Error for PusDistribError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - Self::CustomError(e) => e.source(), - Self::PusError(e) => e.source(), - } - } -} - -impl, E: 'static> ReceivesTcCore - for PusDistributor -{ - type Error = PusDistribError; - fn pass_tc(&mut self, tm_raw: &[u8]) -> Result<(), Self::Error> { - // Convert to ccsds and call pass_ccsds - let (sp_header, _) = SpHeader::from_be_bytes(tm_raw) - .map_err(|e| PusDistribError::PusError(PusError::ByteConversion(e)))?; - self.pass_ccsds(&sp_header, tm_raw) - } -} - -impl, E: 'static> ReceivesCcsdsTc - for PusDistributor -{ - type Error = PusDistribError; - fn pass_ccsds(&mut self, header: &SpHeader, tm_raw: &[u8]) -> Result<(), Self::Error> { - let (tc, _) = PusTcReader::new(tm_raw).map_err(|e| PusDistribError::PusError(e))?; - self.pass_pus_tc(header, &tc) - } -} - -impl, E: 'static> ReceivesEcssPusTc - for PusDistributor -{ - type Error = PusDistribError; - fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - self.service_distributor - .distribute_packet(pus_tc.service(), header, pus_tc) - .map_err(|e| PusDistribError::CustomError(e)) - } -} - -impl, E: 'static> - PusDistributor -{ - pub fn service_distributor(&self) -> &ServiceDistributor { - &self.service_distributor - } - - pub fn service_distributor_mut(&mut self) -> &mut ServiceDistributor { - &mut self.service_distributor - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::queue::GenericSendError; - use crate::tmtc::ccsds_distrib::tests::{ - generate_ping_tc, generate_ping_tc_as_vec, BasicApidHandlerOwnedQueue, - BasicApidHandlerSharedQueue, - }; - use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; - use crate::ValidatorU16Id; - use alloc::format; - use alloc::vec::Vec; - use spacepackets::ecss::PusError; - use spacepackets::CcsdsPacket; - #[cfg(feature = "std")] - use std::collections::VecDeque; - #[cfg(feature = "std")] - use std::sync::{Arc, Mutex}; - - fn is_send(_: &T) {} - - pub struct PacketInfo { - pub service: u8, - pub apid: u16, - pub packet: Vec, - } - - struct PusHandlerSharedQueue(Arc>>); - - #[derive(Default)] - struct PusHandlerOwnedQueue(VecDeque); - - impl PusServiceDistributor for PusHandlerSharedQueue { - type Error = PusError; - fn distribute_packet( - &mut self, - service: u8, - sp_header: &SpHeader, - pus_tc: &PusTcReader, - ) -> Result<(), Self::Error> { - let mut packet: Vec = Vec::new(); - packet.extend_from_slice(pus_tc.raw_data()); - self.0 - .lock() - .expect("Mutex lock failed") - .push_back(PacketInfo { - service, - apid: sp_header.apid(), - packet, - }); - Ok(()) - } - } - - impl PusServiceDistributor for PusHandlerOwnedQueue { - type Error = PusError; - fn distribute_packet( - &mut self, - service: u8, - sp_header: &SpHeader, - pus_tc: &PusTcReader, - ) -> Result<(), Self::Error> { - let mut packet: Vec = Vec::new(); - packet.extend_from_slice(pus_tc.raw_data()); - self.0.push_back(PacketInfo { - service, - apid: sp_header.apid(), - packet, - }); - Ok(()) - } - } - - struct ApidHandlerShared { - pub pus_distrib: PusDistributor, - pub handler_base: BasicApidHandlerSharedQueue, - } - - struct ApidHandlerOwned { - pub pus_distrib: PusDistributor, - handler_base: BasicApidHandlerOwnedQueue, - } - - macro_rules! apid_handler_impl { - () => { - type Error = PusError; - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - self.handler_base - .handle_packet_with_valid_apid(&sp_header, tc_raw) - .ok() - .expect("Unexpected error"); - match self.pus_distrib.pass_ccsds(&sp_header, tc_raw) { - Ok(_) => Ok(()), - Err(e) => match e { - PusDistribError::CustomError(_) => Ok(()), - PusDistribError::PusError(e) => Err(e), - }, - } - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - self.handler_base - .handle_packet_with_unknown_apid(&sp_header, tc_raw) - .ok() - .expect("Unexpected error"); - Ok(()) - } - }; - } - - impl ValidatorU16Id for ApidHandlerOwned { - fn validate(&self, packet_id: u16) -> bool { - [0x000, 0x002].contains(&packet_id) - } - } - - impl ValidatorU16Id for ApidHandlerShared { - fn validate(&self, packet_id: u16) -> bool { - [0x000, 0x002].contains(&packet_id) - } - } - - impl CcsdsPacketHandler for ApidHandlerOwned { - apid_handler_impl!(); - } - - impl CcsdsPacketHandler for ApidHandlerShared { - apid_handler_impl!(); - } - - #[test] - fn test_pus_distribution_as_raw_packet() { - let mut pus_distrib = PusDistributor::new(PusHandlerOwnedQueue::default()); - let tc = generate_ping_tc_as_vec(); - let result = pus_distrib.pass_tc(&tc); - assert!(result.is_ok()); - assert_eq!(pus_distrib.service_distributor_mut().0.len(), 1); - let packet_info = pus_distrib.service_distributor_mut().0.pop_front().unwrap(); - assert_eq!(packet_info.service, 17); - assert_eq!(packet_info.apid, 0x002); - assert_eq!(packet_info.packet, tc); - } - - #[test] - fn test_pus_distribution_combined_handler() { - let known_packet_queue = Arc::new(Mutex::default()); - let unknown_packet_queue = Arc::new(Mutex::default()); - let pus_queue = Arc::new(Mutex::default()); - let pus_handler = PusHandlerSharedQueue(pus_queue.clone()); - let handler_base = BasicApidHandlerSharedQueue { - known_packet_queue: known_packet_queue.clone(), - unknown_packet_queue: unknown_packet_queue.clone(), - }; - - let pus_distrib = PusDistributor::new(pus_handler); - is_send(&pus_distrib); - let apid_handler = ApidHandlerShared { - pus_distrib, - handler_base, - }; - let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); - let mut test_buf: [u8; 32] = [0; 32]; - let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); - - // Pass packet to distributor - ccsds_distrib - .pass_tc(tc_slice) - .expect("Passing TC slice failed"); - let recvd_ccsds = known_packet_queue.lock().unwrap().pop_front(); - assert!(unknown_packet_queue.lock().unwrap().is_empty()); - assert!(recvd_ccsds.is_some()); - let (apid, packet) = recvd_ccsds.unwrap(); - assert_eq!(apid, 0x002); - assert_eq!(packet.as_slice(), tc_slice); - let recvd_pus = pus_queue.lock().unwrap().pop_front(); - assert!(recvd_pus.is_some()); - let packet_info = recvd_pus.unwrap(); - assert_eq!(packet_info.service, 17); - assert_eq!(packet_info.apid, 0x002); - assert_eq!(packet_info.packet, tc_slice); - } - - #[test] - fn test_accessing_combined_distributor() { - let pus_handler = PusHandlerOwnedQueue::default(); - let handler_base = BasicApidHandlerOwnedQueue::default(); - let pus_distrib = PusDistributor::new(pus_handler); - - let apid_handler = ApidHandlerOwned { - pus_distrib, - handler_base, - }; - let mut ccsds_distrib = CcsdsDistributor::new(apid_handler); - - let mut test_buf: [u8; 32] = [0; 32]; - let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); - - ccsds_distrib - .pass_tc(tc_slice) - .expect("Passing TC slice failed"); - - let apid_handler_casted_back = ccsds_distrib.packet_handler_mut(); - assert!(!apid_handler_casted_back - .handler_base - .known_packet_queue - .is_empty()); - let handler_owned_queue = apid_handler_casted_back - .pus_distrib - .service_distributor_mut(); - assert!(!handler_owned_queue.0.is_empty()); - let packet_info = handler_owned_queue.0.pop_front().unwrap(); - assert_eq!(packet_info.service, 17); - assert_eq!(packet_info.apid, 0x002); - assert_eq!(packet_info.packet, tc_slice); - } - - #[test] - fn test_pus_distrib_error_custom_error() { - let error = PusDistribError::CustomError(GenericSendError::RxDisconnected); - let error_string = format!("{}", error); - assert_eq!( - error_string, - "pus distribution error: rx side has disconnected" - ); - } - - #[test] - fn test_pus_distrib_error_pus_error() { - let error = PusDistribError::::PusError(PusError::CrcCalculationMissing); - let error_string = format!("{}", error); - assert_eq!( - error_string, - "pus distribution error: crc16 was not calculated" - ); - } -} diff --git a/satrs/src/tmtc/tc_helper.rs b/satrs/src/tmtc/tc_helper.rs new file mode 100644 index 0000000..dd22989 --- /dev/null +++ b/satrs/src/tmtc/tc_helper.rs @@ -0,0 +1,109 @@ +use std::sync::mpsc; + +use spacepackets::{ecss::tc::PusTcReader, SpHeader}; +use thiserror::Error; + +use crate::{ + pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError}, + pus::ReceivesEcssPusTc, + queue::GenericSendError, +}; + +use super::{ReceivesCcsdsTc, ReceivesTc}; + +#[derive(Debug, Clone, PartialEq, Eq, Error)] +pub enum StoreAndSendError { + #[error("Store error: {0}")] + Store(#[from] StoreError), + #[error("Genreric send error: {0}")] + Send(#[from] GenericSendError), +} + +#[derive(Clone)] +pub struct SharedTcPool(pub SharedStaticMemoryPool); + +impl SharedTcPool { + pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result { + let mut pg = self.0.write().expect("error locking TC store"); + let addr = pg.free_element(pus_tc.len_packed(), |buf| { + buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data()); + })?; + Ok(addr) + } + + pub fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result { + self.add_raw_tc(tc_raw) + } + + pub fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result { + let mut pg = self.0.write().expect("error locking TC store"); + let addr = pg.free_element(tc_raw.len(), |buf| { + buf[0..tc_raw.len()].copy_from_slice(tc_raw); + })?; + Ok(addr) + } +} + +#[derive(Clone)] +pub struct TcSenderSharedPool { + pub tc_source: mpsc::SyncSender, + pub shared_pool: SharedTcPool, +} + +impl TcSenderSharedPool { + pub fn new(tc_source: mpsc::SyncSender, shared_pool: SharedTcPool) -> Self { + Self { + tc_source, + shared_pool, + } + } + + #[allow(dead_code)] + pub fn shared_pool(&self) -> SharedStaticMemoryPool { + self.shared_pool.0.clone() + } +} + +impl ReceivesTc for TcSenderSharedPool { + type Error = StoreAndSendError; + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let addr = self.shared_pool.add_raw_tc(tc_raw)?; + self.tc_source.try_send(addr).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + })?; + Ok(()) + } +} + +impl ReceivesEcssPusTc for TcSenderSharedPool { + type Error = StoreAndSendError; + + fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { + let addr = self.shared_pool.add_pus_tc(pus_tc)?; + self.tc_source.try_send(addr).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + })?; + Ok(()) + } +} + +impl ReceivesCcsdsTc for TcSenderSharedPool { + type Error = StoreAndSendError; + + fn pass_ccsds(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + let addr = self.shared_pool.add_ccsds_tc(sp_header, tc_raw)?; + self.tc_source.try_send(addr).map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + })?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + // TODO: Add tests for shared pool TC sender component. +} diff --git a/satrs/tests/tcp_servers.rs b/satrs/tests/tcp_servers.rs index 65d124d..9995e63 100644 --- a/satrs/tests/tcp_servers.rs +++ b/satrs/tests/tcp_servers.rs @@ -28,7 +28,7 @@ use satrs::{ ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer, }, - tmtc::{ReceivesTcCore, TmPacketSourceCore}, + tmtc::{ReceivesTc, TmPacketSource}, }; use spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -66,7 +66,7 @@ struct SyncTcCacher { tc_queue: Arc>>>, } -impl ReceivesTcCore for SyncTcCacher { +impl ReceivesTc for SyncTcCacher { type Error = (); fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -89,7 +89,7 @@ impl SyncTmSource { } } -impl TmPacketSourceCore for SyncTmSource { +impl TmPacketSource for SyncTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result {