diff --git a/satrs-example/src/config.rs b/satrs-example/src/config.rs index 7e474e9..5168927 100644 --- a/satrs-example/src/config.rs +++ b/satrs-example/src/config.rs @@ -132,6 +132,7 @@ pub mod components { GenericPus = 2, Acs = 3, Cfdp = 4, + Tmtc = 5, } // Component IDs for components with the PUS APID. @@ -150,6 +151,12 @@ pub mod components { Mgm0 = 0, } + #[derive(Copy, Clone, PartialEq, Eq)] + pub enum TmtcId { + UdpServer = 0, + TcpServer = 1, + } + pub const PUS_ACTION_SERVICE: UniqueApidTargetId = UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32); pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId = @@ -166,6 +173,10 @@ pub mod components { UniqueApidTargetId::new(Apid::Sched as u16, 0); pub const MGM_HANDLER_0: UniqueApidTargetId = UniqueApidTargetId::new(Apid::Acs as u16, AcsId::Mgm0 as u32); + pub const UDP_SERVER: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::Tmtc as u16, TmtcId::UdpServer as u32); + pub const TCP_SERVER: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::Tmtc as u16, TmtcId::TcpServer as u32); } pub mod pool { diff --git a/satrs-example/src/interface/tcp.rs b/satrs-example/src/interface/tcp.rs index 3a0741d..df5616e 100644 --- a/satrs-example/src/interface/tcp.rs +++ b/satrs-example/src/interface/tcp.rs @@ -9,7 +9,7 @@ use log::{info, warn}; use satrs::{ hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, spacepackets::PacketId, - tmtc::{PacketSenderRaw, TmPacketSource}, + tmtc::{PacketSenderRaw, PacketSource}, }; #[derive(Default)] @@ -52,7 +52,7 @@ impl SyncTcpTmSource { } } -impl TmPacketSource for SyncTcpTmSource { +impl PacketSource for SyncTcpTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index 128dde2..8ba3675 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -3,7 +3,7 @@ use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; -use satrs::pus::{PusTmAsVec, PusTmInPool}; +use satrs::pus::{PacketAsVec, PacketInPool}; use satrs::tmtc::PacketSenderRaw; use satrs::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, @@ -17,7 +17,7 @@ pub trait UdpTmHandler { } pub struct StaticUdpTmHandler { - pub tm_rx: mpsc::Receiver, + pub tm_rx: mpsc::Receiver, pub tm_store: SharedStaticMemoryPool, } @@ -46,7 +46,7 @@ impl UdpTmHandler for StaticUdpTmHandler { } pub struct DynamicUdpTmHandler { - pub tm_rx: mpsc::Receiver, + pub tm_rx: mpsc::Receiver, } impl UdpTmHandler for DynamicUdpTmHandler { @@ -128,21 +128,25 @@ mod tests { SpHeader, }, tmtc::PacketSenderRaw, + ComponentId, }; use satrs_example::config::{components, OBSW_SERVER_ADDR}; use super::*; + const UDP_SERVER_ID: ComponentId = 0x05; + #[derive(Default, Debug)] pub struct TestSender { - tc_vec: RefCell>>, + tc_vec: RefCell>, } impl PacketSenderRaw for TestSender { type Error = (); - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { + + fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> { let mut mut_queue = self.tc_vec.borrow_mut(); - mut_queue.push_back(tc_raw.to_vec()); + mut_queue.push_back(PacketAsVec::new(sender_id, tc_raw.to_vec())); Ok(()) } } @@ -163,7 +167,8 @@ mod tests { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); let test_receiver = TestSender::default(); // let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, test_receiver).unwrap(); + let udp_tc_server = + UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, test_receiver).unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { @@ -181,7 +186,8 @@ mod tests { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); let test_receiver = TestSender::default(); // let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, test_receiver).unwrap(); + let udp_tc_server = + UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, test_receiver).unwrap(); let server_addr = udp_tc_server.socket.local_addr().unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); @@ -201,7 +207,9 @@ mod tests { { let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut(); assert!(!queue.is_empty()); - assert_eq!(queue.pop_front().unwrap(), ping_tc); + let packet_with_sender = queue.pop_front().unwrap(); + assert_eq!(packet_with_sender.packet, ping_tc); + assert_eq!(packet_with_sender.sender_id, UDP_SERVER_ID); } { diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 2cf1b52..cf8e050 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -17,7 +17,7 @@ use pus::test::create_test_service_dynamic; use satrs::hal::std::tcp_server::ServerConfig; use satrs::hal::std::udp_server::UdpTcServer; use satrs::request::GenericMessage; -use satrs::tmtc::tc_helper::{PacketSenderSharedPool, SharedPacketPool}; +use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; use satrs_example::config::tasks::{ FREQ_MS_AOCS, FREQ_MS_EVENT_HANDLING, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, @@ -38,9 +38,8 @@ use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::requests::{CompositeRequest, GenericRequestRouter}; use satrs::mode::ModeRequest; use satrs::pus::event_man::EventRequestWithToken; -use satrs::pus::TmInSharedPoolSender; use satrs::spacepackets::{time::cds::CdsTime, time::TimeWriter}; -use satrs_example::config::components::MGM_HANDLER_0; +use satrs_example::config::components::{MGM_HANDLER_0, TCP_SERVER, UDP_SERVER}; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc; use std::sync::{Arc, RwLock}; @@ -50,14 +49,16 @@ use std::time::Duration; #[allow(dead_code)] fn static_tmtc_pool_main() { let (tm_pool, tc_pool) = create_static_pools(); - let shared_tm_pool = (Arc::new(RwLock::new(tm_pool))); - let shared_tc_pool = (Arc::new(RwLock::new(tc_pool))); + let shared_tm_pool = Arc::new(RwLock::new(tm_pool)); + let shared_tc_pool = Arc::new(RwLock::new(tc_pool)); + let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); + let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); let tm_funnel_tx_sender = - TmInSharedPoolSender::new(shared_tm_pool.clone(), tm_funnel_tx.clone()); + PacketSenderWithSharedPool::new(tm_funnel_tx.clone(), shared_tm_pool_wrapper.clone()); let (mgm_handler_composite_tx, mgm_handler_composite_rx) = mpsc::channel::>(); @@ -74,7 +75,7 @@ fn static_tmtc_pool_main() { // This helper structure is used by all telecommand providers which need to send telecommands // to the TC source. - let tc_source = PacketSenderSharedPool::new(tc_source_tx, shared_tc_pool.clone()); + let tc_source = PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); // Create event handling components // These sender handles are used to send event requests, for example to enable or disable @@ -106,7 +107,7 @@ fn static_tmtc_pool_main() { }; let pus_test_service = create_test_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.0.clone(), + shared_tc_pool.clone(), event_handler.clone_event_sender(), pus_test_rx, ); @@ -118,27 +119,27 @@ fn static_tmtc_pool_main() { ); let pus_event_service = create_event_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.0.clone(), + shared_tc_pool.clone(), pus_event_rx, event_request_tx, ); let pus_action_service = create_action_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.0.clone(), + shared_tc_pool.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); let pus_hk_service = create_hk_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.0.clone(), + shared_tc_pool.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); let pus_mode_service = create_mode_service_static( tm_funnel_tx_sender.clone(), - shared_tc_pool.0.clone(), + shared_tc_pool.clone(), pus_mode_rx, request_map, pus_mode_reply_rx, @@ -153,23 +154,29 @@ fn static_tmtc_pool_main() { ); let mut tmtc_task = TcSourceTaskStatic::new( - shared_tc_pool.clone(), + shared_tc_pool_wrapper.clone(), tc_source_rx, PusTcDistributor::new(tm_funnel_tx_sender, pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, tc_source.clone()) + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_handler: StaticUdpTmHandler { tm_rx: tm_server_rx, - tm_store: shared_tm_pool.clone_backing_pool(), + tm_store: shared_tm_pool.clone(), }, }; - let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, @@ -180,7 +187,7 @@ fn static_tmtc_pool_main() { .expect("tcp server creation failed"); let mut tm_funnel = TmFunnelStatic::new( - shared_tm_pool, + shared_tm_pool_wrapper, sync_tm_tcp_source, tm_funnel_rx, tm_server_tx, @@ -379,7 +386,7 @@ fn dyn_tmtc_pool_main() { ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, tc_source_tx.clone()) + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, @@ -388,7 +395,13 @@ fn dyn_tmtc_pool_main() { }, }; - let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 39b7a54..06677fe 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -12,12 +12,13 @@ use satrs::pus::verification::{ use satrs::pus::{ ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, MpscTcReceiver, - MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusReplyHandler, - PusServiceHelper, PusTcToRequestConverter, PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, + PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; +use satrs::tmtc::PacketSenderWithSharedPool; use satrs_example::config::components::PUS_ACTION_SERVICE; use satrs_example::config::tmtc_err; use std::sync::mpsc; @@ -195,12 +196,12 @@ impl PusTcToRequestConverter for Actio } pub fn create_action_service_static( - tm_sender: TmInSharedPoolSender>, + tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, pus_action_rx: mpsc::Receiver, action_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, -) -> ActionServiceWrapper { +) -> ActionServiceWrapper { let action_request_handler = PusTargetedRequestService::new( PusServiceHelper::new( PUS_ACTION_SERVICE.id(), @@ -223,7 +224,7 @@ pub fn create_action_service_static( } pub fn create_action_service_dynamic( - tm_funnel_tx: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, pus_action_rx: mpsc::Receiver, action_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index bd29c93..7e35c13 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -8,19 +8,20 @@ use satrs::pus::event_srv::PusEventServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, - EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, - PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult, + PusServiceHelper, }; +use satrs::tmtc::PacketSenderWithSharedPool; use satrs_example::config::components::PUS_EVENT_MANAGEMENT; use super::HandlingStatus; pub fn create_event_service_static( - tm_sender: TmInSharedPoolSender>, + tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, pus_event_rx: mpsc::Receiver, event_request_tx: mpsc::Sender, -) -> EventServiceWrapper { +) -> EventServiceWrapper { let pus_5_handler = PusEventServiceHandler::new( PusServiceHelper::new( PUS_EVENT_MANAGEMENT.id(), @@ -37,7 +38,7 @@ pub fn create_event_service_static( } pub fn create_event_service_dynamic( - tm_funnel_tx: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, pus_event_rx: mpsc::Receiver, event_request_tx: mpsc::Sender, ) -> EventServiceWrapper { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index 6436a7a..8900967 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -9,13 +9,13 @@ use satrs::pus::verification::{ use satrs::pus::{ ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, - EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender, - MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, - PusTcToRequestConverter, PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, + PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{hk, PusPacket}; +use satrs::tmtc::PacketSenderWithSharedPool; use satrs_example::config::components::PUS_HK_SERVICE; use satrs_example::config::{hk_err, tmtc_err}; use std::sync::mpsc; @@ -232,12 +232,12 @@ impl PusTcToRequestConverter for HkRequestConver } pub fn create_hk_service_static( - tm_sender: TmInSharedPoolSender>, + tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, pus_hk_rx: mpsc::Receiver, request_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, -) -> HkServiceWrapper { +) -> HkServiceWrapper { let pus_3_handler = PusTargetedRequestService::new( PusServiceHelper::new( PUS_HK_SERVICE.id(), @@ -258,7 +258,7 @@ pub fn create_hk_service_static( } pub fn create_hk_service_dynamic( - tm_funnel_tx: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, pus_hk_rx: mpsc::Receiver, request_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 9156486..675284d 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -6,7 +6,7 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, - EcssTcReceiverCore, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, + EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; @@ -185,7 +185,7 @@ pub trait TargetedPusService { /// 2. [Self::poll_and_check_next_reply] which tries to poll and handle one reply, covering step 6. /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. pub struct PusTargetedRequestService< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -207,7 +207,7 @@ pub struct PusTargetedRequestService< } impl< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -461,7 +461,7 @@ pub(crate) mod tests { use std::time::Duration; use satrs::pus::test_util::TEST_COMPONENT_ID_0; - use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant}; + use satrs::pus::{MpscTmAsVecSender, PacketAsVec, PusTmVariant}; use satrs::request::RequestId; use satrs::{ pus::{ @@ -491,7 +491,7 @@ pub(crate) mod tests { pub id: ComponentId, pub verif_reporter: TestVerificationReporter, pub reply_handler: ReplyHandler, - pub tm_receiver: mpsc::Receiver, + pub tm_receiver: mpsc::Receiver, pub default_timeout: Duration, tm_sender: MpscTmAsVecSender, phantom: std::marker::PhantomData<(ActiveRequestInfo, Reply)>, @@ -698,7 +698,7 @@ pub(crate) mod tests { ReplyType, >, pub request_id: Option, - pub tm_funnel_rx: mpsc::Receiver, + pub tm_funnel_rx: mpsc::Receiver, pub pus_packet_tx: mpsc::Sender, pub reply_tx: mpsc::Sender>, pub request_rx: mpsc::Receiver>, diff --git a/satrs-example/src/pus/mode.rs b/satrs-example/src/pus/mode.rs index 6750ec4..42054b6 100644 --- a/satrs-example/src/pus/mode.rs +++ b/satrs-example/src/pus/mode.rs @@ -1,5 +1,6 @@ use derive_new::new; use log::{error, warn}; +use satrs::tmtc::PacketSenderWithSharedPool; use std::sync::mpsc; use std::time::Duration; @@ -8,8 +9,8 @@ use satrs::pool::SharedStaticMemoryPool; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, - EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, - PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult, + PusServiceHelper, }; use satrs::request::GenericMessage; use satrs::{ @@ -203,12 +204,12 @@ impl PusTcToRequestConverter for ModeRequestCo } pub fn create_mode_service_static( - tm_sender: TmInSharedPoolSender>, + tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, pus_action_rx: mpsc::Receiver, mode_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, -) -> ModeServiceWrapper { +) -> ModeServiceWrapper { let mode_request_handler = PusTargetedRequestService::new( PusServiceHelper::new( PUS_MODE_SERVICE.id(), @@ -229,7 +230,7 @@ pub fn create_mode_service_static( } pub fn create_mode_service_dynamic( - tm_funnel_tx: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, pus_action_rx: mpsc::Receiver, mode_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 83a383d..51fb595 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -9,20 +9,27 @@ use satrs::pus::scheduler_srv::PusSchedServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, - EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, - PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, PacketInPool, + PusPacketHandlerResult, PusServiceHelper, }; -use satrs::tmtc::tc_helper::{PacketSenderSharedPool, SharedPacketPool}; +use satrs::tmtc::PacketSenderWithSharedPool; +use satrs::ComponentId; use satrs_example::config::components::PUS_SCHED_SERVICE; use super::HandlingStatus; pub trait TcReleaser { - fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; + fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; } -impl TcReleaser for PacketSenderSharedPool { - fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool { +impl TcReleaser for PacketSenderWithSharedPool { + fn release( + &mut self, + sender_id: ComponentId, + enabled: bool, + _info: &TcInfo, + tc: &[u8], + ) -> bool { if enabled { let shared_pool = self.shared_pool.get_mut(); // Transfer TC from scheduler TC pool to shared TC pool. @@ -32,19 +39,25 @@ impl TcReleaser for PacketSenderSharedPool { .expect("locking pool failed") .add(tc) .expect("adding TC to shared pool failed"); - self.tc_source - .send(released_tc_addr) + self.sender + .send(PacketInPool::new(sender_id, released_tc_addr)) .expect("sending TC to TC source failed"); } true } } -impl TcReleaser for mpsc::Sender> { - fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool { +impl TcReleaser for mpsc::Sender { + fn release( + &mut self, + sender_id: ComponentId, + enabled: bool, + _info: &TcInfo, + tc: &[u8], + ) -> bool { if enabled { // Send released TC to centralized TC source. - self.send(tc.to_vec()) + self.send(PacketAsVec::new(sender_id, tc.to_vec())) .expect("sending TC to TC source failed"); } true @@ -69,8 +82,9 @@ impl SchedulingServiceWrapper { pub fn release_tcs(&mut self) { + let id = self.pus_11_handler.service_helper.id(); let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { - self.tc_releaser.release(enabled, info, tc) + self.tc_releaser.release(id, enabled, info, tc) }; self.pus_11_handler @@ -118,11 +132,11 @@ impl } pub fn create_scheduler_service_static( - tm_sender: TmInSharedPoolSender>, - tc_releaser: PacketSenderSharedPool, + tm_sender: PacketSenderWithSharedPool, + tc_releaser: PacketSenderWithSharedPool, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, -) -> SchedulingServiceWrapper { +) -> SchedulingServiceWrapper { let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusSchedServiceHandler::new( @@ -144,8 +158,8 @@ pub fn create_scheduler_service_static( } pub fn create_scheduler_service_dynamic( - tm_funnel_tx: mpsc::Sender, - tc_source_sender: mpsc::Sender>, + tm_funnel_tx: mpsc::Sender, + tc_source_sender: mpsc::Sender, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, ) -> SchedulingServiceWrapper { diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index f232823..7a745a0 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -7,13 +7,13 @@ use satrs::pus::verification::{FailParams, VerificationReporter, VerificationRep use satrs::pus::EcssTcInSharedStoreConverter; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver, - MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusServiceHelper, - PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult, PusServiceHelper, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; +use satrs::tmtc::PacketSenderWithSharedPool; use satrs_example::config::components::PUS_TEST_SERVICE; use satrs_example::config::{tmtc_err, TEST_EVENT}; use std::sync::mpsc; @@ -21,11 +21,11 @@ use std::sync::mpsc; use super::HandlingStatus; pub fn create_test_service_static( - tm_sender: TmInSharedPoolSender>, + tm_sender: PacketSenderWithSharedPool, tc_pool: SharedStaticMemoryPool, event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper { +) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( PUS_TEST_SERVICE.id(), pus_test_rx, @@ -40,7 +40,7 @@ pub fn create_test_service_static( } pub fn create_test_service_dynamic( - tm_funnel_tx: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { diff --git a/satrs-example/src/tmtc/tc_source.rs b/satrs-example/src/tmtc/tc_source.rs index ccacce4..de4c3f5 100644 --- a/satrs-example/src/tmtc/tc_source.rs +++ b/satrs-example/src/tmtc/tc_source.rs @@ -1,9 +1,12 @@ -use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedPacketPool}; +use satrs::{ + pool::PoolProvider, + pus::{PacketAsVec, PacketInPool}, + tmtc::{PacketSenderWithSharedPool, SharedPacketPool}, +}; use std::sync::mpsc::{self, TryRecvError}; use satrs::{ - pool::StoreAddr, - pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded}, + pus::MpscTmAsVecSender, spacepackets::ecss::{tc::PusTcReader, PusPacket}, }; @@ -12,16 +15,16 @@ use crate::pus::PusTcDistributor; // TC source components where static pools are the backing memory of the received telecommands. pub struct TcSourceTaskStatic { shared_tc_pool: SharedPacketPool, - tc_receiver: mpsc::Receiver, + tc_receiver: mpsc::Receiver, tc_buf: [u8; 4096], - pus_receiver: PusTcDistributor, + pus_receiver: PusTcDistributor, } impl TcSourceTaskStatic { pub fn new( shared_tc_pool: SharedPacketPool, - tc_receiver: mpsc::Receiver, - pus_receiver: PusTcDistributor, + tc_receiver: mpsc::Receiver, + pus_receiver: PusTcDistributor, ) -> Self { Self { shared_tc_pool, @@ -37,20 +40,20 @@ impl TcSourceTaskStatic { pub fn poll_tc(&mut self) -> bool { match self.tc_receiver.try_recv() { - Ok(addr) => { + Ok(packet_in_pool) => { let pool = self .shared_tc_pool .0 .read() .expect("locking tc pool failed"); - pool.read(&addr, &mut self.tc_buf) + pool.read(&packet_in_pool.store_addr, &mut self.tc_buf) .expect("reading pool failed"); drop(pool); match PusTcReader::new(&self.tc_buf) { Ok((pus_tc, _)) => { self.pus_receiver .handle_tc_packet( - satrs::pus::TcInMemory::StoreAddr(addr), + satrs::pus::TcInMemory::StoreAddr(packet_in_pool.store_addr), pus_tc.service(), &pus_tc, ) @@ -77,13 +80,13 @@ impl TcSourceTaskStatic { // TC source components where the heap is the backing memory of the received telecommands. pub struct TcSourceTaskDynamic { - pub tc_receiver: mpsc::Receiver>, + pub tc_receiver: mpsc::Receiver, pus_receiver: PusTcDistributor, } impl TcSourceTaskDynamic { pub fn new( - tc_receiver: mpsc::Receiver>, + tc_receiver: mpsc::Receiver, pus_receiver: PusTcDistributor, ) -> Self { Self { @@ -99,11 +102,11 @@ impl TcSourceTaskDynamic { pub fn poll_tc(&mut self) -> bool { // Right now, we only expect PUS packets. match self.tc_receiver.try_recv() { - Ok(tc) => match PusTcReader::new(&tc) { + Ok(packet_as_vec) => match PusTcReader::new(&packet_as_vec.packet) { Ok((pus_tc, _)) => { self.pus_receiver .handle_tc_packet( - satrs::pus::TcInMemory::Vec(tc.clone()), + satrs::pus::TcInMemory::Vec(packet_as_vec.packet.clone()), pus_tc.service(), &pus_tc, ) @@ -112,7 +115,7 @@ impl TcSourceTaskDynamic { } Err(e) => { log::warn!("error creating PUS TC from raw data: {e}"); - log::warn!("raw data: {:x?}", tc); + log::warn!("raw data: {:x?}", packet_as_vec.packet); true } }, diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index f81d6bf..813d0d0 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -4,7 +4,6 @@ use std::{ }; use log::info; -use satrs::pus::{PusTmAsVec, PusTmInPool}; use satrs::{ pool::PoolProvider, seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, @@ -13,7 +12,10 @@ use satrs::{ time::cds::MIN_CDS_FIELD_LEN, CcsdsPacket, }, - tmtc::tm_helper::SharedTmPool, +}; +use satrs::{ + pus::{PacketAsVec, PacketInPool}, + tmtc::SharedPacketPool, }; use crate::interface::tcp::SyncTcpTmSource; @@ -77,17 +79,17 @@ impl TmFunnelCommon { pub struct TmFunnelStatic { common: TmFunnelCommon, - shared_tm_store: SharedTmPool, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::SyncSender, + shared_tm_store: SharedPacketPool, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, } impl TmFunnelStatic { pub fn new( - shared_tm_store: SharedTmPool, + shared_tm_store: SharedPacketPool, sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::SyncSender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), @@ -101,7 +103,7 @@ impl TmFunnelStatic { if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() { // Read the TM, set sequence counter and message counter, and finally update // the CRC. - let shared_pool = self.shared_tm_store.clone_backing_pool(); + let shared_pool = self.shared_tm_store.0.clone(); let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); let mut tm_copy = Vec::new(); pool_guard @@ -124,15 +126,15 @@ impl TmFunnelStatic { pub struct TmFunnelDynamic { common: TmFunnelCommon, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, } impl TmFunnelDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), diff --git a/satrs/src/encoding/ccsds.rs b/satrs/src/encoding/ccsds.rs index 971c25f..fc1418f 100644 --- a/satrs/src/encoding/ccsds.rs +++ b/satrs/src/encoding/ccsds.rs @@ -1,4 +1,4 @@ -use crate::{tmtc::PacketSenderRaw, ValidatorU16Id}; +use crate::{tmtc::PacketSenderRaw, ComponentId, ValidatorU16Id}; /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the /// [spacepackets::PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet @@ -10,11 +10,12 @@ use crate::{tmtc::PacketSenderRaw, ValidatorU16Id}; /// index for future write operations will be written to the `next_write_idx` argument. /// /// The parser will forward all packets which were decoded successfully to the given -/// `packet_sender` and return the number of packets found. If the [PacketSenderRaw::send_raw_tc] +/// `packet_sender` and return the number of packets found. If the [PacketSenderRaw::send_packet] /// calls fails, the error will be returned. pub fn parse_buffer_for_ccsds_space_packets( buf: &mut [u8], packet_id_validator: &(impl ValidatorU16Id + ?Sized), + sender_id: ComponentId, packet_sender: &(impl PacketSenderRaw + ?Sized), next_write_idx: &mut usize, ) -> Result { @@ -32,7 +33,10 @@ pub fn parse_buffer_for_ccsds_space_packets( u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap()); let packet_size = length_field + 7; if (current_idx + packet_size as usize) <= buf_len { - packet_sender.send_raw_tc(&buf[current_idx..current_idx + packet_size as usize])?; + packet_sender.send_packet( + sender_id, + &buf[current_idx..current_idx + packet_size as usize], + )?; packets_found += 1; } else { // Move packet to start of buffer if applicable. @@ -56,10 +60,11 @@ mod tests { PacketId, SpHeader, }; - use crate::encoding::tests::TcCacher; + use crate::{encoding::tests::TcCacher, ComponentId}; use super::parse_buffer_for_ccsds_space_packets; + const PARSER_ID: ComponentId = 0x05; const TEST_APID_0: u16 = 0x02; const TEST_APID_1: u16 = 0x10; const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0); @@ -79,6 +84,7 @@ mod tests { let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer, valid_packet_ids.as_slice(), + PARSER_ID, &tc_cacher, &mut next_write_idx, ); @@ -87,7 +93,9 @@ mod tests { assert_eq!(parsed_packets, 1); let mut queue = tc_cacher.tc_queue.borrow_mut(); assert_eq!(queue.len(), 1); - assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len]); + let packet_with_sender = queue.pop_front().unwrap(); + assert_eq!(packet_with_sender.packet, buffer[..packet_len]); + assert_eq!(packet_with_sender.sender_id, PARSER_ID); } #[test] @@ -108,6 +116,7 @@ mod tests { let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer, valid_packet_ids.as_slice(), + PARSER_ID, &tc_cacher, &mut next_write_idx, ); @@ -116,9 +125,13 @@ mod tests { assert_eq!(parsed_packets, 2); let mut queue = tc_cacher.tc_queue.borrow_mut(); assert_eq!(queue.len(), 2); - assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len_ping]); + let packet_with_addr = queue.pop_front().unwrap(); + assert_eq!(packet_with_addr.packet, buffer[..packet_len_ping]); + assert_eq!(packet_with_addr.sender_id, PARSER_ID); + let packet_with_addr = queue.pop_front().unwrap(); + assert_eq!(packet_with_addr.sender_id, PARSER_ID); assert_eq!( - queue.pop_front().unwrap(), + packet_with_addr.packet, buffer[packet_len_ping..packet_len_ping + packet_len_action] ); } @@ -142,6 +155,7 @@ mod tests { let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer, valid_packet_ids.as_slice(), + PARSER_ID, &tc_cacher, &mut next_write_idx, ); @@ -150,9 +164,11 @@ mod tests { assert_eq!(parsed_packets, 2); let mut queue = tc_cacher.tc_queue.borrow_mut(); assert_eq!(queue.len(), 2); - assert_eq!(queue.pop_front().unwrap(), buffer[..packet_len_ping]); + let packet_with_addr = queue.pop_front().unwrap(); + assert_eq!(packet_with_addr.packet, buffer[..packet_len_ping]); + let packet_with_addr = queue.pop_front().unwrap(); assert_eq!( - queue.pop_front().unwrap(), + packet_with_addr.packet, buffer[packet_len_ping..packet_len_ping + packet_len_action] ); } @@ -176,6 +192,7 @@ mod tests { let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer[..packet_len_ping + packet_len_action - 4], valid_packet_ids.as_slice(), + PARSER_ID, &tc_cacher, &mut next_write_idx, ); @@ -204,6 +221,7 @@ mod tests { let parse_result = parse_buffer_for_ccsds_space_packets( &mut buffer[..packet_len_ping - 4], valid_packet_ids.as_slice(), + PARSER_ID, &tc_cacher, &mut next_write_idx, ); diff --git a/satrs/src/encoding/cobs.rs b/satrs/src/encoding/cobs.rs index cea4162..f4377a2 100644 --- a/satrs/src/encoding/cobs.rs +++ b/satrs/src/encoding/cobs.rs @@ -1,4 +1,4 @@ -use crate::tmtc::PacketSenderRaw; +use crate::{tmtc::PacketSenderRaw, ComponentId}; use cobs::{decode_in_place, encode, max_encoding_length}; /// This function encodes the given packet with COBS and also wraps the encoded packet with @@ -57,6 +57,7 @@ pub fn encode_packet_with_cobs( /// The parser will write all packets which were decoded successfully to the given `tc_receiver`. pub fn parse_buffer_for_cobs_encoded_packets( buf: &mut [u8], + sender_id: ComponentId, packet_sender: &(impl PacketSenderRaw + ?Sized), next_write_idx: &mut usize, ) -> Result { @@ -78,8 +79,10 @@ pub fn parse_buffer_for_cobs_encoded_packets( let decode_result = decode_in_place(&mut buf[start_index_packet..i]); if let Ok(packet_len) = decode_result { packets_found += 1; - packet_sender - .send_raw_tc(&buf[start_index_packet..start_index_packet + packet_len])?; + packet_sender.send_packet( + sender_id, + &buf[start_index_packet..start_index_packet + packet_len], + )?; } start_found = false; } else { @@ -100,10 +103,15 @@ pub fn parse_buffer_for_cobs_encoded_packets( pub(crate) mod tests { use cobs::encode; - use crate::encoding::tests::{encode_simple_packet, TcCacher, INVERTED_PACKET, SIMPLE_PACKET}; + use crate::{ + encoding::tests::{encode_simple_packet, TcCacher, INVERTED_PACKET, SIMPLE_PACKET}, + ComponentId, + }; use super::parse_buffer_for_cobs_encoded_packets; + const PARSER_ID: ComponentId = 0x05; + #[test] fn test_parsing_simple_packet() { let test_sender = TcCacher::default(); @@ -113,6 +121,7 @@ pub(crate) mod tests { let mut next_read_idx = 0; let packets = parse_buffer_for_cobs_encoded_packets( &mut encoded_buf[0..current_idx], + PARSER_ID, &test_sender, &mut next_read_idx, ) @@ -121,7 +130,7 @@ pub(crate) mod tests { let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 1); let packet = &queue[0]; - assert_eq!(packet, &SIMPLE_PACKET); + assert_eq!(packet.packet, &SIMPLE_PACKET); } #[test] @@ -140,6 +149,7 @@ pub(crate) mod tests { let mut next_read_idx = 0; let packets = parse_buffer_for_cobs_encoded_packets( &mut encoded_buf[0..current_idx], + PARSER_ID, &test_sender, &mut next_read_idx, ) @@ -148,9 +158,9 @@ pub(crate) mod tests { let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 2); let packet0 = &queue[0]; - assert_eq!(packet0, &SIMPLE_PACKET); + assert_eq!(packet0.packet, &SIMPLE_PACKET); let packet1 = &queue[1]; - assert_eq!(packet1, &INVERTED_PACKET); + assert_eq!(packet1.packet, &INVERTED_PACKET); } #[test] @@ -163,6 +173,7 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut encoded_buf[0..current_idx - 1], + PARSER_ID, &test_sender, &mut next_read_idx, ) @@ -196,6 +207,7 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut encoded_buf[0..current_idx - cut_off], + PARSER_ID, &test_sender, &mut next_write_idx, ) @@ -203,7 +215,7 @@ pub(crate) mod tests { assert_eq!(packets, 1); let queue = test_sender.tc_queue.borrow(); assert_eq!(queue.len(), 1); - assert_eq!(&queue[0], &SIMPLE_PACKET); + assert_eq!(&queue[0].packet, &SIMPLE_PACKET); assert_eq!(next_write_idx, next_expected_write_idx); assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start); } @@ -237,6 +249,7 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut encoded_buf[0..current_idx], + PARSER_ID, &test_sender, &mut next_write_idx, ) @@ -244,7 +257,7 @@ pub(crate) mod tests { assert_eq!(packets, 1); let queue = test_sender.tc_queue.borrow_mut(); assert_eq!(queue.len(), 1); - assert_eq!(&queue[0], &SIMPLE_PACKET); + assert_eq!(&queue[0].packet, &SIMPLE_PACKET); assert_eq!(next_write_idx, 1); assert_eq!(encoded_buf[0], 0); } @@ -257,6 +270,7 @@ pub(crate) mod tests { let packets = parse_buffer_for_cobs_encoded_packets( // Cut off the sentinel byte at the end. &mut all_zeroes, + PARSER_ID, &test_sender, &mut next_write_idx, ) diff --git a/satrs/src/encoding/mod.rs b/satrs/src/encoding/mod.rs index ded8c9f..9f9e90f 100644 --- a/satrs/src/encoding/mod.rs +++ b/satrs/src/encoding/mod.rs @@ -8,9 +8,9 @@ pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_e pub(crate) mod tests { use core::cell::RefCell; - use alloc::{collections::VecDeque, vec::Vec}; + use alloc::collections::VecDeque; - use crate::tmtc::PacketSenderRaw; + use crate::{pus::PacketAsVec, tmtc::PacketSenderRaw, ComponentId}; use super::cobs::encode_packet_with_cobs; @@ -19,15 +19,15 @@ pub(crate) mod tests { #[derive(Default)] pub(crate) struct TcCacher { - pub(crate) tc_queue: RefCell>>, + pub(crate) tc_queue: RefCell>, } impl PacketSenderRaw for TcCacher { type Error = (); - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> { let mut mut_queue = self.tc_queue.borrow_mut(); - mut_queue.push_back(tc_raw.to_vec()); + mut_queue.push_back(PacketAsVec::new(sender_id, tc_raw.to_vec())); Ok(()) } } diff --git a/satrs/src/hal/std/tcp_cobs_server.rs b/satrs/src/hal/std/tcp_cobs_server.rs index fbf1022..f720d92 100644 --- a/satrs/src/hal/std/tcp_cobs_server.rs +++ b/satrs/src/hal/std/tcp_cobs_server.rs @@ -11,11 +11,12 @@ use std::vec::Vec; use crate::encoding::parse_buffer_for_cobs_encoded_packets; use crate::tmtc::PacketSenderRaw; -use crate::tmtc::TmPacketSource; +use crate::tmtc::PacketSource; use crate::hal::std::tcp_server::{ ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, }; +use crate::ComponentId; use super::tcp_server::HandledConnectionHandler; use super::tcp_server::HandledConnectionInfo; @@ -28,6 +29,7 @@ impl TcpTcParser for CobsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], + sender_id: ComponentId, tc_sender: &(impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, @@ -35,6 +37,7 @@ impl TcpTcParser for CobsTcParser { ) -> Result<(), TcpTmtcError> { conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( &mut tc_buffer[..current_write_idx], + sender_id, tc_sender, next_write_idx, ) @@ -62,7 +65,7 @@ impl TcpTmSender for CobsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut (impl TmPacketSource + ?Sized), + tm_source: &mut (impl PacketSource + ?Sized), conn_result: &mut HandledConnectionInfo, stream: &mut TcpStream, ) -> Result> { @@ -101,7 +104,7 @@ impl TcpTmSender for CobsTmSender { /// Telemetry will be encoded with the COBS protocol using [cobs::encode] in addition to being /// wrapped with the sentinel value 0 as the packet delimiter as well before being sent back to /// the client. Please note that the server will send as much data as it can retrieve from the -/// [TmPacketSource] in its current implementation. +/// [PacketSource] in its current implementation. /// /// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data /// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full @@ -115,7 +118,7 @@ impl TcpTmSender for CobsTmSender { /// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs) /// test also serves as the example application for this module. pub struct TcpTmtcInCobsServer< - TmSource: TmPacketSource, + TmSource: PacketSource, TcSender: PacketSenderRaw, HandledConnection: HandledConnectionHandler, TmError, @@ -133,7 +136,7 @@ pub struct TcpTmtcInCobsServer< } impl< - TmSource: TmPacketSource, + TmSource: PacketSource, TcReceiver: PacketSenderRaw, HandledConnection: HandledConnectionHandler, TmError: 'static, @@ -208,13 +211,17 @@ mod tests { tests::{ConnectionFinishedHandler, SyncTmSource}, ConnectionResult, ServerConfig, }, + pus::PacketAsVec, queue::GenericSendError, + ComponentId, }; - use alloc::{sync::Arc, vec::Vec}; + use alloc::sync::Arc; use cobs::encode; use super::TcpTmtcInCobsServer; + const TCP_SERVER_ID: ComponentId = 0x05; + fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx) } @@ -233,18 +240,18 @@ mod tests { fn generic_tmtc_server( addr: &SocketAddr, - tc_sender: mpsc::Sender>, + tc_sender: mpsc::Sender, tm_source: SyncTmSource, stop_signal: Option>, ) -> TcpTmtcInCobsServer< SyncTmSource, - mpsc::Sender>, + mpsc::Sender, ConnectionFinishedHandler, (), GenericSendError, > { TcpTmtcInCobsServer::new( - ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), + ServerConfig::new(TCP_SERVER_ID, *addr, Duration::from_millis(2), 1024, 1024), tm_source, tc_sender, ConnectionFinishedHandler::default(), @@ -302,8 +309,8 @@ mod tests { panic!("connection was not handled properly"); } // Check that the packet was received and decoded successfully. - let packet = tc_receiver.recv().expect("receiving TC failed"); - assert_eq!(packet, &SIMPLE_PACKET); + let packet_with_sender = tc_receiver.recv().expect("receiving TC failed"); + assert_eq!(packet_with_sender.packet, &SIMPLE_PACKET); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } @@ -410,9 +417,11 @@ mod tests { panic!("connection was not handled properly"); } // Check that the packet was received and decoded successfully. - let packet = tc_receiver.recv().expect("receiving TC failed"); + let packet_with_sender = tc_receiver.recv().expect("receiving TC failed"); + let packet = &packet_with_sender.packet; assert_eq!(packet, &SIMPLE_PACKET); - let packet = tc_receiver.recv().expect("receiving TC failed"); + let packet_with_sender = tc_receiver.recv().expect("receiving TC failed"); + let packet = &packet_with_sender.packet; assert_eq!(packet, &INVERTED_PACKET); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index 4e7b918..9839a22 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -13,7 +13,8 @@ use std::net::SocketAddr; // use std::net::{SocketAddr, TcpStream}; use std::thread; -use crate::tmtc::{PacketSenderRaw, TmPacketSource}; +use crate::tmtc::{PacketSenderRaw, PacketSource}; +use crate::ComponentId; use thiserror::Error; // Re-export the TMTC in COBS server. @@ -30,7 +31,7 @@ pub use crate::hal::std::tcp_spacepackets_server::{ /// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or /// no TM needs to be sent, the TCP server will delay for the specified amount of time /// to reduce CPU load. -/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and +/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [PacketSource] and /// encoding of that data. This buffer should at large enough to hold the maximum expected /// TM size read from the packet source. /// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from @@ -46,6 +47,7 @@ pub use crate::hal::std::tcp_spacepackets_server::{ /// default. #[derive(Debug, Copy, Clone)] pub struct ServerConfig { + pub id: ComponentId, pub addr: SocketAddr, pub inner_loop_delay: Duration, pub tm_buffer_size: usize, @@ -56,12 +58,14 @@ pub struct ServerConfig { impl ServerConfig { pub fn new( + id: ComponentId, addr: SocketAddr, inner_loop_delay: Duration, tm_buffer_size: usize, tc_buffer_size: usize, ) -> Self { Self { + id, addr, inner_loop_delay, tm_buffer_size, @@ -122,6 +126,7 @@ pub trait TcpTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], + sender_id: ComponentId, tc_sender: &(impl PacketSenderRaw + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, @@ -130,14 +135,14 @@ pub trait TcpTcParser { } /// Generic sender abstraction for an object which can pull telemetry from a given TM source -/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream]. +/// using a [PacketSource] and then send them back to a client using a given [TcpStream]. /// The concrete implementation can also perform any encoding steps which are necessary before /// sending back the data to a client. pub trait TcpTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut (impl TmPacketSource + ?Sized), + tm_source: &mut (impl PacketSource + ?Sized), conn_result: &mut HandledConnectionInfo, stream: &mut TcpStream, ) -> Result>; @@ -152,7 +157,7 @@ pub trait TcpTmSender { /// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client. /// 2. Parsed telecommands will be sent using the [PacketSenderRaw] object. /// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client. -/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender]. +/// 4. [PacketSource] as a generic TM source used by the [TcpTmSender]. /// /// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without /// having to re-implement common logic. @@ -161,7 +166,7 @@ pub trait TcpTmSender { /// /// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol. pub struct TcpTmtcGenericServer< - TmSource: TmPacketSource, + TmSource: PacketSource, TcSender: PacketSenderRaw, TmSender: TcpTmSender, TcParser: TcpTcParser, @@ -169,12 +174,13 @@ pub struct TcpTmtcGenericServer< TmError, TcSendError, > { + pub id: ComponentId, pub finished_handler: HandledConnection, pub(crate) listener: TcpListener, pub(crate) inner_loop_delay: Duration, pub(crate) tm_source: TmSource, pub(crate) tm_buffer: Vec, - pub(crate) tc_receiver: TcSender, + pub(crate) tc_sender: TcSender, pub(crate) tc_buffer: Vec, poll: Poll, events: Events, @@ -184,7 +190,7 @@ pub struct TcpTmtcGenericServer< } impl< - TmSource: TmPacketSource, + TmSource: PacketSource, TcSender: PacketSenderRaw, TmSender: TcpTmSender, TcParser: TcpTcParser, @@ -248,6 +254,7 @@ impl< .register(&mut mio_listener, Token(0), Interest::READABLE)?; Ok(Self { + id: cfg.id, tc_handler: tc_parser, tm_handler: tm_sender, poll, @@ -256,7 +263,7 @@ impl< inner_loop_delay: cfg.inner_loop_delay, tm_source, tm_buffer: vec![0; cfg.tm_buffer_size], - tc_receiver, + tc_sender: tc_receiver, tc_buffer: vec![0; cfg.tc_buffer_size], stop_signal, finished_handler, @@ -343,7 +350,8 @@ impl< if current_write_idx > 0 { self.tc_handler.handle_tc_parsing( &mut self.tc_buffer, - &self.tc_receiver, + self.id, + &self.tc_sender, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -357,7 +365,8 @@ impl< if current_write_idx == self.tc_buffer.capacity() { self.tc_handler.handle_tc_parsing( &mut self.tc_buffer, - &self.tc_receiver, + self.id, + &self.tc_sender, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -371,7 +380,8 @@ impl< std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { self.tc_handler.handle_tc_parsing( &mut self.tc_buffer, - &self.tc_receiver, + self.id, + &self.tc_sender, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -424,7 +434,7 @@ pub(crate) mod tests { use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; - use crate::tmtc::TmPacketSource; + use crate::tmtc::PacketSource; use super::*; @@ -440,7 +450,7 @@ pub(crate) mod tests { } } - impl TmPacketSource for SyncTmSource { + impl PacketSource for SyncTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { diff --git a/satrs/src/hal/std/tcp_spacepackets_server.rs b/satrs/src/hal/std/tcp_spacepackets_server.rs index 6e0eae2..7124119 100644 --- a/satrs/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs/src/hal/std/tcp_spacepackets_server.rs @@ -6,8 +6,8 @@ use std::{io::Write, net::SocketAddr}; use crate::{ encoding::parse_buffer_for_ccsds_space_packets, - tmtc::{PacketSenderRaw, TmPacketSource}, - ValidatorU16Id, + tmtc::{PacketSenderRaw, PacketSource}, + ComponentId, ValidatorU16Id, }; use super::tcp_server::{ @@ -32,6 +32,7 @@ impl TcpTcParser + ?Sized), conn_result: &mut HandledConnectionInfo, current_write_idx: usize, @@ -41,6 +42,7 @@ impl TcpTcParser TcpTmSender for SpacepacketsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut (impl TmPacketSource + ?Sized), + tm_source: &mut (impl PacketSource + ?Sized), conn_result: &mut HandledConnectionInfo, stream: &mut TcpStream, ) -> Result> { @@ -93,7 +95,7 @@ impl TcpTmSender for SpacepacketsTmSender { /// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs) /// also serves as the example application for this module. pub struct TcpSpacepacketsServer< - TmSource: TmPacketSource, + TmSource: PacketSource, TcSender: PacketSenderRaw, PacketIdChecker: ValidatorU16Id, HandledConnection: HandledConnectionHandler, @@ -112,7 +114,7 @@ pub struct TcpSpacepacketsServer< } impl< - TmSource: TmPacketSource, + TmSource: PacketSource, TcReceiver: PacketSenderRaw, PacketIdChecker: ValidatorU16Id, HandledConnection: HandledConnectionHandler, @@ -191,7 +193,7 @@ mod tests { thread, }; - use alloc::{sync::Arc, vec::Vec}; + use alloc::sync::Arc; use hashbrown::HashSet; use spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -203,11 +205,14 @@ mod tests { tests::{ConnectionFinishedHandler, SyncTmSource}, ConnectionResult, ServerConfig, }, + pus::PacketAsVec, queue::GenericSendError, + ComponentId, }; use super::TcpSpacepacketsServer; + const TCP_SERVER_ID: ComponentId = 0x05; const TEST_APID_0: u16 = 0x02; const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0); const TEST_APID_1: u16 = 0x10; @@ -215,22 +220,22 @@ mod tests { fn generic_tmtc_server( addr: &SocketAddr, - tc_receiver: mpsc::Sender>, + tc_sender: mpsc::Sender, tm_source: SyncTmSource, packet_id_lookup: HashSet, stop_signal: Option>, ) -> TcpSpacepacketsServer< SyncTmSource, - mpsc::Sender>, + mpsc::Sender, HashSet, ConnectionFinishedHandler, (), GenericSendError, > { TcpSpacepacketsServer::new( - ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), + ServerConfig::new(TCP_SERVER_ID, *addr, Duration::from_millis(2), 1024, 1024), tm_source, - tc_receiver, + tc_sender, packet_id_lookup, ConnectionFinishedHandler::default(), stop_signal, @@ -294,7 +299,7 @@ mod tests { panic!("connection was not handled properly"); } let packet = tc_receiver.try_recv().expect("receiving TC failed"); - assert_eq!(packet, tc_0); + assert_eq!(packet.packet, tc_0); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } @@ -401,9 +406,9 @@ mod tests { } // Check that TC has arrived. let packet_0 = tc_receiver.try_recv().expect("receiving TC failed"); - assert_eq!(packet_0, tc_0); + assert_eq!(packet_0.packet, tc_0); let packet_1 = tc_receiver.try_recv().expect("receiving TC failed"); - assert_eq!(packet_1, tc_1); + assert_eq!(packet_1.packet, tc_1); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } } diff --git a/satrs/src/hal/std/udp_server.rs b/satrs/src/hal/std/udp_server.rs index 90353c9..4211c77 100644 --- a/satrs/src/hal/std/udp_server.rs +++ b/satrs/src/hal/std/udp_server.rs @@ -1,5 +1,6 @@ //! Generic UDP TC server. use crate::tmtc::PacketSenderRaw; +use crate::ComponentId; use core::fmt::Debug; use std::io::{self, ErrorKind}; use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; @@ -22,13 +23,16 @@ use std::vec::Vec; /// use std::sync::mpsc; /// use spacepackets::ecss::WritablePusPacket; /// use satrs::hal::std::udp_server::UdpTcServer; +/// use satrs::ComponentId; /// use satrs::tmtc::PacketSenderRaw; /// use spacepackets::SpHeader; /// use spacepackets::ecss::tc::PusTcCreator; /// +/// const UDP_SERVER_ID: ComponentId = 0x05; +/// /// let (packet_sender, packet_receiver) = mpsc::channel(); /// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); -/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, packet_sender) +/// let mut udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, dest_addr, 2048, packet_sender) /// .expect("Creating UDP TMTC server failed"); /// let sph = SpHeader::new_from_apid(0x02); /// let pus_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true); @@ -43,8 +47,9 @@ use std::vec::Vec; /// let recv_result = udp_tc_server.try_recv_tc(); /// assert!(recv_result.is_ok()); /// // The packet is received by the UDP TC server and sent via the mpsc channel. -/// let sent_packet = packet_receiver.try_recv().expect("expected telecommand"); -/// assert_eq!(sent_packet, ping_tc_raw); +/// let sent_packet_with_sender = packet_receiver.try_recv().expect("expected telecommand"); +/// assert_eq!(sent_packet_with_sender.packet, ping_tc_raw); +/// assert_eq!(sent_packet_with_sender.sender_id, UDP_SERVER_ID); /// // No more packets received. /// matches!(packet_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); /// ``` @@ -55,6 +60,7 @@ use std::vec::Vec; /// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port /// and then forwards them to a generic CCSDS packet receiver. pub struct UdpTcServer, SendError> { + pub id: ComponentId, pub socket: UdpSocket, recv_buf: Vec, sender_addr: Option, @@ -75,11 +81,13 @@ impl, SendError: Debug + 'static> UdpTcServer { pub fn new( + id: ComponentId, addr: A, max_recv_size: usize, tc_sender: TcSender, ) -> Result { let server = Self { + id, socket: UdpSocket::bind(addr)?, recv_buf: vec![0; max_recv_size], sender_addr: None, @@ -103,7 +111,7 @@ impl, SendError: Debug + 'static> let (num_bytes, from) = res; self.sender_addr = Some(from); self.tc_sender - .send_raw_tc(&self.recv_buf[0..num_bytes]) + .send_packet(self.id, &self.recv_buf[0..num_bytes]) .map_err(ReceiveResult::Send)?; Ok(res) } @@ -118,6 +126,7 @@ mod tests { use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use crate::queue::GenericSendError; use crate::tmtc::PacketSenderRaw; + use crate::ComponentId; use core::cell::RefCell; use spacepackets::ecss::tc::PusTcCreator; use spacepackets::ecss::WritablePusPacket; @@ -128,6 +137,8 @@ mod tests { fn is_send(_: &T) {} + const UDP_SERVER_ID: ComponentId = 0x05; + #[derive(Default)] struct PingReceiver { pub sent_cmds: RefCell>>, @@ -136,7 +147,8 @@ mod tests { impl PacketSenderRaw for PingReceiver { type Error = GenericSendError; - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { + fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> { + assert_eq!(sender_id, UDP_SERVER_ID); let mut sent_data = Vec::new(); sent_data.extend_from_slice(tc_raw); let mut queue = self.sent_cmds.borrow_mut(); @@ -151,7 +163,7 @@ mod tests { let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); let ping_receiver = PingReceiver::default(); is_send(&ping_receiver); - let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver) + let mut udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, dest_addr, 2048, ping_receiver) .expect("Creating UDP TMTC server failed"); is_send(&udp_tc_server); let sph = SpHeader::new_from_apid(0x02); @@ -182,7 +194,7 @@ mod tests { fn test_nothing_received() { let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7779); let ping_receiver = PingReceiver::default(); - let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, ping_receiver) + let mut udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, dest_addr, 2048, ping_receiver) .expect("Creating UDP TMTC server failed"); let res = udp_tc_server.try_recv_tc(); assert!(res.is_err()); diff --git a/satrs/src/pus/action.rs b/satrs/src/pus/action.rs index 07713a0..ee26330 100644 --- a/satrs/src/pus/action.rs +++ b/satrs/src/pus/action.rs @@ -195,610 +195,7 @@ pub mod std_mod { mpsc::SyncSender>, mpsc::Receiver>, >; - - /* - pub type ModeRequestorAndHandlerMpsc = ModeInterface< - mpsc::Sender>, - mpsc::Receiver>, - mpsc::Sender>, - mpsc::Receiver>, - >; - pub type ModeRequestorAndHandlerMpscBounded = ModeInterface< - mpsc::SyncSender>, - mpsc::Receiver>, - mpsc::SyncSender>, - mpsc::Receiver>, - >; - */ } #[cfg(test)] -mod tests { - /* - use core::{cell::RefCell, time::Duration}; - use std::{sync::mpsc, time::SystemTimeError}; - - use alloc::{collections::VecDeque, vec::Vec}; - use delegate::delegate; - - use spacepackets::{ - ecss::{ - tc::{PusTcCreator, PusTcReader}, - tm::PusTmReader, - PusPacket, - }, - time::{cds, TimeWriter}, - CcsdsPacket, - }; - - use crate::{ - action::ActionRequestVariant, - params::{self, ParamsRaw, WritableToBeBytes}, - pus::{ - tests::{ - PusServiceHandlerWithVecCommon, PusTestHarness, SimplePusPacketHandler, - TestConverter, TestRouter, APP_DATA_TOO_SHORT, - }, - verification::{ - self, - tests::{SharedVerificationMap, TestVerificationReporter, VerificationStatus}, - FailParams, TcStateAccepted, TcStateNone, TcStateStarted, - VerificationReportingProvider, - }, - EcssTcInMemConverter, EcssTcInVecConverter, EcssTmtcError, GenericRoutingError, - MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusRequestRouter, - PusServiceHelper, PusTcToRequestConverter, TmAsVecSenderWithMpsc, - }, - }; - - use super::*; - - impl PusRequestRouter for TestRouter { - type Error = GenericRoutingError; - - fn route( - &self, - target_id: TargetId, - request: Request, - _token: VerificationToken, - ) -> Result<(), Self::Error> { - self.routing_requests - .borrow_mut() - .push_back((target_id, request)); - self.check_for_injected_error() - } - - fn handle_error( - &self, - target_id: TargetId, - token: VerificationToken, - tc: &PusTcReader, - error: Self::Error, - time_stamp: &[u8], - verif_reporter: &impl VerificationReportingProvider, - ) { - self.routing_errors - .borrow_mut() - .push_back((target_id, error)); - } - } - - impl PusTcToRequestConverter for TestConverter<8> { - type Error = PusPacketHandlingError; - fn convert( - &mut self, - token: VerificationToken, - tc: &PusTcReader, - time_stamp: &[u8], - verif_reporter: &impl VerificationReportingProvider, - ) -> Result<(TargetId, ActionRequest), Self::Error> { - self.conversion_request.push_back(tc.raw_data().to_vec()); - self.check_service(tc)?; - let target_id = tc.apid(); - if tc.user_data().len() < 4 { - verif_reporter - .start_failure( - token, - FailParams::new( - time_stamp, - &APP_DATA_TOO_SHORT, - (tc.user_data().len() as u32).to_be_bytes().as_ref(), - ), - ) - .expect("start success failure"); - return Err(PusPacketHandlingError::NotEnoughAppData { - expected: 4, - found: tc.user_data().len(), - }); - } - if tc.subservice() == 1 { - verif_reporter - .start_success(token, time_stamp) - .expect("start success failure"); - return Ok(( - target_id.into(), - ActionRequest { - action_id: u32::from_be_bytes(tc.user_data()[0..4].try_into().unwrap()), - variant: ActionRequestVariant::VecData(tc.user_data()[4..].to_vec()), - }, - )); - } - Err(PusPacketHandlingError::InvalidAppData( - "unexpected app data".into(), - )) - } - } - - pub struct PusDynRequestHandler { - srv_helper: PusServiceHelper< - MpscTcReceiver, - TmAsVecSenderWithMpsc, - EcssTcInVecConverter, - TestVerificationReporter, - >, - request_converter: TestConverter, - request_router: TestRouter, - } - - struct Pus8RequestTestbenchWithVec { - common: PusServiceHandlerWithVecCommon, - handler: PusDynRequestHandler<8, ActionRequest>, - } - - impl Pus8RequestTestbenchWithVec { - pub fn new() -> Self { - let (common, srv_helper) = PusServiceHandlerWithVecCommon::new_with_test_verif_sender(); - Self { - common, - handler: PusDynRequestHandler { - srv_helper, - request_converter: TestConverter::default(), - request_router: TestRouter::default(), - }, - } - } - - delegate! { - to self.handler.request_converter { - pub fn check_next_conversion(&mut self, tc: &PusTcCreator); - } - } - delegate! { - to self.handler.request_router { - pub fn retrieve_next_request(&mut self) -> (TargetId, ActionRequest); - } - } - delegate! { - to self.handler.request_router { - pub fn retrieve_next_routing_error(&mut self) -> (TargetId, GenericRoutingError); - } - } - } - - impl PusTestHarness for Pus8RequestTestbenchWithVec { - delegate! { - to self.common { - fn send_tc(&mut self, tc: &PusTcCreator) -> VerificationToken; - fn read_next_tm(&mut self) -> PusTmReader<'_>; - fn check_no_tm_available(&self) -> bool; - fn check_next_verification_tm( - &self, - subservice: u8, - expected_request_id: verification::RequestId, - ); - } - } - } - impl SimplePusPacketHandler for Pus8RequestTestbenchWithVec { - fn handle_one_tc(&mut self) -> Result { - let possible_packet = self.handler.srv_helper.retrieve_and_accept_next_packet()?; - if possible_packet.is_none() { - return Ok(PusPacketHandlerResult::Empty); - } - let ecss_tc_and_token = possible_packet.unwrap(); - let tc = self - .handler - .srv_helper - .tc_in_mem_converter - .convert_ecss_tc_in_memory_to_reader(&ecss_tc_and_token.tc_in_memory)?; - let time_stamp = cds::TimeProvider::from_now_with_u16_days() - .expect("timestamp generation failed") - .to_vec() - .unwrap(); - let (target_id, action_request) = self.handler.request_converter.convert( - ecss_tc_and_token.token, - &tc, - &time_stamp, - &self.handler.srv_helper.common.verification_handler, - )?; - if let Err(e) = self.handler.request_router.route( - target_id, - action_request, - ecss_tc_and_token.token, - ) { - self.handler.request_router.handle_error( - target_id, - ecss_tc_and_token.token, - &tc, - e.clone(), - &time_stamp, - &self.handler.srv_helper.common.verification_handler, - ); - return Err(e.into()); - } - Ok(PusPacketHandlerResult::RequestHandled) - } - } - - const TIMEOUT_ERROR_CODE: ResultU16 = ResultU16::new(1, 2); - const COMPLETION_ERROR_CODE: ResultU16 = ResultU16::new(2, 0); - const COMPLETION_ERROR_CODE_STEP: ResultU16 = ResultU16::new(2, 1); - - #[derive(Default)] - pub struct TestReplyHandlerHook { - pub unexpected_replies: VecDeque, - pub timeouts: RefCell>, - } - - impl ReplyHandlerHook for TestReplyHandlerHook { - fn handle_unexpected_reply(&mut self, reply: &GenericActionReplyPus) { - self.unexpected_replies.push_back(reply.clone()); - } - - fn timeout_callback(&self, active_request: &ActivePusActionRequest) { - self.timeouts.borrow_mut().push_back(active_request.clone()); - } - - fn timeout_error_code(&self) -> ResultU16 { - TIMEOUT_ERROR_CODE - } - } - - pub struct Pus8ReplyTestbench { - verif_reporter: TestVerificationReporter, - #[allow(dead_code)] - ecss_tm_receiver: mpsc::Receiver>, - handler: PusService8ReplyHandler< - TestVerificationReporter, - DefaultActiveActionRequestMap, - TestReplyHandlerHook, - mpsc::Sender>, - >, - } - - impl Pus8ReplyTestbench { - pub fn new(normal_ctor: bool) -> Self { - let reply_handler_hook = TestReplyHandlerHook::default(); - let shared_verif_map = SharedVerificationMap::default(); - let test_verif_reporter = TestVerificationReporter::new(shared_verif_map.clone()); - let (ecss_tm_sender, ecss_tm_receiver) = mpsc::channel(); - let reply_handler = if normal_ctor { - PusService8ReplyHandler::new_from_now_with_default_map( - test_verif_reporter.clone(), - 128, - reply_handler_hook, - ecss_tm_sender, - ) - .expect("creating reply handler failed") - } else { - PusService8ReplyHandler::new_from_now( - test_verif_reporter.clone(), - DefaultActiveActionRequestMap::default(), - 128, - reply_handler_hook, - ecss_tm_sender, - ) - .expect("creating reply handler failed") - }; - Self { - verif_reporter: test_verif_reporter, - ecss_tm_receiver, - handler: reply_handler, - } - } - - pub fn init_handling_for_request( - &mut self, - request_id: RequestId, - _action_id: ActionId, - ) -> VerificationToken { - assert!(!self.handler.request_active(request_id)); - // let action_req = ActionRequest::new(action_id, ActionRequestVariant::NoData); - let token = self.add_tc_with_req_id(request_id.into()); - let token = self - .verif_reporter - .acceptance_success(token, &[]) - .expect("acceptance success failure"); - let token = self - .verif_reporter - .start_success(token, &[]) - .expect("start success failure"); - let verif_info = self - .verif_reporter - .verification_info(&verification::RequestId::from(request_id)) - .expect("no verification info found"); - assert!(verif_info.started.expect("request was not started")); - assert!(verif_info.accepted.expect("request was not accepted")); - token - } - - pub fn next_unrequested_reply(&self) -> Option { - self.handler.user_hook.unexpected_replies.front().cloned() - } - - pub fn assert_request_completion_success(&self, step: Option, request_id: RequestId) { - let verif_info = self - .verif_reporter - .verification_info(&verification::RequestId::from(request_id)) - .expect("no verification info found"); - self.assert_request_completion_common(request_id, &verif_info, step, true); - } - - pub fn assert_request_completion_failure( - &self, - step: Option, - request_id: RequestId, - fail_enum: ResultU16, - fail_data: &[u8], - ) { - let verif_info = self - .verif_reporter - .verification_info(&verification::RequestId::from(request_id)) - .expect("no verification info found"); - self.assert_request_completion_common(request_id, &verif_info, step, false); - assert_eq!(verif_info.fail_enum.unwrap(), fail_enum.raw() as u64); - assert_eq!(verif_info.failure_data.unwrap(), fail_data); - } - - pub fn assert_request_completion_common( - &self, - request_id: RequestId, - verif_info: &VerificationStatus, - step: Option, - completion_success: bool, - ) { - if let Some(step) = step { - assert!(verif_info.step_status.is_some()); - assert!(verif_info.step_status.unwrap()); - assert_eq!(step, verif_info.step); - } - assert_eq!( - verif_info.completed.expect("request is not completed"), - completion_success - ); - assert!(!self.handler.request_active(request_id)); - } - - pub fn assert_request_step_failure(&self, step: u16, request_id: RequestId) { - let verif_info = self - .verif_reporter - .verification_info(&verification::RequestId::from(request_id)) - .expect("no verification info found"); - assert!(verif_info.step_status.is_some()); - assert!(!verif_info.step_status.unwrap()); - assert_eq!(step, verif_info.step); - } - pub fn add_routed_request( - &mut self, - request_id: verification::RequestId, - target_id: TargetId, - action_id: ActionId, - token: VerificationToken, - timeout: Duration, - ) { - if self.handler.request_active(request_id.into()) { - panic!("request already present"); - } - self.handler - .add_routed_action_request(request_id, target_id, action_id, token, timeout); - if !self.handler.request_active(request_id.into()) { - panic!("request should be active now"); - } - } - - delegate! { - to self.handler { - pub fn request_active(&self, request_id: RequestId) -> bool; - - pub fn handle_action_reply( - &mut self, - action_reply_with_ids: GenericMessage, - time_stamp: &[u8] - ) -> Result<(), EcssTmtcError>; - - pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError>; - - pub fn check_for_timeouts(&mut self, time_stamp: &[u8]) -> Result<(), EcssTmtcError>; - } - to self.verif_reporter { - fn add_tc_with_req_id(&mut self, req_id: verification::RequestId) -> VerificationToken; - } - } - } - - #[test] - fn test_reply_handler_completion_success() { - let mut reply_testbench = Pus8ReplyTestbench::new(true); - let sender_id = 0x06; - let request_id = 0x02; - let target_id = 0x05; - let action_id = 0x03; - let token = reply_testbench.init_handling_for_request(request_id, action_id); - reply_testbench.add_routed_request( - request_id.into(), - target_id, - action_id, - token, - Duration::from_millis(1), - ); - assert!(reply_testbench.request_active(request_id)); - let action_reply = GenericMessage::new( - request_id, - sender_id, - ActionReplyPusWithActionId { - action_id, - variant: ActionReplyPus::Completed, - }, - ); - reply_testbench - .handle_action_reply(action_reply, &[]) - .expect("reply handling failure"); - reply_testbench.assert_request_completion_success(None, request_id); - } - - #[test] - fn test_reply_handler_step_success() { - let mut reply_testbench = Pus8ReplyTestbench::new(false); - let request_id = 0x02; - let target_id = 0x05; - let action_id = 0x03; - let token = reply_testbench.init_handling_for_request(request_id, action_id); - reply_testbench.add_routed_request( - request_id.into(), - target_id, - action_id, - token, - Duration::from_millis(1), - ); - let action_reply = GenericActionReplyPus::new_action_reply( - request_id, - action_id, - action_id, - ActionReplyPus::StepSuccess { step: 1 }, - ); - reply_testbench - .handle_action_reply(action_reply, &[]) - .expect("reply handling failure"); - let action_reply = GenericActionReplyPus::new_action_reply( - request_id, - action_id, - action_id, - ActionReplyPus::Completed, - ); - reply_testbench - .handle_action_reply(action_reply, &[]) - .expect("reply handling failure"); - reply_testbench.assert_request_completion_success(Some(1), request_id); - } - - #[test] - fn test_reply_handler_completion_failure() { - let mut reply_testbench = Pus8ReplyTestbench::new(true); - let sender_id = 0x01; - let request_id = 0x02; - let target_id = 0x05; - let action_id = 0x03; - let token = reply_testbench.init_handling_for_request(request_id, action_id); - reply_testbench.add_routed_request( - request_id.into(), - target_id, - action_id, - token, - Duration::from_millis(1), - ); - let params_raw = ParamsRaw::U32(params::U32(5)); - let action_reply = GenericActionReplyPus::new_action_reply( - request_id, - sender_id, - action_id, - ActionReplyPus::CompletionFailed { - error_code: COMPLETION_ERROR_CODE, - params: params_raw.into(), - }, - ); - reply_testbench - .handle_action_reply(action_reply, &[]) - .expect("reply handling failure"); - reply_testbench.assert_request_completion_failure( - None, - request_id, - COMPLETION_ERROR_CODE, - ¶ms_raw.to_vec().unwrap(), - ); - } - - #[test] - fn test_reply_handler_step_failure() { - let mut reply_testbench = Pus8ReplyTestbench::new(false); - let sender_id = 0x01; - let request_id = 0x02; - let target_id = 0x05; - let action_id = 0x03; - let token = reply_testbench.init_handling_for_request(request_id, action_id); - reply_testbench.add_routed_request( - request_id.into(), - target_id, - action_id, - token, - Duration::from_millis(1), - ); - let action_reply = GenericActionReplyPus::new_action_reply( - request_id, - sender_id, - action_id, - ActionReplyPus::StepFailed { - error_code: COMPLETION_ERROR_CODE_STEP, - step: 2, - params: ParamsRaw::U32(crate::params::U32(5)).into(), - }, - ); - reply_testbench - .handle_action_reply(action_reply, &[]) - .expect("reply handling failure"); - reply_testbench.assert_request_step_failure(2, request_id); - } - - #[test] - fn test_reply_handler_timeout_handling() { - let mut reply_testbench = Pus8ReplyTestbench::new(true); - let request_id = 0x02; - let target_id = 0x06; - let action_id = 0x03; - let token = reply_testbench.init_handling_for_request(request_id, action_id); - reply_testbench.add_routed_request( - request_id.into(), - target_id, - action_id, - token, - Duration::from_millis(1), - ); - let timeout_param = Duration::from_millis(1).as_millis() as u64; - let timeout_param_raw = timeout_param.to_be_bytes(); - std::thread::sleep(Duration::from_millis(2)); - reply_testbench - .update_time_from_now() - .expect("time update failure"); - reply_testbench.check_for_timeouts(&[]).unwrap(); - reply_testbench.assert_request_completion_failure( - None, - request_id, - TIMEOUT_ERROR_CODE, - &timeout_param_raw, - ); - } - - #[test] - fn test_unrequested_reply() { - let mut reply_testbench = Pus8ReplyTestbench::new(true); - let sender_id = 0x01; - let request_id = 0x02; - let action_id = 0x03; - - let action_reply = GenericActionReplyPus::new_action_reply( - request_id, - sender_id, - action_id, - ActionReplyPus::Completed, - ); - reply_testbench - .handle_action_reply(action_reply, &[]) - .expect("reply handling failure"); - let reply = reply_testbench.next_unrequested_reply(); - assert!(reply.is_some()); - let reply = reply.unwrap(); - assert_eq!(reply.message.action_id, action_id); - assert_eq!(reply.request_id, request_id); - assert_eq!(reply.message.variant, ActionReplyPus::Completed); - } - */ -} +mod tests {} diff --git a/satrs/src/pus/event_man.rs b/satrs/src/pus/event_man.rs index c69bbab..e859ea6 100644 --- a/satrs/src/pus/event_man.rs +++ b/satrs/src/pus/event_man.rs @@ -258,7 +258,7 @@ pub mod alloc_mod { mod tests { use super::*; use crate::events::SeverityInfo; - use crate::pus::PusTmAsVec; + use crate::pus::PacketAsVec; use crate::request::UniqueApidTargetId; use std::sync::mpsc::{self, TryRecvError}; @@ -284,7 +284,7 @@ mod tests { #[test] fn test_basic() { let event_man = create_basic_man_1(); - let (event_tx, event_rx) = mpsc::channel::(); + let (event_tx, event_rx) = mpsc::channel::(); let event_sent = event_man .generate_pus_event_tm(&event_tx, &EMPTY_STAMP, INFO_EVENT, None) .expect("Sending info event failed"); @@ -297,7 +297,7 @@ mod tests { #[test] fn test_disable_event() { let mut event_man = create_basic_man_2(); - let (event_tx, event_rx) = mpsc::channel::(); + let (event_tx, event_rx) = mpsc::channel::(); // let mut sender = TmAsVecSenderWithMpsc::new(0, "test", event_tx); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); assert!(res.is_ok()); @@ -320,7 +320,7 @@ mod tests { #[test] fn test_reenable_event() { let mut event_man = create_basic_man_1(); - let (event_tx, event_rx) = mpsc::channel::(); + let (event_tx, event_rx) = mpsc::channel::(); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); diff --git a/satrs/src/pus/event_srv.rs b/satrs/src/pus/event_srv.rs index 6656b69..0bff0bc 100644 --- a/satrs/src/pus/event_srv.rs +++ b/satrs/src/pus/event_srv.rs @@ -9,12 +9,12 @@ use std::sync::mpsc::Sender; use super::verification::VerificationReportingProvider; use super::{ - EcssTcInMemConverter, EcssTcReceiverCore, EcssTmSender, GenericConversionError, + EcssTcInMemConverter, EcssTcReceiver, EcssTmSender, GenericConversionError, GenericRoutingError, PusServiceHelper, }; pub struct PusEventServiceHandler< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -25,7 +25,7 @@ pub struct PusEventServiceHandler< } impl< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -167,7 +167,8 @@ mod tests { use crate::pus::verification::{ RequestId, VerificationReporter, VerificationReportingProvider, }; - use crate::pus::{GenericConversionError, MpscTcReceiver, MpscTmInSharedPoolSenderBounded}; + use crate::pus::{GenericConversionError, MpscTcReceiver}; + use crate::tmtc::PacketSenderWithSharedPool; use crate::{ events::EventU32, pus::{ @@ -186,7 +187,7 @@ mod tests { common: PusServiceHandlerWithSharedStoreCommon, handler: PusEventServiceHandler< MpscTcReceiver, - MpscTmInSharedPoolSenderBounded, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, >, diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index ee78c3b..9bd79fd 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -154,7 +154,7 @@ pub trait ChannelWithId: Send { /// /// This sender object is responsible for sending PUS telemetry to a TM sink. pub trait EcssTmSender: Send { - fn send_tm(&self, source_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError>; + fn send_tm(&self, sender_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError>; } /// Generic trait for a user supplied sender object. @@ -269,14 +269,19 @@ impl From for TryRecvTmtcError { } /// Generic trait for a user supplied receiver object. -pub trait EcssTcReceiverCore { +pub trait EcssTcReceiver { fn recv_tc(&self) -> Result; } -/// Generic trait for objects which can receive ECSS PUS telecommands. -pub trait ReceivesEcssPusTc: Send { +/// Generic trait for objects which can send ECSS PUS telecommands. +pub trait PacketSenderPusTc: Send { type Error; - fn pass_pus_tc(&mut self, header: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error>; + fn send_pus_tc( + &self, + sender_id: ComponentId, + header: &SpHeader, + pus_tc: &PusTcReader, + ) -> Result<(), Self::Error>; } pub trait ActiveRequestMapProvider: Sized { @@ -349,7 +354,7 @@ pub mod alloc_mod { use super::*; - /// Extension trait for [EcssTmSenderCore]. + /// Extension trait for [EcssTmSender]. /// /// It provides additional functionality, for example by implementing the [Downcast] trait /// and the [DynClone] trait. @@ -370,7 +375,7 @@ pub mod alloc_mod { fn upcast_mut(&mut self) -> &mut dyn EcssTmSender; } - /// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable. + /// Blanket implementation for all types which implement [EcssTmSender] and are clonable. impl EcssTmSenderExt for T where T: EcssTmSender + Clone + 'static, @@ -390,7 +395,7 @@ pub mod alloc_mod { dyn_clone::clone_trait_object!(EcssTmSenderExt); impl_downcast!(EcssTmSenderExt); - /// Extension trait for [EcssTcSenderCore]. + /// Extension trait for [EcssTcSender]. /// /// It provides additional functionality, for example by implementing the [Downcast] trait /// and the [DynClone] trait. @@ -404,13 +409,13 @@ pub mod alloc_mod { #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] pub trait EcssTcSenderExt: EcssTcSender + Downcast + DynClone {} - /// Blanket implementation for all types which implement [EcssTcSenderCore] and are clonable. + /// Blanket implementation for all types which implement [EcssTcSender] and are clonable. impl EcssTcSenderExt for T where T: EcssTcSender + Clone + 'static {} dyn_clone::clone_trait_object!(EcssTcSenderExt); impl_downcast!(EcssTcSenderExt); - /// Extension trait for [EcssTcReceiverCore]. + /// Extension trait for [EcssTcReceiver]. /// /// It provides additional functionality, for example by implementing the [Downcast] trait /// and the [DynClone] trait. @@ -422,12 +427,12 @@ pub mod alloc_mod { /// [Clone]. #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] - pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast {} + pub trait EcssTcReceiverExt: EcssTcReceiver + Downcast {} - /// Blanket implementation for all types which implement [EcssTcReceiverCore] and are clonable. - impl EcssTcReceiver for T where T: EcssTcReceiverCore + 'static {} + /// Blanket implementation for all types which implement [EcssTcReceiver] and are clonable. + impl EcssTcReceiverExt for T where T: EcssTcReceiver + 'static {} - impl_downcast!(EcssTcReceiver); + impl_downcast!(EcssTcReceiverExt); /// This trait is an abstraction for the conversion of a PUS telecommand into a generic request /// type. @@ -652,15 +657,14 @@ pub mod std_mod { }; use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::{ - EcssTcAndToken, EcssTcReceiverCore, EcssTmSender, EcssTmtcError, GenericReceiveError, + EcssTcAndToken, EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericReceiveError, GenericSendError, PusTmVariant, TryRecvTmtcError, }; - use crate::tmtc::{PacketSenderSharedPool, PusTmPool, SharedPacketPool}; + use crate::tmtc::PacketSenderWithSharedPool; use crate::ComponentId; use alloc::vec::Vec; use core::time::Duration; use spacepackets::ecss::tc::PusTcReader; - use spacepackets::ecss::tm::PusTmCreator; use spacepackets::ecss::WritablePusPacket; use spacepackets::time::StdTimestampError; use spacepackets::ByteConversionError; @@ -676,23 +680,32 @@ pub mod std_mod { use super::{AcceptedEcssTcAndToken, ActiveRequestProvider, TcInMemory}; #[derive(Debug)] - pub struct PusTmInPool { - pub source_id: ComponentId, + pub struct PacketInPool { + pub sender_id: ComponentId, pub store_addr: StoreAddr, } + impl PacketInPool { + pub fn new(sender_id: ComponentId, store_addr: StoreAddr) -> Self { + Self { + sender_id, + store_addr, + } + } + } + impl From> for EcssTmtcError { fn from(_: mpsc::SendError) -> Self { Self::Send(GenericSendError::RxDisconnected) } } - impl EcssTmSender for mpsc::Sender { + impl EcssTmSender for mpsc::Sender { fn send_tm(&self, source_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError> { match tm { PusTmVariant::InStore(store_addr) => self - .send(PusTmInPool { - source_id, + .send(PacketInPool { + sender_id: source_id, store_addr, }) .map_err(|_| GenericSendError::RxDisconnected)?, @@ -702,12 +715,12 @@ pub mod std_mod { } } - impl EcssTmSender for mpsc::SyncSender { + impl EcssTmSender for mpsc::SyncSender { fn send_tm(&self, source_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError> { match tm { PusTmVariant::InStore(store_addr) => self - .try_send(PusTmInPool { - source_id, + .try_send(PacketInPool { + sender_id: source_id, store_addr, }) .map_err(|e| EcssTmtcError::Send(e.into()))?, @@ -718,20 +731,26 @@ pub mod std_mod { } #[derive(Debug)] - pub struct PusTmAsVec { - pub source_id: ComponentId, + pub struct PacketAsVec { + pub sender_id: ComponentId, pub packet: Vec, } - pub type MpscTmAsVecSender = mpsc::Sender; + impl PacketAsVec { + pub fn new(sender_id: ComponentId, packet: Vec) -> Self { + Self { sender_id, packet } + } + } + + pub type MpscTmAsVecSender = mpsc::Sender; impl EcssTmSender for MpscTmAsVecSender { fn send_tm(&self, source_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError> { match tm { PusTmVariant::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)), PusTmVariant::Direct(tm) => self - .send(PusTmAsVec { - source_id, + .send(PacketAsVec { + sender_id: source_id, packet: tm.to_vec()?, }) .map_err(|e| EcssTmtcError::Send(e.into()))?, @@ -740,15 +759,15 @@ pub mod std_mod { } } - pub type MpscTmAsVecSenderBounded = mpsc::SyncSender; + pub type MpscTmAsVecSenderBounded = mpsc::SyncSender; impl EcssTmSender for MpscTmAsVecSenderBounded { fn send_tm(&self, source_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError> { match tm { PusTmVariant::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)), PusTmVariant::Direct(tm) => self - .send(PusTmAsVec { - source_id, + .send(PacketAsVec { + sender_id: source_id, packet: tm.to_vec()?, }) .map_err(|e| EcssTmtcError::Send(e.into()))?, @@ -757,50 +776,9 @@ pub mod std_mod { } } - // TODO: This is a duplication of an existing shared store packet sender. Try to remove it.. - /* - #[derive(Clone)] - pub struct TmInSharedPoolSender { - shared_tm_store: SharedPacketPool, - sender: Sender, - } - - impl TmInSharedPoolSender { - pub fn send_direct_tm( - &self, - source_id: ComponentId, - tm: PusTmCreator, - ) -> Result<(), EcssTmtcError> { - let addr = self.shared_tm_store.add_pus_tm_from_creator(&tm)?; - self.sender.send_tm(source_id, PusTmVariant::InStore(addr)) - } - } - - impl EcssTmSenderCore for TmInSharedPoolSender { - fn send_tm(&self, source_id: ComponentId, tm: PusTmVariant) -> Result<(), EcssTmtcError> { - if let PusTmVariant::Direct(tm) = tm { - return self.send_direct_tm(source_id, tm); - } - self.sender.send_tm(source_id, tm) - } - } - - impl TmInSharedPoolSender { - pub fn new(shared_tm_store: SharedPacketPool, sender: Sender) -> Self { - Self { - shared_tm_store, - sender, - } - } - } - - pub type MpscTmInSharedPoolSender = TmInSharedPoolSender>; - pub type MpscTmInSharedPoolSenderBounded = TmInSharedPoolSender>; - */ - pub type MpscTcReceiver = mpsc::Receiver; - impl EcssTcReceiverCore for MpscTcReceiver { + impl EcssTcReceiver for MpscTcReceiver { fn recv_tc(&self) -> Result { self.try_recv().map_err(|e| match e { TryRecvError::Empty => TryRecvTmtcError::Empty, @@ -816,8 +794,6 @@ pub mod std_mod { use super::*; use crossbeam_channel as cb; - pub type TmInSharedPoolSenderWithCrossbeam = TmInSharedPoolSender>; - impl From> for EcssTmtcError { fn from(_: cb::SendError) -> Self { Self::Send(GenericSendError::RxDisconnected) @@ -835,37 +811,31 @@ pub mod std_mod { } } - impl EcssTmSender for cb::Sender { + impl EcssTmSender for cb::Sender { fn send_tm( &self, - source_id: ComponentId, + sender_id: ComponentId, tm: PusTmVariant, ) -> Result<(), EcssTmtcError> { match tm { PusTmVariant::InStore(addr) => self - .try_send(PusTmInPool { - source_id, - store_addr: addr, - }) + .try_send(PacketInPool::new(sender_id, addr)) .map_err(|e| EcssTmtcError::Send(e.into()))?, PusTmVariant::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm), }; Ok(()) } } - impl EcssTmSender for cb::Sender { + impl EcssTmSender for cb::Sender { fn send_tm( &self, - source_id: ComponentId, + sender_id: ComponentId, tm: PusTmVariant, ) -> Result<(), EcssTmtcError> { match tm { PusTmVariant::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)), PusTmVariant::Direct(tm) => self - .send(PusTmAsVec { - source_id, - packet: tm.to_vec()?, - }) + .send(PacketAsVec::new(sender_id, tm.to_vec()?)) .map_err(|e| EcssTmtcError::Send(e.into()))?, }; Ok(()) @@ -1112,7 +1082,7 @@ pub mod std_mod { } pub struct PusServiceBase< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, VerificationReporter: VerificationReportingProvider, > { @@ -1132,7 +1102,7 @@ pub mod std_mod { /// by using the [EcssTcInMemConverter] abstraction. This object provides some convenience /// methods to make the generic parts of TC handling easier. pub struct PusServiceHelper< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -1142,7 +1112,7 @@ pub mod std_mod { } impl< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -1174,7 +1144,7 @@ pub mod std_mod { &self.common.tm_sender } - /// This function can be used to poll the internal [EcssTcReceiverCore] object for the next + /// This function can be used to poll the internal [EcssTcReceiver] object for the next /// telecommand packet. It will return `Ok(None)` if there are not packets available. /// In any other case, it will perform the acceptance of the ECSS TC packet using the /// internal [VerificationReportingProvider] object. It will then return the telecommand @@ -1233,14 +1203,14 @@ pub mod std_mod { pub type PusServiceHelperStaticWithMpsc = PusServiceHelper< MpscTcReceiver, - PacketSenderSharedPool, + PacketSenderWithSharedPool, TcInMemConverter, VerificationReporter, >; pub type PusServiceHelperStaticWithBoundedMpsc = PusServiceHelper< MpscTcReceiver, - PacketSenderSharedPool, + PacketSenderWithSharedPool, TcInMemConverter, VerificationReporter, >; @@ -1310,7 +1280,7 @@ pub mod tests { use crate::pool::{PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig}; use crate::pus::verification::{RequestId, VerificationReporter}; - use crate::tmtc::SharedPacketPool; + use crate::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; use crate::ComponentId; use super::test_util::{TEST_APID, TEST_COMPONENT_ID_0}; @@ -1369,12 +1339,12 @@ pub mod tests { tc_pool: SharedStaticMemoryPool, tm_pool: SharedPacketPool, tc_sender: mpsc::SyncSender, - tm_receiver: mpsc::Receiver, + tm_receiver: mpsc::Receiver, } pub type PusServiceHelperStatic = PusServiceHelper< MpscTcReceiver, - MpscTmInSharedPoolSenderBounded, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, >; @@ -1390,13 +1360,15 @@ pub mod tests { let tm_pool = StaticMemoryPool::new(pool_cfg); let shared_tc_pool = SharedStaticMemoryPool::new(RwLock::new(tc_pool)); let shared_tm_pool = SharedStaticMemoryPool::new(RwLock::new(tm_pool)); + let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::sync_channel(10); let (tm_tx, tm_rx) = mpsc::sync_channel(10); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let verification_handler = VerificationReporter::new(TEST_COMPONENT_ID_0.id(), &verif_cfg); - let test_srv_tm_sender = TmInSharedPoolSender::new(shared_tm_pool.clone(), tm_tx); + let test_srv_tm_sender = + PacketSenderWithSharedPool::new(tm_tx, shared_tm_pool_wrapper.clone()); let in_store_converter = EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048); ( @@ -1404,7 +1376,7 @@ pub mod tests { pus_buf: RefCell::new([0; 2048]), tm_buf: [0; 2048], tc_pool: shared_tc_pool, - tm_pool: shared_tm_pool, + tm_pool: shared_tm_pool_wrapper, tc_sender: test_srv_tc_tx, tm_receiver: tm_rx, }, @@ -1466,7 +1438,7 @@ pub mod tests { pub struct PusServiceHandlerWithVecCommon { current_tm: Option>, tc_sender: mpsc::Sender, - tm_receiver: mpsc::Receiver, + tm_receiver: mpsc::Receiver, } pub type PusServiceHelperDynamic = PusServiceHelper< MpscTcReceiver, diff --git a/satrs/src/pus/scheduler_srv.rs b/satrs/src/pus/scheduler_srv.rs index 081b873..e339b64 100644 --- a/satrs/src/pus/scheduler_srv.rs +++ b/satrs/src/pus/scheduler_srv.rs @@ -1,12 +1,12 @@ use super::scheduler::PusSchedulerProvider; use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::{ - EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiverCore, - EcssTmSender, MpscTcReceiver, PusServiceHelper, PusTmAsVec, + EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver, + EcssTmSender, MpscTcReceiver, PacketAsVec, PusServiceHelper, }; use crate::pool::PoolProvider; use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError}; -use crate::tmtc::PacketSenderSharedPool; +use crate::tmtc::PacketSenderWithSharedPool; use alloc::string::ToString; use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::time::cds::CdsTime; @@ -21,7 +21,7 @@ use std::sync::mpsc; /// [Self::scheduler] and [Self::scheduler_mut] function and then use the scheduler API to release /// telecommands when applicable. pub struct PusSchedServiceHandler< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -33,7 +33,7 @@ pub struct PusSchedServiceHandler< } impl< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -212,7 +212,7 @@ impl< /// mpsc queues. pub type PusService11SchedHandlerDynWithMpsc = PusSchedServiceHandler< MpscTcReceiver, - mpsc::Sender, + mpsc::Sender, EcssTcInVecConverter, VerificationReporter, PusScheduler, @@ -221,7 +221,7 @@ pub type PusService11SchedHandlerDynWithMpsc = PusSchedServiceHand /// queues. pub type PusService11SchedHandlerDynWithBoundedMpsc = PusSchedServiceHandler< MpscTcReceiver, - mpsc::SyncSender, + mpsc::SyncSender, EcssTcInVecConverter, VerificationReporter, PusScheduler, @@ -230,7 +230,7 @@ pub type PusService11SchedHandlerDynWithBoundedMpsc = PusSchedServ /// mpsc queues. pub type PusService11SchedHandlerStaticWithMpsc = PusSchedServiceHandler< MpscTcReceiver, - PacketSenderSharedPool, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, PusScheduler, @@ -239,7 +239,7 @@ pub type PusService11SchedHandlerStaticWithMpsc = PusSchedServiceH /// mpsc queues. pub type PusService11SchedHandlerStaticWithBoundedMpsc = PusSchedServiceHandler< MpscTcReceiver, - PacketSenderSharedPool, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, PusScheduler, @@ -249,7 +249,7 @@ pub type PusService11SchedHandlerStaticWithBoundedMpsc = PusSchedS mod tests { use crate::pool::{StaticMemoryPool, StaticPoolConfig}; use crate::pus::test_util::{PusTestHarness, TEST_APID}; - use crate::pus::verification::VerificationReporter; + use crate::pus::verification::{VerificationReporter, VerificationReportingProvider}; use crate::pus::{ scheduler::{self, PusSchedulerProvider, TcInfo}, @@ -258,7 +258,7 @@ mod tests { EcssTcInSharedStoreConverter, }; use crate::pus::{MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError}; - use crate::tmtc::PacketSenderSharedPool; + use crate::tmtc::PacketSenderWithSharedPool; use alloc::collections::VecDeque; use delegate::delegate; use spacepackets::ecss::scheduling::Subservice; @@ -277,7 +277,7 @@ mod tests { common: PusServiceHandlerWithSharedStoreCommon, handler: PusSchedServiceHandler< MpscTcReceiver, - PacketSenderSharedPool, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, TestScheduler, diff --git a/satrs/src/pus/test.rs b/satrs/src/pus/test.rs index 25d200f..4b52e5a 100644 --- a/satrs/src/pus/test.rs +++ b/satrs/src/pus/test.rs @@ -1,7 +1,8 @@ use crate::pus::{ - PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, PusTmAsVec, - PusTmInPool, PusTmVariant, + PacketAsVec, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, + PusTmVariant, }; +use crate::tmtc::PacketSenderWithSharedPool; use spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; use spacepackets::ecss::PusPacket; use spacepackets::SpHeader; @@ -9,15 +10,14 @@ use std::sync::mpsc; use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::{ - EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiverCore, - EcssTmSender, GenericConversionError, MpscTcReceiver, MpscTmInSharedPoolSender, - MpscTmInSharedPoolSenderBounded, PusServiceHelper, + EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver, + EcssTmSender, GenericConversionError, MpscTcReceiver, PusServiceHelper, }; /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This handler only processes ping requests and generates a ping reply for them accordingly. pub struct PusService17TestHandler< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -27,7 +27,7 @@ pub struct PusService17TestHandler< } impl< - TcReceiver: EcssTcReceiverCore, + TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, @@ -127,7 +127,7 @@ impl< /// mpsc queues. pub type PusService17TestHandlerDynWithMpsc = PusService17TestHandler< MpscTcReceiver, - mpsc::Sender, + mpsc::Sender, EcssTcInVecConverter, VerificationReporter, >; @@ -135,23 +135,15 @@ pub type PusService17TestHandlerDynWithMpsc = PusService17TestHandler< /// queues. pub type PusService17TestHandlerDynWithBoundedMpsc = PusService17TestHandler< MpscTcReceiver, - mpsc::SyncSender, + mpsc::SyncSender, EcssTcInVecConverter, VerificationReporter, >; -/// Helper type definition for a PUS 17 handler with a shared store TMTC memory backend and regular -/// mpsc queues. -pub type PusService17TestHandlerStaticWithMpsc = PusService17TestHandler< - MpscTcReceiver, - MpscTmInSharedPoolSender, - EcssTcInSharedStoreConverter, - VerificationReporter, ->; /// Helper type definition for a PUS 17 handler with a shared store TMTC memory backend and bounded /// mpsc queues. pub type PusService17TestHandlerStaticWithBoundedMpsc = PusService17TestHandler< MpscTcReceiver, - MpscTmInSharedPoolSenderBounded, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, >; @@ -168,9 +160,9 @@ mod tests { use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::{ EcssTcInSharedStoreConverter, EcssTcInVecConverter, GenericConversionError, MpscTcReceiver, - MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, - PusPacketHandlingError, + MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, }; + use crate::tmtc::PacketSenderWithSharedPool; use crate::ComponentId; use delegate::delegate; use spacepackets::ecss::tc::{PusTcCreator, PusTcSecondaryHeader}; @@ -185,7 +177,7 @@ mod tests { common: PusServiceHandlerWithSharedStoreCommon, handler: PusService17TestHandler< MpscTcReceiver, - MpscTmInSharedPoolSenderBounded, + PacketSenderWithSharedPool, EcssTcInSharedStoreConverter, VerificationReporter, >, diff --git a/satrs/src/pus/verification.rs b/satrs/src/pus/verification.rs index 085f8fd..ffd6fe4 100644 --- a/satrs/src/pus/verification.rs +++ b/satrs/src/pus/verification.rs @@ -19,10 +19,9 @@ //! use satrs::pus::verification::{ //! VerificationReportingProvider, VerificationReporterCfg, VerificationReporter //! }; +//! use satrs::tmtc::{SharedStaticMemoryPool, PacketSenderWithSharedPool}; //! use satrs::seq_count::SeqCountProviderSimple; //! use satrs::request::UniqueApidTargetId; -//! use satrs::pus::MpscTmInSharedPoolSender; -//! use satrs::tmtc::tm_helper::SharedTmPool; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; //! use spacepackets::ecss::tc::{PusTcCreator, PusTcSecondaryHeader}; @@ -34,10 +33,9 @@ //! //! let pool_cfg = StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)], false); //! let tm_pool = StaticMemoryPool::new(pool_cfg.clone()); -//! let shared_tm_store = SharedTmPool::new(tm_pool); -//! let tm_store = shared_tm_store.clone_backing_pool(); -//! let (verif_tx, verif_rx) = mpsc::channel(); -//! let sender = MpscTmInSharedPoolSender::new(shared_tm_store, verif_tx); +//! let shared_tm_pool = SharedStaticMemoryPool::new(RwLock::new(tm_pool)); +//! let (verif_tx, verif_rx) = mpsc::sync_channel(10); +//! let sender = PacketSenderWithSharedPool::new_with_shared_packet_pool(verif_tx, &shared_tm_pool); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporter::new(TEST_COMPONENT_ID.id(), &cfg); //! @@ -61,7 +59,7 @@ //! let tm_in_store = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap(); //! let tm_len; //! { -//! let mut rg = tm_store.write().expect("Error locking shared pool"); +//! let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); //! let store_guard = rg.read_with_guard(tm_in_store.store_addr); //! tm_len = store_guard.read(&mut tm_buf).expect("Error reading TM slice"); //! } @@ -1636,17 +1634,17 @@ pub mod test_util { #[cfg(test)] pub mod tests { - use crate::pool::{StaticMemoryPool, StaticPoolConfig}; + use crate::pool::{SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig}; use crate::pus::test_util::{TEST_APID, TEST_COMPONENT_ID_0}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmSender, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, VerificationReporter, VerificationReporterCfg, VerificationToken, }; - use crate::pus::{ChannelWithId, MpscTmInSharedPoolSender, PusTmVariant}; + use crate::pus::{ChannelWithId, PusTmVariant}; use crate::request::MessageMetadata; use crate::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; - use crate::tmtc::tm_helper::SharedTmPool; + use crate::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; use crate::ComponentId; use alloc::format; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; @@ -1658,7 +1656,7 @@ pub mod tests { use spacepackets::{ByteConversionError, SpHeader}; use std::cell::RefCell; use std::collections::VecDeque; - use std::sync::mpsc; + use std::sync::{mpsc, RwLock}; use std::vec; use std::vec::Vec; @@ -2128,9 +2126,10 @@ pub mod tests { #[test] fn test_mpsc_verif_send() { let pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![(8, 8)], false)); - let shared_tm_store = SharedTmPool::new(pool); - let (tx, _) = mpsc::channel(); - let mpsc_verif_sender = MpscTmInSharedPoolSender::new(shared_tm_store, tx); + let shared_tm_store = + SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new(pool))); + let (tx, _) = mpsc::sync_channel(10); + let mpsc_verif_sender = PacketSenderWithSharedPool::new(tx, shared_tm_store); is_send(&mpsc_verif_sender); } diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index e72fb39..fcff8db 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -7,9 +7,13 @@ //! all received telecommands are sent to a special handler object called TC source. Using //! a design like this makes it simpler to add new TC packet sources or new telemetry generators: //! They only need to send the received and generated data to these objects. -use crate::pool::{PoolProvider, SharedStaticMemoryPool, StoreAddr, StoreError}; #[cfg(feature = "std")] use crate::queue::GenericSendError; +use crate::{ + pool::{PoolProvider, StoreAddr, StoreError}, + pus::PacketAsVec, + ComponentId, +}; use core::cell::RefCell; #[cfg(feature = "alloc")] use downcast_rs::{impl_downcast, Downcast}; @@ -28,35 +32,33 @@ pub use std_mod::*; pub mod tm_helper; -/// Generic trait for object which can receive any telecommands in form of a raw bytestream, with +/// Generic trait for object which can send any packets in form of a raw bytestream, with /// no assumptions about the received protocol. -/// -/// This trait can also be implemented for sender components which forward the packet. -/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender]. pub trait PacketSenderRaw: Send { type Error; - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error>; + fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error>; } #[cfg(feature = "std")] -impl PacketSenderRaw for mpsc::Sender> { +impl PacketSenderRaw for mpsc::Sender { type Error = GenericSendError; - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.send(tc_raw.to_vec()) + fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { + self.send(PacketAsVec::new(sender_id, packet.to_vec())) .map_err(|_| GenericSendError::RxDisconnected) } } #[cfg(feature = "std")] -impl PacketSenderRaw for mpsc::SyncSender> { +impl PacketSenderRaw for mpsc::SyncSender { type Error = GenericSendError; - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.try_send(tc_raw.to_vec()).map_err(|e| match e { - mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), - mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, - }) + fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.try_send(PacketAsVec::new(sender_id, tc_raw.to_vec())) + .map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + }) } } @@ -93,75 +95,90 @@ where #[cfg(feature = "alloc")] impl_downcast!(PacketSenderRawExt assoc Error); -/// Generic trait for object which can receive CCSDS space packets, for example ECSS PUS packets -/// for CCSDS File Delivery Protocol (CFDP) packets. -/// -/// This trait can also be implemented for sender components which forward the packet. -/// It is implemented for common types like [mpsc::Sender] and [mpsc::SyncSender]. -pub trait ReceivesCcsdsTc: Send { +/// Generic trait for object which can send CCSDS space packets, for example ECSS PUS packets +/// or CCSDS File Delivery Protocol (CFDP) packets wrapped in space packets. +pub trait PacketSenderCcsds: Send { type Error; - fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>; + fn send_ccsds( + &self, + sender_id: ComponentId, + header: &SpHeader, + tc_raw: &[u8], + ) -> Result<(), Self::Error>; } #[cfg(feature = "std")] -impl ReceivesCcsdsTc for mpsc::Sender> { +impl PacketSenderCcsds for mpsc::Sender { type Error = GenericSendError; - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.send(tc_raw.to_vec()) + fn send_ccsds( + &self, + sender_id: ComponentId, + _: &SpHeader, + tc_raw: &[u8], + ) -> Result<(), Self::Error> { + self.send(PacketAsVec::new(sender_id, tc_raw.to_vec())) .map_err(|_| GenericSendError::RxDisconnected) } } #[cfg(feature = "std")] -impl ReceivesCcsdsTc for mpsc::SyncSender> { +impl PacketSenderCcsds for mpsc::SyncSender { type Error = GenericSendError; - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.try_send(tc_raw.to_vec()).map_err(|e| match e { - mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), - mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, - }) + fn send_ccsds( + &self, + sender_id: ComponentId, + _: &SpHeader, + packet_raw: &[u8], + ) -> Result<(), Self::Error> { + self.try_send(PacketAsVec::new(sender_id, packet_raw.to_vec())) + .map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + }) } } -/// Generic trait for a TM packet source, with no restrictions on the type of TM. +/// Generic trait for a packet receiver, with no restrictions on the type of packet. /// Implementors write the telemetry into the provided buffer and return the size of the telemetry. -pub trait TmPacketSource: Send { +pub trait PacketSource: Send { type Error; fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result; } -/// Extension trait of [TmPacketSource] which allows downcasting by implementing [Downcast]. +/// Extension trait of [PacketSource] which allows downcasting by implementing [Downcast]. #[cfg(feature = "alloc")] -pub trait TmPacketSourceExt: TmPacketSource + Downcast { +pub trait PacketSourceExt: PacketSource + Downcast { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn TmPacketSource; + fn upcast(&self) -> &dyn PacketSource; // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn TmPacketSource; + fn upcast_mut(&mut self) -> &mut dyn PacketSource; } -/// Blanket implementation to automatically implement [TmPacketSourceExt] when the [alloc] feature +/// Blanket implementation to automatically implement [PacketSourceExt] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] -impl TmPacketSourceExt for T +impl PacketSourceExt for T where - T: TmPacketSource + 'static, + T: PacketSource + 'static, { // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast(&self) -> &dyn TmPacketSource { + fn upcast(&self) -> &dyn PacketSource { self } // Remove this once trait upcasting coercion has been implemented. // Tracking issue: https://github.com/rust-lang/rust/issues/65991 - fn upcast_mut(&mut self) -> &mut dyn TmPacketSource { + fn upcast_mut(&mut self) -> &mut dyn PacketSource { self } } +/// Newtype wrapper around the [SharedStaticMemoryPool] to enable extension helper traits on +/// top of the regular shared memory pool API. #[derive(Clone)] pub struct SharedPacketPool(pub SharedStaticMemoryPool); @@ -171,6 +188,7 @@ impl SharedPacketPool { } } +/// Helper trait for any generic (static) store which allows storing raw or CCSDS packets. pub trait CcsdsPacketPool { fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result { self.add_raw_tc(tc_raw) @@ -179,10 +197,12 @@ pub trait CcsdsPacketPool { fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result; } +/// Helper trait for any generic (static) store which allows storing ECSS PUS Telecommand packets. pub trait PusTcPool { fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result; } +/// Helper trait for any generic (static) store which allows storing ECSS PUS Telemetry packets. pub trait PusTmPool { fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result; fn add_pus_tm_from_creator(&mut self, pus_tm: &PusTmCreator) -> Result; @@ -228,13 +248,23 @@ impl CcsdsPacketPool for SharedPacketPool { } } +/// Generic trait for any sender component able to send packets stored inside a pool structure. +pub trait PacketInPoolSender: Send { + fn send_packet( + &self, + sender_id: ComponentId, + store_addr: StoreAddr, + ) -> Result<(), GenericSendError>; +} + #[cfg(feature = "std")] pub mod std_mod { - use std::sync::mpsc; + #[cfg(feature = "crossbeam")] + use crossbeam_channel as cb; use thiserror::Error; - use crate::pus::{EcssTmSender, ReceivesEcssPusTc}; + use crate::pus::{EcssTmSender, EcssTmtcError, PacketInPool, PacketSenderPusTc}; use super::*; @@ -246,77 +276,164 @@ pub mod std_mod { Send(#[from] GenericSendError), } - #[derive(Clone)] - pub struct PacketSenderSharedPool { - pub tc_source: mpsc::SyncSender, - pub shared_pool: RefCell, + pub use crate::pool::SharedStaticMemoryPool; + + impl PacketInPoolSender for mpsc::Sender { + fn send_packet( + &self, + sender_id: ComponentId, + store_addr: StoreAddr, + ) -> Result<(), GenericSendError> { + self.send(PacketInPool::new(sender_id, store_addr)) + .map_err(|_| GenericSendError::RxDisconnected) + } } - impl PacketSenderSharedPool { - pub fn new(tc_sender: mpsc::SyncSender, shared_pool: PacketStore) -> Self { + impl PacketInPoolSender for mpsc::SyncSender { + fn send_packet( + &self, + sender_id: ComponentId, + store_addr: StoreAddr, + ) -> Result<(), GenericSendError> { + self.try_send(PacketInPool::new(sender_id, store_addr)) + .map_err(|e| match e { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + }) + } + } + + #[cfg(feature = "crossbeam")] + impl PacketInPoolSender for cb::Sender { + fn send_packet( + &self, + sender_id: ComponentId, + store_addr: StoreAddr, + ) -> Result<(), GenericSendError> { + self.try_send(PacketInPool::new(sender_id, store_addr)) + .map_err(|e| match e { + cb::TrySendError::Full(_) => GenericSendError::QueueFull(None), + cb::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + }) + } + } + + /// This is the primary structure used to send packets stored in a dedicated memory pool + /// structure. + #[derive(Clone)] + pub struct PacketSenderWithSharedPool< + Sender: PacketInPoolSender = mpsc::SyncSender, + PacketPool: CcsdsPacketPool = SharedPacketPool, + > { + pub sender: Sender, + pub shared_pool: RefCell, + } + + impl PacketSenderWithSharedPool { + pub fn new_with_shared_packet_pool( + packet_sender: Sender, + shared_pool: &SharedStaticMemoryPool, + ) -> Self { Self { - tc_source: tc_sender, + sender: packet_sender, + shared_pool: RefCell::new(SharedPacketPool::new(shared_pool)), + } + } + } + + impl + PacketSenderWithSharedPool + { + pub fn new(packet_sender: Sender, shared_pool: PacketStore) -> Self { + Self { + sender: packet_sender, shared_pool: RefCell::new(shared_pool), } } } - impl PacketSenderSharedPool { + impl + PacketSenderWithSharedPool + { pub fn shared_packet_store(&self) -> PacketStore { let pool = self.shared_pool.borrow(); pool.clone() } } - impl PacketSenderRaw for PacketSenderSharedPool { - type Error = StoreAndSendError; - - fn send_raw_tc(&self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut shared_pool = self.shared_pool.borrow_mut(); - let addr = shared_pool.add_raw_tc(tc_raw)?; - drop(shared_pool); - self.tc_source.try_send(addr).map_err(|e| match e { - mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), - mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, - })?; - Ok(()) - } - } - - impl ReceivesEcssPusTc - for PacketSenderSharedPool + impl PacketSenderRaw + for PacketSenderWithSharedPool { type Error = StoreAndSendError; - fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { + fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { let mut shared_pool = self.shared_pool.borrow_mut(); - let addr = shared_pool.add_raw_tc(pus_tc.raw_data())?; + let store_addr = shared_pool.add_raw_tc(packet)?; drop(shared_pool); - self.tc_source.try_send(addr).map_err(|e| match e { - mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), - mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, - })?; + self.sender + .send_packet(sender_id, store_addr) + .map_err(StoreAndSendError::Send)?; Ok(()) } } - impl ReceivesCcsdsTc for PacketSenderSharedPool { + impl + PacketSenderPusTc for PacketSenderWithSharedPool + { type Error = StoreAndSendError; - fn pass_ccsds(&mut self, _sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.send_raw_tc(tc_raw) + fn send_pus_tc( + &self, + sender_id: ComponentId, + _: &SpHeader, + pus_tc: &PusTcReader, + ) -> Result<(), Self::Error> { + let mut shared_pool = self.shared_pool.borrow_mut(); + let store_addr = shared_pool.add_raw_tc(pus_tc.raw_data())?; + drop(shared_pool); + self.sender + .send_packet(sender_id, store_addr) + .map_err(StoreAndSendError::Send)?; + Ok(()) } } - impl EcssTmSender - for PacketSenderSharedPool + impl PacketSenderCcsds + for PacketSenderWithSharedPool + { + type Error = StoreAndSendError; + + fn send_ccsds( + &self, + sender_id: ComponentId, + _sp_header: &SpHeader, + tc_raw: &[u8], + ) -> Result<(), Self::Error> { + self.send_packet(sender_id, tc_raw) + } + } + + impl EcssTmSender + for PacketSenderWithSharedPool { fn send_tm( &self, - source_id: crate::ComponentId, + sender_id: crate::ComponentId, tm: crate::pus::PusTmVariant, ) -> Result<(), crate::pus::EcssTmtcError> { - todo!() + let send_addr = |store_addr: StoreAddr| { + self.sender + .send_packet(sender_id, store_addr) + .map_err(EcssTmtcError::Send) + }; + match tm { + crate::pus::PusTmVariant::InStore(store_addr) => send_addr(store_addr), + crate::pus::PusTmVariant::Direct(tm_creator) => { + let mut pool = self.shared_pool.borrow_mut(); + let store_addr = pool.add_pus_tm_from_creator(&tm_creator)?; + send_addr(store_addr) + } + } } } } @@ -333,19 +450,21 @@ pub(crate) mod tests { use std::sync::mpsc; pub(crate) fn send_with_sender( + sender_id: ComponentId, packet_sender: &(impl PacketSenderRaw + ?Sized), packet: &[u8], ) -> Result<(), SendError> { - packet_sender.send_raw_tc(packet) + packet_sender.send_packet(sender_id, packet) } #[test] - fn test_basic_mpsc_channel_sender() { + fn test_basic_mpsc_channel_sender_bounded() { let (tx, rx) = mpsc::channel(); let some_packet = vec![1, 2, 3, 4, 5]; - send_with_sender(&tx, &some_packet).expect("failed to send packet"); + send_with_sender(1, &tx, &some_packet).expect("failed to send packet"); let rx_packet = rx.try_recv().unwrap(); - assert_eq!(some_packet, rx_packet); + assert_eq!(some_packet, rx_packet.packet); + assert_eq!(1, rx_packet.sender_id); } #[test] @@ -353,7 +472,7 @@ pub(crate) mod tests { let (tx, rx) = mpsc::channel(); let some_packet = vec![1, 2, 3, 4, 5]; drop(rx); - let result = send_with_sender(&tx, &some_packet); + let result = send_with_sender(2, &tx, &some_packet); assert!(result.is_err()); matches!(result.unwrap_err(), GenericSendError::RxDisconnected); } @@ -362,9 +481,10 @@ pub(crate) mod tests { fn test_basic_mpsc_sync_sender() { let (tx, rx) = mpsc::sync_channel(3); let some_packet = vec![1, 2, 3, 4, 5]; - send_with_sender(&tx, &some_packet).expect("failed to send packet"); + send_with_sender(3, &tx, &some_packet).expect("failed to send packet"); let rx_packet = rx.try_recv().unwrap(); - assert_eq!(some_packet, rx_packet); + assert_eq!(some_packet, rx_packet.packet); + assert_eq!(3, rx_packet.sender_id); } #[test] @@ -372,7 +492,7 @@ pub(crate) mod tests { let (tx, rx) = mpsc::sync_channel(3); let some_packet = vec![1, 2, 3, 4, 5]; drop(rx); - let result = send_with_sender(&tx, &some_packet); + let result = send_with_sender(0, &tx, &some_packet); assert!(result.is_err()); matches!(result.unwrap_err(), GenericSendError::RxDisconnected); } @@ -381,13 +501,31 @@ pub(crate) mod tests { fn test_basic_mpsc_sync_sender_queue_full() { let (tx, rx) = mpsc::sync_channel(1); let some_packet = vec![1, 2, 3, 4, 5]; - send_with_sender(&tx, &some_packet).expect("failed to send packet"); - let result = send_with_sender(&tx, &some_packet); + send_with_sender(0, &tx, &some_packet).expect("failed to send packet"); + let result = send_with_sender(1, &tx, &some_packet); assert!(result.is_err()); matches!(result.unwrap_err(), GenericSendError::QueueFull(None)); let rx_packet = rx.try_recv().unwrap(); - assert_eq!(some_packet, rx_packet); + assert_eq!(some_packet, rx_packet.packet); } + + #[test] + fn test_basic_shared_store_sender_unbounded_sender() { + let (tc_tx, tc_rx) = mpsc::channel(); + let pool_cfg = StaticPoolConfig::new(vec![(2, 8)], true); + let shared_pool = SharedPacketPool::new(&SharedStaticMemoryPool::new(RwLock::new( + StaticMemoryPool::new(pool_cfg), + ))); + let some_packet = vec![1, 2, 3, 4, 5]; + let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone()); + send_with_sender(5, &tc_sender, &some_packet).expect("failed to send packet"); + let packet_in_pool = tc_rx.try_recv().unwrap(); + let mut pool = shared_pool.0.write().unwrap(); + let read_guard = pool.read_with_guard(packet_in_pool.store_addr); + assert_eq!(read_guard.read_as_vec().unwrap(), some_packet); + assert_eq!(packet_in_pool.sender_id, 5) + } + #[test] fn test_basic_shared_store_sender() { let (tc_tx, tc_rx) = mpsc::sync_channel(10); @@ -396,12 +534,13 @@ pub(crate) mod tests { StaticMemoryPool::new(pool_cfg), ))); let some_packet = vec![1, 2, 3, 4, 5]; - let tc_sender = PacketSenderSharedPool::new(tc_tx, shared_pool.clone()); - send_with_sender(&tc_sender, &some_packet).expect("failed to send packet"); - let tc_tx_addr = tc_rx.try_recv().unwrap(); + let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone()); + send_with_sender(5, &tc_sender, &some_packet).expect("failed to send packet"); + let packet_in_pool = tc_rx.try_recv().unwrap(); let mut pool = shared_pool.0.write().unwrap(); - let read_guard = pool.read_with_guard(tc_tx_addr); + let read_guard = pool.read_with_guard(packet_in_pool.store_addr); assert_eq!(read_guard.read_as_vec().unwrap(), some_packet); + assert_eq!(packet_in_pool.sender_id, 5) } #[test] @@ -413,8 +552,8 @@ pub(crate) mod tests { ))); let some_packet = vec![1, 2, 3, 4, 5]; drop(tc_rx); - let tc_sender = PacketSenderSharedPool::new(tc_tx, shared_pool.clone()); - let result = send_with_sender(&tc_sender, &some_packet); + let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone()); + let result = send_with_sender(2, &tc_sender, &some_packet); assert!(result.is_err()); matches!( result.unwrap_err(), @@ -430,18 +569,19 @@ pub(crate) mod tests { StaticMemoryPool::new(pool_cfg), ))); let some_packet = vec![1, 2, 3, 4, 5]; - let tc_sender = PacketSenderSharedPool::new(tc_tx, shared_pool.clone()); - send_with_sender(&tc_sender, &some_packet).expect("failed to send packet"); - let result = send_with_sender(&tc_sender, &some_packet); + let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone()); + send_with_sender(3, &tc_sender, &some_packet).expect("failed to send packet"); + let result = send_with_sender(3, &tc_sender, &some_packet); assert!(result.is_err()); matches!( result.unwrap_err(), StoreAndSendError::Send(GenericSendError::RxDisconnected) ); - let tc_tx_addr = tc_rx.try_recv().unwrap(); + let packet_in_pool = tc_rx.try_recv().unwrap(); let mut pool = shared_pool.0.write().unwrap(); - let read_guard = pool.read_with_guard(tc_tx_addr); + let read_guard = pool.read_with_guard(packet_in_pool.store_addr); assert_eq!(read_guard.read_as_vec().unwrap(), some_packet); + assert_eq!(packet_in_pool.sender_id, 3); } #[test] @@ -452,17 +592,18 @@ pub(crate) mod tests { StaticMemoryPool::new(pool_cfg), ))); let some_packet = vec![1, 2, 3, 4, 5]; - let tc_sender = PacketSenderSharedPool::new(tc_tx, shared_pool.clone()); - send_with_sender(&tc_sender, &some_packet).expect("failed to send packet"); - let result = send_with_sender(&tc_sender, &some_packet); + let tc_sender = PacketSenderWithSharedPool::new(tc_tx, shared_pool.clone()); + send_with_sender(4, &tc_sender, &some_packet).expect("failed to send packet"); + let result = send_with_sender(4, &tc_sender, &some_packet); assert!(result.is_err()); matches!( result.unwrap_err(), StoreAndSendError::Store(StoreError::StoreFull(..)) ); - let tc_tx_addr = tc_rx.try_recv().unwrap(); + let packet_in_pool = tc_rx.try_recv().unwrap(); let mut pool = shared_pool.0.write().unwrap(); - let read_guard = pool.read_with_guard(tc_tx_addr); + let read_guard = pool.read_with_guard(packet_in_pool.store_addr); assert_eq!(read_guard.read_as_vec().unwrap(), some_packet); + assert_eq!(packet_in_pool.sender_id, 4); } } diff --git a/satrs/tests/pus_events.rs b/satrs/tests/pus_events.rs index 6fc518f..2671025 100644 --- a/satrs/tests/pus_events.rs +++ b/satrs/tests/pus_events.rs @@ -7,7 +7,7 @@ use satrs::params::U32Pair; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; use satrs::pus::test_util::TEST_COMPONENT_ID_0; -use satrs::pus::PusTmAsVec; +use satrs::pus::PacketAsVec; use satrs::request::UniqueApidTargetId; use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::{PusError, PusPacket}; @@ -37,7 +37,7 @@ fn test_threaded_usage() { let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); event_man.subscribe_all(pus_event_man_send_provider.target_id()); event_man.add_sender(pus_event_man_send_provider); - let (event_tx, event_rx) = mpsc::channel::(); + let (event_tx, event_rx) = mpsc::channel::(); let reporter = EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed"); let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); diff --git a/satrs/tests/pus_verification.rs b/satrs/tests/pus_verification.rs index ada5850..4451909 100644 --- a/satrs/tests/pus_verification.rs +++ b/satrs/tests/pus_verification.rs @@ -7,13 +7,12 @@ pub mod crossbeam_test { FailParams, RequestId, VerificationReporter, VerificationReporterCfg, VerificationReportingProvider, }; - use satrs::pus::TmInSharedPoolSenderWithCrossbeam; - use satrs::tmtc::tm_helper::SharedTmPool; + use satrs::tmtc::{PacketSenderWithSharedPool, SharedStaticMemoryPool}; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, WritablePusPacket}; use spacepackets::SpHeader; - use std::sync::{Arc, RwLock}; + use std::sync::RwLock; use std::thread; use std::time::Duration; @@ -36,12 +35,15 @@ pub mod crossbeam_test { // Shared pool object to store the verification PUS telemetry let pool_cfg = StaticPoolConfig::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)], false); - let shared_tm_pool = SharedTmPool::new(StaticMemoryPool::new(pool_cfg.clone())); - let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg))); - let shared_tc_pool_1 = shared_tc_pool_0.clone(); + let shared_tm_pool = + SharedStaticMemoryPool::new(RwLock::new(StaticMemoryPool::new(pool_cfg.clone()))); + let shared_tc_pool = + SharedStaticMemoryPool::new(RwLock::new(StaticMemoryPool::new(pool_cfg))); + let shared_tc_pool_1 = shared_tc_pool.clone(); let (tx, rx) = crossbeam_channel::bounded(10); - let sender_0 = TmInSharedPoolSenderWithCrossbeam::new(shared_tm_pool.clone(), tx.clone()); - let sender_1 = sender_0.clone(); + let sender = + PacketSenderWithSharedPool::new_with_shared_packet_pool(tx.clone(), &shared_tm_pool); + let sender_1 = sender.clone(); let mut reporter_with_sender_0 = VerificationReporter::new(TEST_COMPONENT_ID_0.id(), &cfg); let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver @@ -52,7 +54,7 @@ pub mod crossbeam_test { let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3); let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3); { - let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let mut tc_guard = shared_tc_pool.write().unwrap(); let sph = SpHeader::new_for_unseg_tc(TEST_APID, 0, 0); let tc_header = PusTcSecondaryHeader::new_simple(17, 1); let pus_tc_0 = PusTcCreator::new_no_app_data(sph, tc_header, true); @@ -81,7 +83,7 @@ pub mod crossbeam_test { .expect("Receive timeout"); let tc_len; { - let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let mut tc_guard = shared_tc_pool.write().unwrap(); let pg = tc_guard.read_with_guard(tc_addr); tc_len = pg.read(&mut tc_buf).unwrap(); } @@ -89,24 +91,24 @@ pub mod crossbeam_test { let token = reporter_with_sender_0.add_tc_with_req_id(req_id_0); let accepted_token = reporter_with_sender_0 - .acceptance_success(&sender_0, token, &FIXED_STAMP) + .acceptance_success(&sender, token, &FIXED_STAMP) .expect("Acceptance success failed"); // Do some start handling here let started_token = reporter_with_sender_0 - .start_success(&sender_0, accepted_token, &FIXED_STAMP) + .start_success(&sender, accepted_token, &FIXED_STAMP) .expect("Start success failed"); // Do some step handling here reporter_with_sender_0 - .step_success(&sender_0, &started_token, &FIXED_STAMP, EcssEnumU8::new(0)) + .step_success(&sender, &started_token, &FIXED_STAMP, EcssEnumU8::new(0)) .expect("Start success failed"); // Finish up reporter_with_sender_0 - .step_success(&sender_0, &started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + .step_success(&sender, &started_token, &FIXED_STAMP, EcssEnumU8::new(1)) .expect("Start success failed"); reporter_with_sender_0 - .completion_success(&sender_0, started_token, &FIXED_STAMP) + .completion_success(&sender, started_token, &FIXED_STAMP) .expect("Completion success failed"); }); @@ -145,9 +147,8 @@ pub mod crossbeam_test { .recv_timeout(Duration::from_millis(50)) .expect("Packet reception timeout"); let tm_len; - let shared_tm_store = shared_tm_pool.clone_backing_pool(); { - let mut rg = shared_tm_store.write().expect("Error locking shared pool"); + let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); let store_guard = rg.read_with_guard(tm_in_pool.store_addr); tm_len = store_guard .read(&mut tm_buf) diff --git a/satrs/tests/tcp_servers.rs b/satrs/tests/tcp_servers.rs index 209a57c..5a7b840 100644 --- a/satrs/tests/tcp_servers.rs +++ b/satrs/tests/tcp_servers.rs @@ -28,7 +28,8 @@ use satrs::{ ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer, }, - tmtc::TmPacketSource, + tmtc::PacketSource, + ComponentId, }; use spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, @@ -74,7 +75,7 @@ impl SyncTmSource { } } -impl TmPacketSource for SyncTmSource { +impl PacketSource for SyncTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { @@ -96,6 +97,7 @@ impl TmPacketSource for SyncTmSource { } } +const TCP_SERVER_ID: ComponentId = 0x05; const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1]; const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); @@ -107,7 +109,13 @@ fn test_cobs_server() { // Insert a telemetry packet which will be read back by the client at a later stage. tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( - ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), + ServerConfig::new( + TCP_SERVER_ID, + AUTO_PORT_ADDR, + Duration::from_millis(2), + 1024, + 1024, + ), tm_source, tc_sender.clone(), ConnectionFinishedHandler::default(), @@ -175,8 +183,9 @@ fn test_cobs_server() { panic!("connection was not handled properly"); } // Check that the packet was received and decoded successfully. - let tc = tc_receiver.try_recv().expect("no TC received"); - assert_eq!(tc, SIMPLE_PACKET); + let tc_with_sender = tc_receiver.try_recv().expect("no TC received"); + assert_eq!(tc_with_sender.packet, SIMPLE_PACKET); + assert_eq!(tc_with_sender.sender_id, TCP_SERVER_ID); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); } @@ -194,7 +203,13 @@ fn test_ccsds_server() { let mut packet_id_lookup = HashSet::new(); packet_id_lookup.insert(TEST_PACKET_ID_0); let mut tcp_server = TcpSpacepacketsServer::new( - ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), + ServerConfig::new( + TCP_SERVER_ID, + AUTO_PORT_ADDR, + Duration::from_millis(2), + 1024, + 1024, + ), tm_source, tc_sender, packet_id_lookup, @@ -263,7 +278,8 @@ fn test_ccsds_server() { panic!("connection was not handled properly"); } // Check that TC has arrived. - let tc = tc_receiver.try_recv().expect("no TC received"); - assert_eq!(tc, tc_0); + let tc_with_sender = tc_receiver.try_recv().expect("no TC received"); + assert_eq!(tc_with_sender.packet, tc_0); + assert_eq!(tc_with_sender.sender_id, TCP_SERVER_ID); matches!(tc_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)); }