diff --git a/satrs-example/src/acs.rs b/satrs-example/src/acs.rs index 5c400f7..4fbea90 100644 --- a/satrs-example/src/acs.rs +++ b/satrs-example/src/acs.rs @@ -1,7 +1,7 @@ use std::sync::mpsc::{self, TryRecvError}; use log::{info, warn}; -use satrs::pus::verification::{VerificationReporterWithSender, VerificationReportingProvider}; +use satrs::pus::verification::VerificationReportingProvider; use satrs::pus::{EcssTmSender, PusTmWrapper}; use satrs::request::TargetAndApidId; use satrs::spacepackets::ecss::hk::Subservice as HkSubservice; @@ -21,19 +21,19 @@ use crate::{ update_time, }; -pub struct AcsTask { +pub struct AcsTask { timestamp: [u8; 7], time_provider: TimeProvider, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporter, tm_sender: Box, request_rx: mpsc::Receiver, } -impl AcsTask { +impl AcsTask { pub fn new( tm_sender: impl EcssTmSender, request_rx: mpsc::Receiver, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporter, ) -> Self { Self { timestamp: [0; 7], diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index dbe7c8d..a6a0d86 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -11,10 +11,7 @@ use satrs::{ event_man::{ DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken, }, - verification::{ - TcStateStarted, VerificationReporterWithSender, VerificationReportingProvider, - VerificationToken, - }, + verification::{TcStateStarted, VerificationReportingProvider, VerificationToken}, EcssTmSender, }, spacepackets::time::cds::{self, TimeProvider}, @@ -23,21 +20,21 @@ use satrs_example::config::PUS_APID; use crate::update_time; -pub struct PusEventHandler { +pub struct PusEventHandler { event_request_rx: mpsc::Receiver, pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>, pus_event_man_rx: mpsc::Receiver<(EventU32, Option)>, tm_sender: Box, time_provider: TimeProvider, timestamp: [u8; 7], - verif_handler: VerificationReporterWithSender, + verif_handler: VerificationReporter, } /* */ -impl PusEventHandler { +impl PusEventHandler { pub fn new( - verif_handler: VerificationReporterWithSender, + verif_handler: VerificationReporter, event_manager: &mut EventManagerWithBoundedMpsc, event_request_rx: mpsc::Receiver, tm_sender: impl EcssTmSender, @@ -147,15 +144,15 @@ impl EventManagerWrapper { } } -pub struct EventHandler { +pub struct EventHandler { pub event_man_wrapper: EventManagerWrapper, - pub pus_event_handler: PusEventHandler, + pub pus_event_handler: PusEventHandler, } -impl EventHandler { +impl EventHandler { pub fn new( tm_sender: impl EcssTmSender, - verif_handler: VerificationReporterWithSender, + verif_handler: VerificationReporter, event_request_rx: mpsc::Receiver, ) -> Self { let mut event_man_wrapper = EventManagerWrapper::new(); diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 76f2ffd..9844424 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -44,7 +44,7 @@ use crate::tmtc::{ use crate::udp::{StaticUdpTmHandler, UdpTmtcServer}; use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; -use satrs::pus::{EcssTmSender, MpscTmAsVecSender, MpscTmInSharedPoolSender}; +use satrs::pus::{EcssTmSender, TmAsVecSenderWithId, TmInSharedPoolSenderWithId}; use satrs::spacepackets::{time::cds::TimeProvider, time::TimeWriter}; use satrs::tmtc::CcsdsDistributor; use satrs::ChannelId; @@ -54,11 +54,13 @@ use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender { +fn create_verification_reporter( + verif_sender: Sender, +) -> VerificationReporterWithSender { let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); // Every software component which needs to generate verification telemetry, gets a cloned // verification reporter. - VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)) + VerificationReporterWithSender::new(&verif_cfg, verif_sender) } #[allow(dead_code)] @@ -68,13 +70,13 @@ fn static_tmtc_pool_main() { let shared_tc_pool = SharedTcPool { pool: Arc::new(RwLock::new(tc_pool)), }; - let (tc_source_tx, tc_source_rx) = channel(); - let (tm_funnel_tx, tm_funnel_rx) = channel(); - let (tm_server_tx, tm_server_rx) = channel(); + let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50); + let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); // Every software component which needs to generate verification telemetry, receives a cloned // verification reporter. - let verif_reporter = create_verification_reporter(MpscTmInSharedPoolSender::new( + let verif_reporter = create_verification_reporter(TmInSharedPoolSenderWithId::new( TmSenderId::PusVerification as ChannelId, "verif_sender", shared_tm_pool.clone(), @@ -102,7 +104,7 @@ fn static_tmtc_pool_main() { // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. let mut event_handler = EventHandler::new( - MpscTmInSharedPoolSender::new( + TmInSharedPoolSenderWithId::new( TmSenderId::AllEvents as ChannelId, "ALL_EVENTS_TX", shared_tm_pool.clone(), @@ -202,7 +204,7 @@ fn static_tmtc_pool_main() { .expect("tcp server creation failed"); let mut acs_task = AcsTask::new( - MpscTmInSharedPoolSender::new( + TmInSharedPoolSenderWithId::new( TmSenderId::AcsSubsystem as ChannelId, "ACS_TASK_SENDER", shared_tm_pool.clone(), @@ -303,7 +305,7 @@ fn dyn_tmtc_pool_main() { let (tm_server_tx, tm_server_rx) = channel(); // Every software component which needs to generate verification telemetry, gets a cloned // verification reporter. - let verif_reporter = create_verification_reporter(MpscTmAsVecSender::new( + let verif_reporter = create_verification_reporter(TmAsVecSenderWithId::new( TmSenderId::PusVerification as ChannelId, "verif_sender", tm_funnel_tx.clone(), @@ -324,7 +326,7 @@ fn dyn_tmtc_pool_main() { // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. let mut event_handler = EventHandler::new( - MpscTmAsVecSender::new( + TmAsVecSenderWithId::new( TmSenderId::AllEvents as ChannelId, "ALL_EVENTS_TX", tm_funnel_tx.clone(), @@ -415,7 +417,7 @@ fn dyn_tmtc_pool_main() { .expect("tcp server creation failed"); let mut acs_task = AcsTask::new( - MpscTmAsVecSender::new( + TmAsVecSenderWithId::new( TmSenderId::AcsSubsystem as ChannelId, "ACS_TASK_SENDER", tm_funnel_tx.clone(), diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 653fe80..eb196b7 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -2,14 +2,16 @@ use log::{error, warn}; use satrs::action::ActionRequest; use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pus::action::{PusActionToRequestConverter, PusService8ActionHandler}; +use satrs::pus::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, +}; use satrs::pus::verification::{ - FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationReportingProvider, - VerificationToken, + FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken, }; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, - MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, - PusPacketHandlingError, PusServiceHelper, + MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusServiceHelper, + TmAsVecSenderWithId, TmInSharedPoolSenderWithId, }; use satrs::request::TargetAndApidId; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -74,13 +76,14 @@ impl PusActionToRequestConverter for ExampleActionRequestConverter { pub fn create_action_service_static( shared_tm_store: SharedTmPool, - tm_funnel_tx: mpsc::Sender, - verif_reporter: VerificationReporterWithSender, + tm_funnel_tx: mpsc::SyncSender, + verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender, tc_pool: SharedStaticMemoryPool, pus_action_rx: mpsc::Receiver, action_router: GenericRequestRouter, -) -> Pus8Wrapper { - let action_srv_tm_sender = MpscTmInSharedPoolSender::new( +) -> Pus8Wrapper +{ + let action_srv_tm_sender = TmInSharedPoolSenderWithId::new( TmSenderId::PusAction as ChannelId, "PUS_8_TM_SENDER", shared_tm_store.clone(), @@ -108,11 +111,11 @@ pub fn create_action_service_static( pub fn create_action_service_dynamic( tm_funnel_tx: mpsc::Sender>, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporterWithVecMpscSender, pus_action_rx: mpsc::Receiver, action_router: GenericRequestRouter, -) -> Pus8Wrapper { - let action_srv_tm_sender = MpscTmAsVecSender::new( +) -> Pus8Wrapper { + let action_srv_tm_sender = TmAsVecSenderWithId::new( TmSenderId::PusAction as ChannelId, "PUS_8_TM_SENDER", tm_funnel_tx.clone(), @@ -137,17 +140,24 @@ pub fn create_action_service_dynamic( Pus8Wrapper { pus_8_handler } } -pub struct Pus8Wrapper { +pub struct Pus8Wrapper< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, +> { pub(crate) pus_8_handler: PusService8ActionHandler< TcInMemConverter, - VerificationReporterWithSender, + VerificationReporter, ExampleActionRequestConverter, GenericRequestRouter, GenericRoutingErrorHandler<8>, >, } -impl Pus8Wrapper { +impl< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + > Pus8Wrapper +{ pub fn handle_next_packet(&mut self) -> bool { match self.pus_8_handler.handle_one_tc() { Ok(result) => match result { diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index 8256658..59f4949 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -4,11 +4,14 @@ use log::{error, warn}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::event_srv::PusService5EventHandler; -use satrs::pus::verification::VerificationReporterWithSender; +use satrs::pus::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, +}; +use satrs::pus::verification::VerificationReportingProvider; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, - MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, - PusServiceHelper, + MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, TmAsVecSenderWithId, + TmInSharedPoolSenderWithId, }; use satrs::tmtc::tm_helper::SharedTmPool; use satrs::ChannelId; @@ -16,13 +19,14 @@ use satrs_example::config::{TcReceiverId, TmSenderId, PUS_APID}; pub fn create_event_service_static( shared_tm_store: SharedTmPool, - tm_funnel_tx: mpsc::Sender, - verif_reporter: VerificationReporterWithSender, + tm_funnel_tx: mpsc::SyncSender, + verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender, tc_pool: SharedStaticMemoryPool, pus_event_rx: mpsc::Receiver, event_request_tx: mpsc::Sender, -) -> Pus5Wrapper { - let event_srv_tm_sender = MpscTmInSharedPoolSender::new( +) -> Pus5Wrapper +{ + let event_srv_tm_sender = TmInSharedPoolSenderWithId::new( TmSenderId::PusEvent as ChannelId, "PUS_5_TM_SENDER", shared_tm_store.clone(), @@ -48,11 +52,11 @@ pub fn create_event_service_static( pub fn create_event_service_dynamic( tm_funnel_tx: mpsc::Sender>, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporterWithVecMpscSender, pus_event_rx: mpsc::Receiver, event_request_tx: mpsc::Sender, -) -> Pus5Wrapper { - let event_srv_tm_sender = MpscTmAsVecSender::new( +) -> Pus5Wrapper { + let event_srv_tm_sender = TmAsVecSenderWithId::new( TmSenderId::PusEvent as ChannelId, "PUS_5_TM_SENDER", tm_funnel_tx, @@ -75,11 +79,18 @@ pub fn create_event_service_dynamic( Pus5Wrapper { pus_5_handler } } -pub struct Pus5Wrapper { - pub pus_5_handler: PusService5EventHandler, +pub struct Pus5Wrapper< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, +> { + pub pus_5_handler: PusService5EventHandler, } -impl Pus5Wrapper { +impl< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + > Pus5Wrapper +{ pub fn handle_next_packet(&mut self) -> bool { match self.pus_5_handler.handle_one_tc() { Ok(result) => match result { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index 035bb86..444283a 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -2,14 +2,16 @@ use log::{error, warn}; use satrs::hk::{CollectionIntervalFactor, HkRequest}; use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pus::hk::{PusHkToRequestConverter, PusService3HkHandler}; +use satrs::pus::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, +}; use satrs::pus::verification::{ - FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationReportingProvider, - VerificationToken, + FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken, }; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, - MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, - PusPacketHandlingError, PusServiceHelper, + MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusServiceHelper, + TmAsVecSenderWithId, TmInSharedPoolSenderWithId, }; use satrs::request::TargetAndApidId; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -143,13 +145,14 @@ impl PusHkToRequestConverter for ExampleHkRequestConverter { pub fn create_hk_service_static( shared_tm_store: SharedTmPool, - tm_funnel_tx: mpsc::Sender, - verif_reporter: VerificationReporterWithSender, + tm_funnel_tx: mpsc::SyncSender, + verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender, tc_pool: SharedStaticMemoryPool, pus_hk_rx: mpsc::Receiver, request_router: GenericRequestRouter, -) -> Pus3Wrapper { - let hk_srv_tm_sender = MpscTmInSharedPoolSender::new( +) -> Pus3Wrapper +{ + let hk_srv_tm_sender = TmInSharedPoolSenderWithId::new( TmSenderId::PusHk as ChannelId, "PUS_3_TM_SENDER", shared_tm_store.clone(), @@ -174,11 +177,11 @@ pub fn create_hk_service_static( pub fn create_hk_service_dynamic( tm_funnel_tx: mpsc::Sender>, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporterWithVecMpscSender, pus_hk_rx: mpsc::Receiver, request_router: GenericRequestRouter, -) -> Pus3Wrapper { - let hk_srv_tm_sender = MpscTmAsVecSender::new( +) -> Pus3Wrapper { + let hk_srv_tm_sender = TmAsVecSenderWithId::new( TmSenderId::PusHk as ChannelId, "PUS_3_TM_SENDER", tm_funnel_tx.clone(), @@ -200,17 +203,24 @@ pub fn create_hk_service_dynamic( Pus3Wrapper { pus_3_handler } } -pub struct Pus3Wrapper { +pub struct Pus3Wrapper< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, +> { pub(crate) pus_3_handler: PusService3HkHandler< TcInMemConverter, - VerificationReporterWithSender, + VerificationReporter, ExampleHkRequestConverter, GenericRequestRouter, GenericRoutingErrorHandler<3>, >, } -impl Pus3Wrapper { +impl< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + > Pus3Wrapper +{ pub fn handle_next_packet(&mut self) -> bool { match self.pus_3_handler.handle_one_tc() { Ok(result) => match result { diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index b903cb4..2b6c3ed 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,8 +1,6 @@ use crate::tmtc::MpscStoreAndSendError; use log::warn; -use satrs::pus::verification::{ - FailParams, StdVerifReporterWithSender, VerificationReportingProvider, -}; +use satrs::pus::verification::{FailParams, VerificationReportingProvider}; use satrs::pus::{ EcssTcAndToken, GenericRoutingError, PusPacketHandlerResult, PusRoutingErrorHandler, TcInMemory, }; @@ -28,8 +26,8 @@ pub struct PusTcMpscRouter { pub action_service_receiver: Sender, } -pub struct PusReceiver { - pub verif_reporter: StdVerifReporterWithSender, +pub struct PusReceiver { + pub verif_reporter: VerificationReporter, pub pus_router: PusTcMpscRouter, stamp_helper: TimeStampHelper, } @@ -61,8 +59,8 @@ impl TimeStampHelper { } } -impl PusReceiver { - pub fn new(verif_reporter: StdVerifReporterWithSender, pus_router: PusTcMpscRouter) -> Self { +impl PusReceiver { + pub fn new(verif_reporter: VerificationReporter, pus_router: PusTcMpscRouter) -> Self { Self { verif_reporter, pus_router, @@ -71,7 +69,7 @@ impl PusReceiver { } } -impl PusReceiver { +impl PusReceiver { pub fn handle_tc_packet( &mut self, tc_in_memory: TcInMemory, diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 8b5ec47..abb9e3f 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -5,11 +5,14 @@ use log::{error, info, warn}; use satrs::pool::{PoolProvider, StaticMemoryPool, StoreAddr}; use satrs::pus::scheduler::{PusScheduler, TcInfo}; use satrs::pus::scheduler_srv::PusService11SchedHandler; -use satrs::pus::verification::VerificationReporterWithSender; +use satrs::pus::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, +}; +use satrs::pus::verification::VerificationReportingProvider; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, - MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, - PusServiceHelper, + MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper, TmAsVecSenderWithId, + TmInSharedPoolSenderWithId, }; use satrs::tmtc::tm_helper::SharedTmPool; use satrs::ChannelId; @@ -51,15 +54,22 @@ impl TcReleaser for mpsc::Sender> { } } -pub struct Pus11Wrapper { +pub struct Pus11Wrapper< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, +> { pub pus_11_handler: - PusService11SchedHandler, + PusService11SchedHandler, pub sched_tc_pool: StaticMemoryPool, pub releaser_buf: [u8; 4096], pub tc_releaser: Box, } -impl Pus11Wrapper { +impl< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + > Pus11Wrapper +{ pub fn release_tcs(&mut self) { let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { self.tc_releaser.release(enabled, info, tc) @@ -110,13 +120,14 @@ impl Pus11Wrapper { pub fn create_scheduler_service_static( shared_tm_store: SharedTmPool, - tm_funnel_tx: mpsc::Sender, - verif_reporter: VerificationReporterWithSender, + tm_funnel_tx: mpsc::SyncSender, + verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender, tc_releaser: PusTcSourceProviderSharedPool, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, -) -> Pus11Wrapper { - let sched_srv_tm_sender = MpscTmInSharedPoolSender::new( +) -> Pus11Wrapper +{ + let sched_srv_tm_sender = TmInSharedPoolSenderWithId::new( TmSenderId::PusSched as ChannelId, "PUS_11_TM_SENDER", shared_tm_store.clone(), @@ -149,12 +160,12 @@ pub fn create_scheduler_service_static( pub fn create_scheduler_service_dynamic( tm_funnel_tx: mpsc::Sender>, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporterWithVecMpscSender, tc_source_sender: mpsc::Sender>, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, -) -> Pus11Wrapper { - let sched_srv_tm_sender = MpscTmAsVecSender::new( +) -> Pus11Wrapper { + let sched_srv_tm_sender = TmAsVecSenderWithId::new( TmSenderId::PusSched as ChannelId, "PUS_11_TM_SENDER", tm_funnel_tx, diff --git a/satrs-example/src/pus/stack.rs b/satrs-example/src/pus/stack.rs index 98718be..23061f7 100644 --- a/satrs-example/src/pus/stack.rs +++ b/satrs-example/src/pus/stack.rs @@ -1,25 +1,32 @@ -use satrs::pus::EcssTcInMemConverter; +use satrs::pus::{verification::VerificationReportingProvider, EcssTcInMemConverter}; use super::{ action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, test::Service17CustomWrapper, }; -pub struct PusStack { - event_srv: Pus5Wrapper, - hk_srv: Pus3Wrapper, - action_srv: Pus8Wrapper, - schedule_srv: Pus11Wrapper, - test_srv: Service17CustomWrapper, +pub struct PusStack< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, +> { + event_srv: Pus5Wrapper, + hk_srv: Pus3Wrapper, + action_srv: Pus8Wrapper, + schedule_srv: Pus11Wrapper, + test_srv: Service17CustomWrapper, } -impl PusStack { +impl< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + > PusStack +{ pub fn new( - hk_srv: Pus3Wrapper, - event_srv: Pus5Wrapper, - action_srv: Pus8Wrapper, - schedule_srv: Pus11Wrapper, - test_srv: Service17CustomWrapper, + hk_srv: Pus3Wrapper, + event_srv: Pus5Wrapper, + action_srv: Pus8Wrapper, + schedule_srv: Pus11Wrapper, + test_srv: Service17CustomWrapper, ) -> Self { Self { event_srv, diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index fe52af7..696cfdc 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -2,12 +2,13 @@ use log::{info, warn}; use satrs::params::Params; use satrs::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs::pus::test::PusService17TestHandler; +use satrs::pus::verification::{FailParams, VerificationReportingProvider}; use satrs::pus::verification::{ - FailParams, VerificationReporterWithSender, VerificationReportingProvider, + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, }; use satrs::pus::{ - EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, - MpscTmInSharedPoolSender, PusPacketHandlerResult, PusServiceHelper, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, + PusPacketHandlerResult, PusServiceHelper, TmAsVecSenderWithId, TmInSharedPoolSenderWithId, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; @@ -21,13 +22,16 @@ use std::sync::mpsc::{self, Sender}; pub fn create_test_service_static( shared_tm_store: SharedTmPool, - tm_funnel_tx: mpsc::Sender, - verif_reporter: VerificationReporterWithSender, + tm_funnel_tx: mpsc::SyncSender, + verif_reporter: VerificationReporterWithSharedPoolMpscBoundedSender, tc_pool: SharedStaticMemoryPool, event_sender: mpsc::Sender<(EventU32, Option)>, pus_test_rx: mpsc::Receiver, -) -> Service17CustomWrapper { - let test_srv_tm_sender = MpscTmInSharedPoolSender::new( +) -> Service17CustomWrapper< + EcssTcInSharedStoreConverter, + VerificationReporterWithSharedPoolMpscBoundedSender, +> { + let test_srv_tm_sender = TmInSharedPoolSenderWithId::new( TmSenderId::PusTest as ChannelId, "PUS_17_TM_SENDER", shared_tm_store.clone(), @@ -53,11 +57,11 @@ pub fn create_test_service_static( pub fn create_test_service_dynamic( tm_funnel_tx: mpsc::Sender>, - verif_reporter: VerificationReporterWithSender, + verif_reporter: VerificationReporterWithVecMpscSender, event_sender: mpsc::Sender<(EventU32, Option)>, pus_test_rx: mpsc::Receiver, -) -> Service17CustomWrapper { - let test_srv_tm_sender = MpscTmAsVecSender::new( +) -> Service17CustomWrapper { + let test_srv_tm_sender = TmAsVecSenderWithId::new( TmSenderId::PusTest as ChannelId, "PUS_17_TM_SENDER", tm_funnel_tx.clone(), @@ -80,12 +84,19 @@ pub fn create_test_service_dynamic( } } -pub struct Service17CustomWrapper { - pub pus17_handler: PusService17TestHandler, +pub struct Service17CustomWrapper< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, +> { + pub pus17_handler: PusService17TestHandler, pub test_srv_event_sender: Sender<(EventU32, Option)>, } -impl Service17CustomWrapper { +impl< + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + > Service17CustomWrapper +{ pub fn handle_next_packet(&mut self) -> bool { let res = self.pus17_handler.handle_one_tc(); if res.is_err() { diff --git a/satrs-example/src/tm_funnel.rs b/satrs-example/src/tm_funnel.rs index af35045..8b6285f 100644 --- a/satrs-example/src/tm_funnel.rs +++ b/satrs-example/src/tm_funnel.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - sync::mpsc::{Receiver, Sender}, + sync::mpsc::{self}, }; use log::info; @@ -77,16 +77,16 @@ impl TmFunnelCommon { pub struct TmFunnelStatic { common: TmFunnelCommon, shared_tm_store: SharedTmPool, - tm_funnel_rx: Receiver, - tm_server_tx: Sender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, } impl TmFunnelStatic { pub fn new( shared_tm_store: SharedTmPool, sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: Receiver, - tm_server_tx: Sender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), @@ -123,15 +123,15 @@ impl TmFunnelStatic { pub struct TmFunnelDynamic { common: TmFunnelCommon, - tm_funnel_rx: Receiver>, - tm_server_tx: Sender>, + tm_funnel_rx: mpsc::Receiver>, + tm_server_tx: mpsc::Sender>, } impl TmFunnelDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: Receiver>, - tm_server_tx: Sender>, + tm_funnel_rx: mpsc::Receiver>, + tm_server_tx: mpsc::Sender>, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index f615f23..0a43504 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,7 +1,10 @@ use log::warn; +use satrs::pus::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, +}; use satrs::pus::{EcssTcAndToken, ReceivesEcssPusTc}; use satrs::spacepackets::SpHeader; -use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; +use std::sync::mpsc::{self, Receiver, SendError, Sender, SyncSender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; @@ -37,7 +40,7 @@ impl SharedTcPool { #[derive(Clone)] pub struct PusTcSourceProviderSharedPool { - pub tc_source: Sender, + pub tc_source: SyncSender, pub shared_pool: SharedTcPool, } @@ -97,14 +100,14 @@ pub struct TcSourceTaskStatic { shared_tc_pool: SharedTcPool, tc_receiver: Receiver, tc_buf: [u8; 4096], - pus_receiver: PusReceiver, + pus_receiver: PusReceiver, } impl TcSourceTaskStatic { pub fn new( shared_tc_pool: SharedTcPool, tc_receiver: Receiver, - pus_receiver: PusReceiver, + pus_receiver: PusReceiver, ) -> Self { Self { shared_tc_pool, @@ -161,11 +164,14 @@ impl TcSourceTaskStatic { // TC source components where the heap is the backing memory of the received telecommands. pub struct TcSourceTaskDynamic { pub tc_receiver: Receiver>, - pus_receiver: PusReceiver, + pus_receiver: PusReceiver, } impl TcSourceTaskDynamic { - pub fn new(tc_receiver: Receiver>, pus_receiver: PusReceiver) -> Self { + pub fn new( + tc_receiver: Receiver>, + pus_receiver: PusReceiver, + ) -> Self { Self { tc_receiver, pus_receiver, diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index 290d551..b640167 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - `ListenerTable` -> `ListenerMapProvider` - `SenderTable` -> `SenderMapProvider` - There is an `EventManagerWithMpsc` and a `EventManagerWithBoundedMpsc` helper type now. +- Refactored ECSS TM sender abstractions to be generic over different message queue backends. +- Refactored Verification Reporter abstractions and implementation to be generic over the sender + instead of using trait objects. ## Fixed diff --git a/satrs/src/cfdp/filestore.rs b/satrs/src/cfdp/filestore.rs index 48ebd88..528d1b8 100644 --- a/satrs/src/cfdp/filestore.rs +++ b/satrs/src/cfdp/filestore.rs @@ -7,7 +7,7 @@ use spacepackets::ByteConversionError; use std::error::Error; use std::path::Path; #[cfg(feature = "std")] -pub use stdmod::*; +pub use std_mod::*; pub const CRC_32: Crc = Crc::::new(&CRC_32_CKSUM); @@ -148,12 +148,11 @@ pub trait VirtualFilestore { } #[cfg(feature = "std")] -pub mod stdmod { +pub mod std_mod { use super::*; use std::{ fs::{self, File, OpenOptions}, io::{BufReader, Read, Seek, SeekFrom, Write}, - path::Path, }; #[derive(Default)] diff --git a/satrs/src/event_man.rs b/satrs/src/event_man.rs index b0470dd..304f9a1 100644 --- a/satrs/src/event_man.rs +++ b/satrs/src/event_man.rs @@ -85,12 +85,8 @@ pub trait EventSendProvider { /// Generic abstraction for an event receiver. pub trait EventReceiveProvider { - /// This function has to be provided by any event receiver. A receive call may or may not return - /// an event. - /// - /// To allow returning arbitrary additional auxiliary data, a mutable slice is passed to the - /// [Self::receive] call as well. Receivers can write data to this slice, but care must be taken - /// to avoid panics due to size missmatches or out of bound writes. + /// This function has to be provided by any event receiver. A call may or may not return + /// an event and optional auxiliary data. fn try_recv_event(&self) -> Option<(Event, Option)>; } @@ -433,10 +429,7 @@ pub mod alloc_mod { #[cfg(feature = "std")] pub mod std_mod { use super::*; - use crate::event_man::{EventReceiveProvider, EventWithAuxData}; - use crate::events::{EventU16, EventU32, GenericEvent}; - use crate::params::Params; - use std::sync::mpsc::{self}; + use std::sync::mpsc; pub struct MpscEventReceiver { mpsc_receiver: mpsc::Receiver<(Event, Option)>, diff --git a/satrs/src/pus/event.rs b/satrs/src/pus/event.rs index 9424b24..5cde422 100644 --- a/satrs/src/pus/event.rs +++ b/satrs/src/pus/event.rs @@ -269,7 +269,7 @@ mod tests { } impl EcssChannel for TestSender { - fn id(&self) -> ChannelId { + fn channel_id(&self) -> ChannelId { 0 } } diff --git a/satrs/src/pus/event_man.rs b/satrs/src/pus/event_man.rs index 720d787..e6e18c9 100644 --- a/satrs/src/pus/event_man.rs +++ b/satrs/src/pus/event_man.rs @@ -41,8 +41,8 @@ pub trait PusEventMgmtBackendProvider { #[cfg(feature = "heapless")] pub mod heapless_mod { use super::*; - use crate::events::{GenericEvent, LargestEventRaw}; - use std::marker::PhantomData; + use crate::events::LargestEventRaw; + use core::marker::PhantomData; #[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))] // TODO: After a new version of heapless is released which uses hash32 version 0.3, try using @@ -257,9 +257,8 @@ pub mod alloc_mod { #[cfg(test)] mod tests { use super::*; - use crate::events::SeverityInfo; - use crate::pus::MpscTmAsVecSender; - use std::sync::mpsc::{channel, TryRecvError}; + use crate::{events::SeverityInfo, pus::TmAsVecSenderWithMpsc}; + use std::sync::mpsc::{self, TryRecvError}; const INFO_EVENT: EventU32TypedSev = EventU32TypedSev::::const_new(1, 0); @@ -279,8 +278,8 @@ mod tests { #[test] fn test_basic() { let mut event_man = create_basic_man_1(); - let (event_tx, event_rx) = channel(); - let mut sender = MpscTmAsVecSender::new(0, "test_sender", event_tx); + let (event_tx, event_rx) = mpsc::channel(); + let mut sender = TmAsVecSenderWithMpsc::new(0, "test_sender", event_tx); let event_sent = event_man .generate_pus_event_tm(&mut sender, &EMPTY_STAMP, INFO_EVENT, None) .expect("Sending info event failed"); @@ -293,8 +292,8 @@ mod tests { #[test] fn test_disable_event() { let mut event_man = create_basic_man_2(); - let (event_tx, event_rx) = channel(); - let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); + let (event_tx, event_rx) = mpsc::channel(); + let mut sender = TmAsVecSenderWithMpsc::new(0, "test", event_tx); let res = event_man.disable_tm_for_event(&LOW_SEV_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); @@ -316,8 +315,8 @@ mod tests { #[test] fn test_reenable_event() { let mut event_man = create_basic_man_1(); - let (event_tx, event_rx) = channel(); - let mut sender = MpscTmAsVecSender::new(0, "test", event_tx); + let (event_tx, event_rx) = mpsc::channel(); + let mut sender = TmAsVecSenderWithMpsc::new(0, "test", event_tx); let mut res = event_man.disable_tm_for_event_with_sev(&INFO_EVENT); assert!(res.is_ok()); assert!(res.unwrap()); diff --git a/satrs/src/pus/event_srv.rs b/satrs/src/pus/event_srv.rs index b55a5c9..5e9b053 100644 --- a/satrs/src/pus/event_srv.rs +++ b/satrs/src/pus/event_srv.rs @@ -23,11 +23,11 @@ impl< > PusService5EventHandler { pub fn new( - service_handler: PusServiceHelper, + service_helper: PusServiceHelper, event_request_tx: Sender, ) -> Self { Self { - service_helper: service_handler, + service_helper, event_request_tx, } } @@ -138,7 +138,9 @@ mod tests { use crate::pus::event_man::EventRequest; use crate::pus::tests::SimplePusPacketHandler; - use crate::pus::verification::{RequestId, VerificationReporterWithSender}; + use crate::pus::verification::{ + RequestId, VerificationReporterWithSharedPoolMpscBoundedSender, + }; use crate::{ events::EventU32, pus::{ @@ -155,8 +157,10 @@ mod tests { struct Pus5HandlerWithStoreTester { common: PusServiceHandlerWithSharedStoreCommon, - handler: - PusService5EventHandler, + handler: PusService5EventHandler< + EcssTcInSharedStoreConverter, + VerificationReporterWithSharedPoolMpscBoundedSender, + >, } impl Pus5HandlerWithStoreTester { diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index 24fc17c..282833b 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -2,6 +2,8 @@ //! //! This module contains structures to make working with the PUS C standard easier. //! The satrs-example application contains various usage examples of these components. +use crate::pool::{StoreAddr, StoreError}; +use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken}; use crate::queue::{GenericRecvError, GenericSendError}; use crate::ChannelId; use core::fmt::{Display, Formatter}; @@ -34,8 +36,6 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; -use crate::pool::{StoreAddr, StoreError}; -use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken}; #[cfg(feature = "std")] pub use std_mod::*; @@ -63,6 +63,7 @@ pub enum EcssTmtcError { Store(StoreError), Pus(PusError), CantSendAddr(StoreAddr), + CantSendDirectTm, Send(GenericSendError), Recv(GenericRecvError), } @@ -82,6 +83,9 @@ impl Display for EcssTmtcError { EcssTmtcError::CantSendAddr(addr) => { write!(f, "can not send address {addr}") } + EcssTmtcError::CantSendDirectTm => { + write!(f, "can not send TM directly") + } EcssTmtcError::Send(send_e) => { write!(f, "send error {send_e}") } @@ -123,13 +127,14 @@ impl Error for EcssTmtcError { EcssTmtcError::Store(e) => Some(e), EcssTmtcError::Pus(e) => Some(e), EcssTmtcError::Send(e) => Some(e), + EcssTmtcError::Recv(e) => Some(e), _ => None, } } } pub trait EcssChannel: Send { /// Each sender can have an ID associated with it - fn id(&self) -> ChannelId; + fn channel_id(&self) -> ChannelId; fn name(&self) -> &'static str { "unset" } @@ -138,7 +143,7 @@ pub trait EcssChannel: Send { /// Generic trait for a user supplied sender object. /// /// This sender object is responsible for sending PUS telemetry to a TM sink. -pub trait EcssTmSenderCore: EcssChannel { +pub trait EcssTmSenderCore: Send { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError>; } @@ -146,7 +151,7 @@ pub trait EcssTmSenderCore: EcssChannel { /// /// This sender object is responsible for sending PUS telecommands to a TC recipient. Each /// telecommand can optionally have a token which contains its verification state. -pub trait EcssTcSenderCore: EcssChannel { +pub trait EcssTcSenderCore { fn send_tc(&self, tc: PusTcCreator, token: Option) -> Result<(), EcssTmtcError>; } @@ -221,25 +226,25 @@ impl TryFrom for AcceptedEcssTcAndToken { #[derive(Debug, Clone)] pub enum TryRecvTmtcError { - Error(EcssTmtcError), + Tmtc(EcssTmtcError), Empty, } impl From for TryRecvTmtcError { fn from(value: EcssTmtcError) -> Self { - Self::Error(value) + Self::Tmtc(value) } } impl From for TryRecvTmtcError { fn from(value: PusError) -> Self { - Self::Error(value.into()) + Self::Tmtc(value.into()) } } impl From for TryRecvTmtcError { fn from(value: StoreError) -> Self { - Self::Error(value.into()) + Self::Tmtc(value.into()) } } @@ -374,10 +379,9 @@ pub mod std_mod { use crate::{ChannelId, TargetId}; use alloc::boxed::Box; use alloc::vec::Vec; - use crossbeam_channel as cb; use spacepackets::ecss::tc::PusTcReader; use spacepackets::ecss::tm::PusTmCreator; - use spacepackets::ecss::PusError; + use spacepackets::ecss::{PusError, WritablePusPacket}; use spacepackets::time::cds::TimeProvider; use spacepackets::time::StdTimestampError; use spacepackets::time::TimeWriter; @@ -386,6 +390,9 @@ pub mod std_mod { use std::sync::mpsc::TryRecvError; use thiserror::Error; + #[cfg(feature = "crossbeam")] + pub use cb_mod::*; + use super::verification::VerificationReportingProvider; use super::{AcceptedEcssTcAndToken, TcInMemory}; @@ -395,32 +402,65 @@ pub mod std_mod { } } - impl From> for EcssTmtcError { - fn from(_: cb::SendError) -> Self { - Self::Send(GenericSendError::RxDisconnected) + impl EcssTmSenderCore for mpsc::Sender { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => self + .send(addr) + .map_err(|_| GenericSendError::RxDisconnected)?, + PusTmWrapper::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm), + }; + Ok(()) } } - impl From> for EcssTmtcError { - fn from(value: cb::TrySendError) -> Self { - match value { - cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)), - cb::TrySendError::Disconnected(_) => Self::Send(GenericSendError::RxDisconnected), - } + impl EcssTmSenderCore for mpsc::SyncSender { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => self + .try_send(addr) + .map_err(|e| EcssTmtcError::Send(e.into()))?, + PusTmWrapper::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm), + }; + Ok(()) + } + } + + impl EcssTmSenderCore for mpsc::Sender> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)), + PusTmWrapper::Direct(tm) => self + .send(tm.to_vec()?) + .map_err(|e| EcssTmtcError::Send(e.into()))?, + }; + Ok(()) + } + } + + impl EcssTmSenderCore for mpsc::SyncSender> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)), + PusTmWrapper::Direct(tm) => self + .send(tm.to_vec()?) + .map_err(|e| EcssTmtcError::Send(e.into()))?, + }; + Ok(()) } } #[derive(Clone)] - pub struct MpscTmInSharedPoolSender { - id: ChannelId, + pub struct TmInSharedPoolSenderWithId { + channel_id: ChannelId, name: &'static str, shared_tm_store: SharedTmPool, - sender: mpsc::Sender, + sender: Sender, } - impl EcssChannel for MpscTmInSharedPoolSender { - fn id(&self) -> ChannelId { - self.id + impl EcssChannel for TmInSharedPoolSenderWithId { + fn channel_id(&self) -> ChannelId { + self.channel_id } fn name(&self) -> &'static str { @@ -428,36 +468,31 @@ pub mod std_mod { } } - impl MpscTmInSharedPoolSender { + impl TmInSharedPoolSenderWithId { pub fn send_direct_tm(&self, tm: PusTmCreator) -> Result<(), EcssTmtcError> { let addr = self.shared_tm_store.add_pus_tm(&tm)?; - self.sender - .send(addr) - .map_err(|_| EcssTmtcError::Send(GenericSendError::RxDisconnected)) + self.sender.send_tm(PusTmWrapper::InStore(addr)) } } - impl EcssTmSenderCore for MpscTmInSharedPoolSender { + impl EcssTmSenderCore for TmInSharedPoolSenderWithId { fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { - match tm { - PusTmWrapper::InStore(addr) => { - self.sender.send(addr)?; - Ok(()) - } - PusTmWrapper::Direct(tm) => self.send_direct_tm(tm), + if let PusTmWrapper::Direct(tm) = tm { + return self.send_direct_tm(tm); } + self.sender.send_tm(tm) } } - impl MpscTmInSharedPoolSender { + impl TmInSharedPoolSenderWithId { pub fn new( id: ChannelId, name: &'static str, shared_tm_store: SharedTmPool, - sender: mpsc::Sender, + sender: Sender, ) -> Self { Self { - id, + channel_id: id, name, shared_tm_store, sender, @@ -465,6 +500,51 @@ pub mod std_mod { } } + pub type TmInSharedPoolSenderWithMpsc = TmInSharedPoolSenderWithId>; + pub type TmInSharedPoolSenderWithBoundedMpsc = + TmInSharedPoolSenderWithId>; + + /// This class can be used if frequent heap allocations during run-time are not an issue. + /// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation + /// of this class can not deal with store addresses, so it is assumed that is is always + /// going to be called with direct packets. + #[derive(Clone)] + pub struct TmAsVecSenderWithId { + id: ChannelId, + name: &'static str, + sender: Sender, + } + + impl From>> for EcssTmtcError { + fn from(_: mpsc::SendError>) -> Self { + Self::Send(GenericSendError::RxDisconnected) + } + } + + impl TmAsVecSenderWithId { + pub fn new(id: u32, name: &'static str, sender: Sender) -> Self { + Self { id, sender, name } + } + } + + impl EcssChannel for TmAsVecSenderWithId { + fn channel_id(&self) -> ChannelId { + self.id + } + fn name(&self) -> &'static str { + self.name + } + } + + impl EcssTmSenderCore for TmAsVecSenderWithId { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + self.sender.send_tm(tm) + } + } + + pub type TmAsVecSenderWithMpsc = TmAsVecSenderWithId>>; + pub type TmAsVecSenderWithBoundedMpsc = TmAsVecSenderWithId>>; + pub struct MpscTcReceiver { id: ChannelId, name: &'static str, @@ -472,7 +552,7 @@ pub mod std_mod { } impl EcssChannel for MpscTcReceiver { - fn id(&self) -> ChannelId { + fn channel_id(&self) -> ChannelId { self.id } @@ -486,7 +566,7 @@ pub mod std_mod { self.receiver.try_recv().map_err(|e| match e { TryRecvError::Empty => TryRecvTmtcError::Empty, TryRecvError::Disconnected => { - TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) + TryRecvTmtcError::Tmtc(EcssTmtcError::from(GenericRecvError::TxDisconnected)) } }) } @@ -502,133 +582,89 @@ pub mod std_mod { } } - /// This class can be used if frequent heap allocations during run-time are not an issue. - /// PUS TM packets will be sent around as [Vec]s. Please note that the current implementation - /// of this class can not deal with store addresses, so it is assumed that is is always - /// going to be called with direct packets. - #[derive(Clone)] - pub struct MpscTmAsVecSender { - id: ChannelId, - name: &'static str, - sender: mpsc::Sender>, - } + #[cfg(feature = "crossbeam")] + pub mod cb_mod { + use super::*; + use crossbeam_channel as cb; - impl From>> for EcssTmtcError { - fn from(_: mpsc::SendError>) -> Self { - Self::Send(GenericSendError::RxDisconnected) - } - } + pub type TmInSharedPoolSenderWithCrossbeam = + TmInSharedPoolSenderWithId>; - impl MpscTmAsVecSender { - pub fn new(id: u32, name: &'static str, sender: mpsc::Sender>) -> Self { - Self { id, sender, name } + impl From> for EcssTmtcError { + fn from(_: cb::SendError) -> Self { + Self::Send(GenericSendError::RxDisconnected) + } } - } - impl EcssChannel for MpscTmAsVecSender { - fn id(&self) -> ChannelId { - self.id - } - fn name(&self) -> &'static str { - self.name - } - } - - impl EcssTmSenderCore for MpscTmAsVecSender { - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { - match tm { - PusTmWrapper::InStore(addr) => Err(EcssTmtcError::CantSendAddr(addr)), - PusTmWrapper::Direct(tm) => { - let mut vec = Vec::new(); - tm.append_to_vec(&mut vec).map_err(EcssTmtcError::Pus)?; - self.sender.send(vec)?; - Ok(()) + impl From> for EcssTmtcError { + fn from(value: cb::TrySendError) -> Self { + match value { + cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)), + cb::TrySendError::Disconnected(_) => { + Self::Send(GenericSendError::RxDisconnected) + } } } } - } - #[derive(Clone)] - pub struct CrossbeamTmInStoreSender { - id: ChannelId, - name: &'static str, - shared_tm_store: SharedTmPool, - sender: crossbeam_channel::Sender, - } - - impl CrossbeamTmInStoreSender { - pub fn new( - id: ChannelId, - name: &'static str, - shared_tm_store: SharedTmPool, - sender: crossbeam_channel::Sender, - ) -> Self { - Self { - id, - name, - shared_tm_store, - sender, + impl EcssTmSenderCore for cb::Sender { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => self + .try_send(addr) + .map_err(|e| EcssTmtcError::Send(e.into()))?, + PusTmWrapper::Direct(_) => return Err(EcssTmtcError::CantSendDirectTm), + }; + Ok(()) } } - } - - impl EcssChannel for CrossbeamTmInStoreSender { - fn id(&self) -> ChannelId { - self.id - } - - fn name(&self) -> &'static str { - self.name - } - } - - impl EcssTmSenderCore for CrossbeamTmInStoreSender { - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { - match tm { - PusTmWrapper::InStore(addr) => self.sender.try_send(addr)?, - PusTmWrapper::Direct(tm) => { - let addr = self.shared_tm_store.add_pus_tm(&tm)?; - self.sender.try_send(addr)?; - } + impl EcssTmSenderCore for cb::Sender> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { + match tm { + PusTmWrapper::InStore(addr) => return Err(EcssTmtcError::CantSendAddr(addr)), + PusTmWrapper::Direct(tm) => self + .send(tm.to_vec()?) + .map_err(|e| EcssTmtcError::Send(e.into()))?, + }; + Ok(()) } - Ok(()) } - } - pub struct CrossbeamTcReceiver { - id: ChannelId, - name: &'static str, - receiver: cb::Receiver, - } - - impl CrossbeamTcReceiver { - pub fn new( + pub struct CrossbeamTcReceiver { id: ChannelId, name: &'static str, receiver: cb::Receiver, - ) -> Self { - Self { id, name, receiver } - } - } - - impl EcssChannel for CrossbeamTcReceiver { - fn id(&self) -> ChannelId { - self.id } - fn name(&self) -> &'static str { - self.name + impl CrossbeamTcReceiver { + pub fn new( + id: ChannelId, + name: &'static str, + receiver: cb::Receiver, + ) -> Self { + Self { id, name, receiver } + } } - } - impl EcssTcReceiverCore for CrossbeamTcReceiver { - fn recv_tc(&self) -> Result { - self.receiver.try_recv().map_err(|e| match e { - cb::TryRecvError::Empty => TryRecvTmtcError::Empty, - cb::TryRecvError::Disconnected => { - TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) - } - }) + impl EcssChannel for CrossbeamTcReceiver { + fn channel_id(&self) -> ChannelId { + self.id + } + + fn name(&self) -> &'static str { + self.name + } + } + + impl EcssTcReceiverCore for CrossbeamTcReceiver { + fn recv_tc(&self) -> Result { + self.receiver.try_recv().map_err(|e| match e { + cb::TryRecvError::Empty => TryRecvTmtcError::Empty, + cb::TryRecvError::Disconnected => TryRecvTmtcError::Tmtc(EcssTmtcError::from( + GenericRecvError::TxDisconnected, + )), + }) + } } } @@ -906,7 +942,7 @@ pub mod std_mod { })) } Err(e) => match e { - TryRecvTmtcError::Error(e) => Err(PusPacketHandlingError::EcssTmtc(e)), + TryRecvTmtcError::Tmtc(e) => Err(PusPacketHandlingError::EcssTmtc(e)), TryRecvTmtcError::Empty => Ok(None), }, } @@ -949,6 +985,9 @@ pub mod tests { use crate::tmtc::tm_helper::SharedTmPool; use crate::TargetId; + use super::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, + }; use super::verification::tests::{SharedVerificationMap, TestVerificationReporter}; use super::verification::{ TcStateAccepted, VerificationReporterCfg, VerificationReporterWithSender, @@ -956,8 +995,9 @@ pub mod tests { }; use super::{ EcssTcAndToken, EcssTcInSharedStoreConverter, EcssTcInVecConverter, GenericRoutingError, - MpscTcReceiver, MpscTmAsVecSender, MpscTmInSharedPoolSender, PusPacketHandlerResult, - PusPacketHandlingError, PusRoutingErrorHandler, PusServiceHelper, TcInMemory, + MpscTcReceiver, PusPacketHandlerResult, PusPacketHandlingError, PusRoutingErrorHandler, + PusServiceHelper, TcInMemory, TmAsVecSenderWithId, TmInSharedPoolSenderWithBoundedMpsc, + TmInSharedPoolSenderWithId, }; pub const TEST_APID: u16 = 0x101; @@ -1002,9 +1042,9 @@ pub mod tests { tm_buf: [u8; 2048], tc_pool: SharedStaticMemoryPool, tm_pool: SharedTmPool, - tc_sender: mpsc::Sender, + tc_sender: mpsc::SyncSender, tm_receiver: mpsc::Receiver, - verification_handler: VerificationReporterWithSender, + verification_handler: VerificationReporterWithSharedPoolMpscBoundedSender, } impl PusServiceHandlerWithSharedStoreCommon { @@ -1014,17 +1054,20 @@ pub mod tests { /// The PUS service handler is instantiated with a [EcssTcInStoreConverter]. pub fn new() -> ( Self, - PusServiceHelper, + PusServiceHelper< + EcssTcInSharedStoreConverter, + VerificationReporterWithSharedPoolMpscBoundedSender, + >, ) { let pool_cfg = StaticPoolConfig::new(alloc::vec![(16, 16), (8, 32), (4, 64)], false); let tc_pool = StaticMemoryPool::new(pool_cfg.clone()); let tm_pool = StaticMemoryPool::new(pool_cfg); let shared_tc_pool = SharedStaticMemoryPool::new(RwLock::new(tc_pool)); let shared_tm_pool = SharedTmPool::new(tm_pool); - let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); - let (tm_tx, tm_rx) = mpsc::channel(); + let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::sync_channel(10); + let (tm_tx, tm_rx) = mpsc::sync_channel(10); - let verif_sender = MpscTmInSharedPoolSender::new( + let verif_sender = TmInSharedPoolSenderWithBoundedMpsc::new( 0, "verif_sender", shared_tm_pool.clone(), @@ -1032,9 +1075,9 @@ pub mod tests { ); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let verification_handler = - VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + VerificationReporterWithSharedPoolMpscBoundedSender::new(&verif_cfg, verif_sender); let test_srv_tm_sender = - MpscTmInSharedPoolSender::new(0, "TEST_SENDER", shared_tm_pool.clone(), tm_tx); + TmInSharedPoolSenderWithId::new(0, "TEST_SENDER", shared_tm_pool.clone(), tm_tx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); let in_store_converter = EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048); @@ -1115,20 +1158,20 @@ pub mod tests { pub verification_handler: VerificationReporter, } - impl PusServiceHandlerWithVecCommon { + impl PusServiceHandlerWithVecCommon { pub fn new_with_standard_verif_reporter() -> ( Self, - PusServiceHelper, + PusServiceHelper, ) { let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel(); - let verif_sender = MpscTmAsVecSender::new(0, "verififcatio-sender", tm_tx.clone()); + let verif_sender = TmAsVecSenderWithId::new(0, "verififcatio-sender", tm_tx.clone()); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let verification_handler = - VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + VerificationReporterWithSender::new(&verif_cfg, verif_sender); - let test_srv_tm_sender = MpscTmAsVecSender::new(0, "test-sender", tm_tx); + let test_srv_tm_sender = TmAsVecSenderWithId::new(0, "test-sender", tm_tx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx); let in_store_converter = EcssTcInVecConverter::default(); ( @@ -1157,7 +1200,7 @@ pub mod tests { let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel(); - let test_srv_tm_sender = MpscTmAsVecSender::new(0, "test-sender", tm_tx); + let test_srv_tm_sender = TmAsVecSenderWithId::new(0, "test-sender", tm_tx); let test_srv_tc_receiver = MpscTcReceiver::new(0, "test-receiver", test_srv_tc_rx); let in_store_converter = EcssTcInVecConverter::default(); let shared_verif_map = SharedVerificationMap::default(); diff --git a/satrs/src/pus/scheduler_srv.rs b/satrs/src/pus/scheduler_srv.rs index 72e8a7b..22958ed 100644 --- a/satrs/src/pus/scheduler_srv.rs +++ b/satrs/src/pus/scheduler_srv.rs @@ -174,7 +174,7 @@ impl< mod tests { use crate::pool::{StaticMemoryPool, StaticPoolConfig}; use crate::pus::tests::TEST_APID; - use crate::pus::verification::VerificationReporterWithSender; + use crate::pus::verification::VerificationReporterWithSharedPoolMpscBoundedSender; use crate::pus::{ scheduler::{self, PusSchedulerProvider, TcInfo}, tests::{PusServiceHandlerWithSharedStoreCommon, PusTestHarness}, @@ -199,7 +199,7 @@ mod tests { common: PusServiceHandlerWithSharedStoreCommon, handler: PusService11SchedHandler< EcssTcInSharedStoreConverter, - VerificationReporterWithSender, + VerificationReporterWithSharedPoolMpscBoundedSender, TestScheduler, >, sched_tc_pool: StaticMemoryPool, diff --git a/satrs/src/pus/test.rs b/satrs/src/pus/test.rs index bfea48b..0eef92b 100644 --- a/satrs/src/pus/test.rs +++ b/satrs/src/pus/test.rs @@ -104,7 +104,10 @@ mod tests { PusServiceHandlerWithSharedStoreCommon, PusServiceHandlerWithVecCommon, PusTestHarness, SimplePusPacketHandler, TEST_APID, }; - use crate::pus::verification::{RequestId, VerificationReporterWithSender}; + use crate::pus::verification::std_mod::{ + VerificationReporterWithSharedPoolMpscBoundedSender, VerificationReporterWithVecMpscSender, + }; + use crate::pus::verification::RequestId; use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::{ EcssTcInSharedStoreConverter, EcssTcInVecConverter, PusPacketHandlerResult, @@ -120,8 +123,10 @@ mod tests { struct Pus17HandlerWithStoreTester { common: PusServiceHandlerWithSharedStoreCommon, - handler: - PusService17TestHandler, + handler: PusService17TestHandler< + EcssTcInSharedStoreConverter, + VerificationReporterWithSharedPoolMpscBoundedSender, + >, } impl Pus17HandlerWithStoreTester { @@ -158,8 +163,9 @@ mod tests { } struct Pus17HandlerWithVecTester { - common: PusServiceHandlerWithVecCommon, - handler: PusService17TestHandler, + common: PusServiceHandlerWithVecCommon, + handler: + PusService17TestHandler, } impl Pus17HandlerWithVecTester { diff --git a/satrs/src/pus/verification.rs b/satrs/src/pus/verification.rs index 369b46e..89ce9b9 100644 --- a/satrs/src/pus/verification.rs +++ b/satrs/src/pus/verification.rs @@ -20,7 +20,7 @@ //! VerificationReportingProvider, VerificationReporterCfg, VerificationReporterWithSender //! }; //! use satrs::seq_count::SeqCountProviderSimple; -//! use satrs::pus::MpscTmInSharedPoolSender; +//! use satrs::pus::TmInSharedPoolSenderWithMpsc; //! use satrs::tmtc::tm_helper::SharedTmPool; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; @@ -35,9 +35,9 @@ //! let shared_tm_store = SharedTmPool::new(tm_pool); //! let tm_store = shared_tm_store.clone_backing_pool(); //! let (verif_tx, verif_rx) = mpsc::channel(); -//! let sender = MpscTmInSharedPoolSender::new(0, "Test Sender", shared_tm_store, verif_tx); +//! let sender = TmInSharedPoolSenderWithMpsc::new(0, "Test Sender", shared_tm_store, verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); -//! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); +//! let mut reporter = VerificationReporterWithSender::new(&cfg , sender); //! //! let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); //! let tc_header = PusTcSecondaryHeader::new_simple(17, 1); @@ -95,10 +95,11 @@ pub use crate::seq_count::SeqCountProviderSimple; pub use spacepackets::ecss::verification::*; #[cfg(feature = "alloc")] -pub use alloc_mod::{ - VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, -}; +#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "alloc")))] +pub use alloc_mod::*; + #[cfg(feature = "std")] +#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "std")))] pub use std_mod::*; /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard. @@ -949,15 +950,13 @@ impl VerificationReporterCore { } #[cfg(feature = "alloc")] -mod alloc_mod { +pub mod alloc_mod { use super::*; - use crate::pus::alloc_mod::EcssTmSender; - use crate::seq_count::SequenceCountProvider; - use alloc::boxed::Box; - use alloc::vec; - use alloc::vec::Vec; + use crate::{ + pus::{TmAsVecSenderWithId, TmInSharedPoolSenderWithId}, + seq_count::SequenceCountProvider, + }; use core::cell::RefCell; - use spacepackets::ecss::tc::IsPusTelecommand; #[derive(Clone)] pub struct VerificationReporterCfg { @@ -992,9 +991,9 @@ mod alloc_mod { /// TM funnel. This helper will always set those fields to 0. #[derive(Clone)] pub struct VerificationReporter { - source_data_buf: RefCell>, - pub seq_count_provider: Option + Send>>, - pub msg_count_provider: Option + Send>>, + source_data_buf: RefCell>, + pub seq_count_provider: Option + Send>>, + pub msg_count_provider: Option + Send>>, pub reporter: VerificationReporterCore, } @@ -1002,7 +1001,7 @@ mod alloc_mod { pub fn new(cfg: &VerificationReporterCfg) -> Self { let reporter = VerificationReporterCore::new(cfg.apid).unwrap(); Self { - source_data_buf: RefCell::new(vec![ + source_data_buf: RefCell::new(alloc::vec![ 0; RequestId::SIZE_AS_BYTES + cfg.step_field_width @@ -1269,21 +1268,18 @@ mod alloc_mod { /// Helper object which caches the sender passed as a trait object. Provides the same /// API as [VerificationReporter] but without the explicit sender arguments. #[derive(Clone)] - pub struct VerificationReporterWithSender { + pub struct VerificationReporterWithSender { pub reporter: VerificationReporter, - pub sender: Box, + pub sender: Sender, } - impl VerificationReporterWithSender { - pub fn new(cfg: &VerificationReporterCfg, sender: Box) -> Self { + impl VerificationReporterWithSender { + pub fn new(cfg: &VerificationReporterCfg, sender: Sender) -> Self { let reporter = VerificationReporter::new(cfg); Self::new_from_reporter(reporter, sender) } - pub fn new_from_reporter( - reporter: VerificationReporter, - sender: Box, - ) -> Self { + pub fn new_from_reporter(reporter: VerificationReporter, sender: Sender) -> Self { Self { reporter, sender } } @@ -1297,7 +1293,9 @@ mod alloc_mod { } } - impl VerificationReportingProvider for VerificationReporterWithSender { + impl VerificationReportingProvider + for VerificationReporterWithSender + { delegate! { to self.reporter { fn add_tc( @@ -1315,7 +1313,7 @@ mod alloc_mod { ) -> Result, VerificationOrSendErrorWithToken> { self.reporter - .acceptance_success(token, self.sender.as_ref(), time_stamp) + .acceptance_success(token, &self.sender, time_stamp) } fn acceptance_failure( @@ -1324,7 +1322,7 @@ mod alloc_mod { params: FailParams, ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter - .acceptance_failure(token, self.sender.as_ref(), params) + .acceptance_failure(token, &self.sender, params) } fn start_success( @@ -1335,8 +1333,7 @@ mod alloc_mod { VerificationToken, VerificationOrSendErrorWithToken, > { - self.reporter - .start_success(token, self.sender.as_ref(), time_stamp) + self.reporter.start_success(token, &self.sender, time_stamp) } fn start_failure( @@ -1344,8 +1341,7 @@ mod alloc_mod { token: VerificationToken, params: FailParams, ) -> Result<(), VerificationOrSendErrorWithToken> { - self.reporter - .start_failure(token, self.sender.as_ref(), params) + self.reporter.start_failure(token, &self.sender, params) } fn step_success( @@ -1355,7 +1351,7 @@ mod alloc_mod { step: impl EcssEnumeration, ) -> Result<(), EcssTmtcError> { self.reporter - .step_success(token, self.sender.as_ref(), time_stamp, step) + .step_success(token, &self.sender, time_stamp, step) } fn step_failure( @@ -1363,8 +1359,7 @@ mod alloc_mod { token: VerificationToken, params: FailParamsWithStep, ) -> Result<(), VerificationOrSendErrorWithToken> { - self.reporter - .step_failure(token, self.sender.as_ref(), params) + self.reporter.step_failure(token, &self.sender, params) } fn completion_success( @@ -1373,7 +1368,7 @@ mod alloc_mod { time_stamp: &[u8], ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter - .completion_success(token, self.sender.as_ref(), time_stamp) + .completion_success(token, &self.sender, time_stamp) } fn completion_failure( @@ -1382,18 +1377,34 @@ mod alloc_mod { params: FailParams, ) -> Result<(), VerificationOrSendErrorWithToken> { self.reporter - .completion_failure(token, self.sender.as_ref(), params) + .completion_failure(token, &self.sender, params) } } + + pub type VerificationReporterWithSharedPoolSender = + VerificationReporterWithSender>; + pub type VerificationReporterWithVecSender = + VerificationReporterWithSender>; } #[cfg(feature = "std")] -mod std_mod { - use crate::pus::verification::VerificationReporterWithSender; - use std::sync::{Arc, Mutex}; +pub mod std_mod { + use std::sync::mpsc; - pub type StdVerifReporterWithSender = VerificationReporterWithSender; - pub type SharedStdVerifReporterWithSender = Arc>; + use crate::pool::StoreAddr; + + use super::alloc_mod::{ + VerificationReporterWithSharedPoolSender, VerificationReporterWithVecSender, + }; + + pub type VerificationReporterWithSharedPoolMpscSender = + VerificationReporterWithSharedPoolSender>; + pub type VerificationReporterWithSharedPoolMpscBoundedSender = + VerificationReporterWithSharedPoolSender>; + pub type VerificationReporterWithVecMpscSender = + VerificationReporterWithVecSender>>; + pub type VerificationReporterWithVecMpscBoundedSender = + VerificationReporterWithVecSender>>; } #[cfg(test)] @@ -1405,10 +1416,11 @@ pub mod tests { VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; - use crate::pus::{EcssChannel, MpscTmInSharedPoolSender, PusTmWrapper}; + use crate::pus::{ + EcssChannel, PusTmWrapper, TmInSharedPoolSenderWithId, TmInSharedPoolSenderWithMpsc, + }; use crate::tmtc::tm_helper::SharedTmPool; use crate::ChannelId; - use alloc::boxed::Box; use alloc::format; use alloc::sync::Arc; use hashbrown::HashMap; @@ -1637,7 +1649,7 @@ pub mod tests { } impl EcssChannel for TestSender { - fn id(&self) -> ChannelId { + fn channel_id(&self) -> ChannelId { 0 } fn name(&self) -> &'static str { @@ -1688,13 +1700,13 @@ pub mod tests { &mut self.vr } } - struct TestBaseWithHelper<'a> { - helper: VerificationReporterWithSender, + struct TestBaseWithHelper<'a, Sender: EcssTmSenderCore + Clone + 'static> { + helper: VerificationReporterWithSender, #[allow(dead_code)] tc: PusTcCreator<'a>, } - impl<'a> TestBaseWithHelper<'a> { + impl<'a, Sender: EcssTmSenderCore + Clone + 'static> TestBaseWithHelper<'a, Sender> { fn rep(&mut self) -> &mut VerificationReporter { &mut self.helper.reporter } @@ -1725,12 +1737,15 @@ pub mod tests { (TestBase { vr: reporter, tc }, init_tok) } - fn base_with_helper_init() -> (TestBaseWithHelper<'static>, VerificationToken) { + fn base_with_helper_init() -> ( + TestBaseWithHelper<'static, TestSender>, + VerificationToken, + ) { let mut reporter = base_reporter(); let (tc, _) = base_tc_init(None); let init_tok = reporter.add_tc(&tc); let sender = TestSender::default(); - let helper = VerificationReporterWithSender::new_from_reporter(reporter, Box::new(sender)); + let helper = VerificationReporterWithSender::new_from_reporter(reporter, sender); (TestBaseWithHelper { helper, tc }, init_tok) } @@ -1758,7 +1773,7 @@ pub mod tests { let shared_tm_store = SharedTmPool::new(pool); let (tx, _) = mpsc::channel(); let mpsc_verif_sender = - MpscTmInSharedPoolSender::new(0, "verif_sender", shared_tm_store, tx); + TmInSharedPoolSenderWithMpsc::new(0, "verif_sender", shared_tm_store, tx); is_send(&mpsc_verif_sender); } @@ -1785,8 +1800,7 @@ pub mod tests { b.helper .acceptance_success(tok, &EMPTY_STAMP) .expect("Sending acceptance success failed"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - acceptance_check(sender, &tok.req_id); + acceptance_check(&mut b.helper.sender, &tok.req_id); } fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) { @@ -1830,8 +1844,7 @@ pub mod tests { b.helper .acceptance_failure(tok, fail_params) .expect("Sending acceptance success failed"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - acceptance_fail_check(sender, tok.req_id, stamp_buf); + acceptance_fail_check(&mut b.helper.sender, tok.req_id, stamp_buf); } #[test] @@ -1961,8 +1974,7 @@ pub mod tests { b.helper .start_failure(accepted_token, fail_params) .expect("Start failure failure"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - start_fail_check(sender, tok.req_id, fail_data_raw); + start_fail_check(&mut b.helper.sender, tok.req_id, fail_data_raw); } fn step_success_check(sender: &mut TestSender, req_id: RequestId) { @@ -2059,9 +2071,8 @@ pub mod tests { b.helper .step_success(&started_token, &EMPTY_STAMP, EcssEnumU8::new(1)) .expect("Sending step 1 success failed"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - assert_eq!(sender.service_queue.borrow().len(), 4); - step_success_check(sender, tok.req_id); + assert_eq!(b.helper.sender.service_queue.borrow().len(), 4); + step_success_check(&mut b.helper.sender, tok.req_id); } fn check_step_failure(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) { @@ -2191,8 +2202,7 @@ pub mod tests { b.helper .step_failure(started_token, fail_params) .expect("Step failure failed"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - check_step_failure(sender, req_id, fail_data_raw); + check_step_failure(&mut b.helper.sender, req_id, fail_data_raw); } fn completion_fail_check(sender: &mut TestSender, req_id: RequestId) { @@ -2278,8 +2288,7 @@ pub mod tests { b.helper .completion_failure(started_token, fail_params) .expect("Completion failure"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - completion_fail_check(sender, req_id); + completion_fail_check(&mut b.helper.sender, req_id); } fn completion_success_check(sender: &mut TestSender, req_id: RequestId) { @@ -2355,8 +2364,7 @@ pub mod tests { b.helper .completion_success(started_token, &EMPTY_STAMP) .expect("Sending completion success failed"); - let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); - completion_success_check(sender, tok.req_id); + completion_success_check(&mut b.helper.sender, tok.req_id); } #[test] @@ -2368,9 +2376,9 @@ pub mod tests { let shared_tm_pool = shared_tm_store.clone_backing_pool(); let (verif_tx, verif_rx) = mpsc::channel(); let sender = - MpscTmInSharedPoolSender::new(0, "Verification Sender", shared_tm_store, verif_tx); + TmInSharedPoolSenderWithId::new(0, "Verification Sender", shared_tm_store, verif_tx); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); - let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); + let mut reporter = VerificationReporterWithSender::new(&cfg, sender); let mut sph = SpHeader::tc_unseg(TEST_APID, 0, 0).unwrap(); let tc_header = PusTcSecondaryHeader::new_simple(17, 1); diff --git a/satrs/src/queue.rs b/satrs/src/queue.rs index 25ff6c6..5ba4bdc 100644 --- a/satrs/src/queue.rs +++ b/satrs/src/queue.rs @@ -1,6 +1,8 @@ use core::fmt::{Display, Formatter}; #[cfg(feature = "std")] use std::error::Error; +#[cfg(feature = "std")] +use std::sync::mpsc; /// Generic error type for sending something via a message queue. #[derive(Debug, Copy, Clone)] @@ -47,3 +49,37 @@ impl Display for GenericRecvError { #[cfg(feature = "std")] impl Error for GenericRecvError {} + +#[cfg(feature = "std")] +impl From> for GenericSendError { + fn from(_: mpsc::SendError) -> Self { + GenericSendError::RxDisconnected + } +} + +#[cfg(feature = "std")] +impl From> for GenericSendError { + fn from(err: mpsc::TrySendError) -> Self { + match err { + mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None), + mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + } + } +} + +#[cfg(feature = "crossbeam")] +impl From> for GenericSendError { + fn from(_: crossbeam_channel::SendError) -> Self { + GenericSendError::RxDisconnected + } +} + +#[cfg(feature = "crossbeam")] +impl From> for GenericSendError { + fn from(err: crossbeam_channel::TrySendError) -> Self { + match err { + crossbeam_channel::TrySendError::Full(_) => GenericSendError::QueueFull(None), + crossbeam_channel::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected, + } + } +} diff --git a/satrs/tests/pus_events.rs b/satrs/tests/pus_events.rs index 4c6cc9a..ca6d71e 100644 --- a/satrs/tests/pus_events.rs +++ b/satrs/tests/pus_events.rs @@ -5,10 +5,10 @@ use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo}; use satrs::params::U32Pair; use satrs::params::{Params, ParamsHeapless, WritableToBeBytes}; use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher}; -use satrs::pus::MpscTmAsVecSender; +use satrs::pus::TmAsVecSenderWithMpsc; use spacepackets::ecss::tm::PusTmReader; use spacepackets::ecss::{PusError, PusPacket}; -use std::sync::mpsc::{channel, SendError, TryRecvError}; +use std::sync::mpsc::{self, SendError, TryRecvError}; use std::thread; const INFO_EVENT: EventU32TypedSev = @@ -24,21 +24,21 @@ pub enum CustomTmSenderError { #[test] fn test_threaded_usage() { - let (event_sender, event_man_receiver) = channel(); + let (event_sender, event_man_receiver) = mpsc::channel(); let event_receiver = MpscEventU32Receiver::new(event_man_receiver); let mut event_man = EventManagerWithMpsc::new(event_receiver); - let (pus_event_man_tx, pus_event_man_rx) = channel(); + let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); event_man.subscribe_all(pus_event_man_send_provider.channel_id()); event_man.add_sender(pus_event_man_send_provider); - let (event_tx, event_rx) = channel(); + let (event_tx, event_rx) = mpsc::channel(); let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed"); let mut pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default()); // PUS + Generic event manager thread let jh0 = thread::spawn(move || { - let mut sender = MpscTmAsVecSender::new(0, "event_sender", event_tx); + let mut sender = TmAsVecSenderWithMpsc::new(0, "event_sender", event_tx); let mut event_cnt = 0; let mut params_array: [u8; 128] = [0; 128]; loop { diff --git a/satrs/tests/pus_verification.rs b/satrs/tests/pus_verification.rs index dd72394..386fea6 100644 --- a/satrs/tests/pus_verification.rs +++ b/satrs/tests/pus_verification.rs @@ -6,7 +6,7 @@ pub mod crossbeam_test { FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, VerificationReportingProvider, }; - use satrs::pus::CrossbeamTmInStoreSender; + use satrs::pus::TmInSharedPoolSenderWithCrossbeam; use satrs::tmtc::tm_helper::SharedTmPool; use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader}; use spacepackets::ecss::tm::PusTmReader; @@ -40,10 +40,13 @@ pub mod crossbeam_test { let shared_tc_pool_0 = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = crossbeam_channel::bounded(10); - let sender = - CrossbeamTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); - let mut reporter_with_sender_0 = - VerificationReporterWithSender::new(&cfg, Box::new(sender)); + let sender = TmInSharedPoolSenderWithCrossbeam::new( + 0, + "verif_sender", + shared_tm_pool.clone(), + tx.clone(), + ); + let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, sender); let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver // tread.