diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 7c721ee..5c62375 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -318,10 +318,31 @@ mod alloc_mod { /// [Clone]. #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] - pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone {} + pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn EcssTmSenderCore; + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore; + } /// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable. - impl EcssTmSender for T where T: EcssTmSenderCore + Clone + 'static {} + impl EcssTmSender for T + where + T: EcssTmSenderCore + Clone + 'static, + { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn EcssTmSenderCore { + self + } + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore { + self + } + } dyn_clone::clone_trait_object!(EcssTmSender); impl_downcast!(EcssTmSender); diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index d0616c9..8dbf0c3 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -1,15 +1,22 @@ -use crate::tmtc::{MpscStoreAndSendError, PusTcSource}; +use satrs_core::pus::ReceivesEcssPusTc; use satrs_core::spacepackets::{CcsdsPacket, SpHeader}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs_example::PUS_APID; #[derive(Clone)] -pub struct CcsdsReceiver { - pub tc_source: PusTcSource, +pub struct CcsdsReceiver< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, + E, +> { + pub tc_source: TcSource, } -impl CcsdsPacketHandler for CcsdsReceiver { - type Error = MpscStoreAndSendError; +impl< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, + E: 'static, + > CcsdsPacketHandler for CcsdsReceiver +{ + type Error = E; fn valid_apids(&self) -> &'static [u16] { &[PUS_APID] diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index 5f48d53..5a4697b 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -7,20 +7,17 @@ use satrs_core::{ }, events::EventU32, params::Params, - pool::StoreAddr, pus::{ event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, }, verification::{TcStateStarted, VerificationReporterWithSender, VerificationToken}, - MpscTmInStoreSender, + EcssTmSender, }, spacepackets::time::cds::{self, TimeProvider}, - tmtc::tm_helper::SharedTmStore, - ChannelId, }; -use satrs_example::{TmSenderId, PUS_APID}; +use satrs_example::PUS_APID; use crate::update_time; @@ -30,19 +27,20 @@ pub struct PusEventHandler { event_request_rx: mpsc::Receiver, pus_event_dispatcher: PusEventDispatcher<(), EventU32>, pus_event_man_rx: mpsc::Receiver<(EventU32, Option)>, - tm_sender: MpscTmInStoreSender, + tm_sender: Box, time_provider: TimeProvider, timestamp: [u8; 7], verif_handler: VerificationReporterWithSender, } +/* +*/ impl PusEventHandler { pub fn new( - shared_tm_store: SharedTmStore, - tm_funnel_tx: mpsc::Sender, verif_handler: VerificationReporterWithSender, event_manager: &mut MpscEventManager, event_request_rx: mpsc::Receiver, + tm_sender: impl EcssTmSender, ) -> Self { let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); @@ -64,12 +62,7 @@ impl PusEventHandler { time_provider: cds::TimeProvider::new_with_u16_days(0, 0), timestamp: [0; 7], verif_handler, - tm_sender: MpscTmInStoreSender::new( - TmSenderId::AllEvents as ChannelId, - "ALL_EVENTS_TX", - shared_tm_store, - tm_funnel_tx, - ), + tm_sender: Box::new(tm_sender), } } @@ -109,7 +102,12 @@ impl PusEventHandler { if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() { update_time(&mut self.time_provider, &mut self.timestamp); self.pus_event_dispatcher - .generate_pus_event_tm_generic(&mut self.tm_sender, &self.timestamp, event, None) + .generate_pus_event_tm_generic( + self.tm_sender.upcast_mut(), + &self.timestamp, + event, + None, + ) .expect("Sending TM as event failed"); } } @@ -155,18 +153,16 @@ pub struct EventHandler { impl EventHandler { pub fn new( - shared_tm_store: SharedTmStore, - tm_funnel_tx: mpsc::Sender, + tm_sender: impl EcssTmSender, verif_handler: VerificationReporterWithSender, event_request_rx: mpsc::Receiver, ) -> Self { let mut event_man_wrapper = EventManagerWrapper::new(); let pus_event_handler = PusEventHandler::new( - shared_tm_store, - tm_funnel_tx, verif_handler, event_man_wrapper.event_manager(), event_request_rx, + tm_sender, ); Self { event_man_wrapper, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index d96ec59..0f1f6ad 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -14,8 +14,11 @@ use crate::events::EventHandler; use crate::pus::stack::PusStack; use crate::tm_funnel::TmFunnel; use log::info; +use pus::test::create_test_service_dynamic; use satrs_core::hal::std::tcp_server::ServerConfig; use satrs_core::hal::std::udp_server::UdpTcServer; +use tmtc::PusTcSourceDynamic; +use udp::DynamicUdpTmHandler; use crate::acs::AcsTask; use crate::ccsds::CcsdsReceiver; @@ -24,16 +27,19 @@ use crate::pus::action::create_action_service; use crate::pus::event::create_event_service; use crate::pus::hk::create_hk_service; use crate::pus::scheduler::create_scheduler_service; -use crate::pus::test::create_test_service; +use crate::pus::test::create_test_service_static; use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::RequestWithToken; use crate::tcp::{SyncTcpTmSource, TcpTask}; -use crate::tmtc::{PusTcSource, SharedTcPool, TcArgs, TmArgs, TmtcTask}; -use crate::udp::UdpTmtcServer; +use crate::tmtc::{ + MpscStoreAndSendError, PusTcSourceStaticPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic, + TmtcTaskStatic, +}; +use crate::udp::{StaticUdpTmHandler, UdpTmtcServer}; use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig}; use satrs_core::pus::event_man::EventRequestWithToken; use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; -use satrs_core::pus::MpscTmInStoreSender; +use satrs_core::pus::{EcssTmSender, MpscTmAsVecSender, MpscTmInStoreSender}; use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter}; use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{CcsdsDistributor, TargetId}; @@ -48,29 +54,38 @@ use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -fn main() { - setup_logger().expect("setting up logging with fern failed"); - println!("Running OBSW example"); - let tm_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ - (30, 32), - (15, 64), - (15, 128), - (15, 256), - (15, 1024), - (15, 2048), - ])); - let shared_tm_store = SharedTmStore::new(tm_pool); - let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ - (30, 32), - (15, 64), - (15, 128), - (15, 256), - (15, 1024), - (15, 2048), - ])); - let shared_tc_pool = SharedTcPool { - pool: Arc::new(RwLock::new(tc_pool)), - }; +const USE_STATIC_POOLS: bool = true; + +fn create_static_pools() -> (StaticMemoryPool, StaticMemoryPool) { + ( + StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])), + StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])), + ) +} + +fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender { + let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); + // Every software component which needs to generate verification telemetry, gets a cloned + // verification reporter. + VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)) +} + +fn static_tmtc_pool_main() { + let (tm_pool, tc_pool) = create_static_pools(); let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ (30, 32), (15, 64), @@ -79,28 +94,30 @@ fn main() { (15, 1024), (15, 2048), ])); - + let shared_tm_store = SharedTmStore::new(tm_pool); + let shared_tc_pool = SharedTcPool { + pool: Arc::new(RwLock::new(tc_pool)), + }; let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let verif_sender = MpscTmInStoreSender::new( + + // Every software component which needs to generate verification telemetry, receives a cloned + // verification reporter. + let verif_reporter = create_verification_reporter(MpscTmInStoreSender::new( TmSenderId::PusVerification as ChannelId, "verif_sender", shared_tm_store.clone(), tm_funnel_tx.clone(), - ); - let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); - // Every software component which needs to generate verification telemetry, gets a cloned - // verification reporter. - let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + )); + let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId); + let (acs_thread_tx, acs_thread_rx) = channel::(); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = HashMap::new(); - let (acs_thread_tx, acs_thread_rx) = channel::(); - let target_apid = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId); - request_map.insert(target_apid, acs_thread_tx); + request_map.insert(acs_target_id, acs_thread_tx); - let tc_source_wrapper = PusTcSource { + let tc_source_wrapper = PusTcSourceStaticPool { tc_store: shared_tc_pool.clone(), tc_source: tc_source_tx, }; @@ -120,11 +137,16 @@ fn main() { // These sender handles are used to send event requests, for example to enable or disable // certain events. let (event_request_tx, event_request_rx) = mpsc::channel::(); + // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. let mut event_handler = EventHandler::new( - shared_tm_store.clone(), - tm_funnel_tx.clone(), + MpscTmInStoreSender::new( + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ), verif_reporter.clone(), event_request_rx, ); @@ -141,7 +163,7 @@ fn main() { hk_service_receiver: pus_hk_tx, action_service_receiver: pus_action_tx, }; - let pus_test_service = create_test_service( + let pus_test_service = create_test_service_static( shared_tm_store.clone(), tm_funnel_tx.clone(), verif_reporter.clone(), @@ -192,7 +214,7 @@ fn main() { let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), }; - let mut tmtc_task = TmtcTask::new( + let mut tmtc_task = TmtcTaskStatic::new( tc_args, PusReceiver::new(verif_reporter.clone(), pus_router), ); @@ -203,8 +225,10 @@ fn main() { .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, - tm_rx: tm_args.tm_udp_server_rx, - tm_store: tm_args.tm_store.clone_backing_pool(), + tm_handler: StaticUdpTmHandler { + tm_rx: tm_args.tm_udp_server_rx, + tm_store: tm_args.tm_store.clone_backing_pool(), + }, }; let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); @@ -301,6 +325,127 @@ fn main() { jh4.join().expect("Joining PUS handler thread failed"); } +fn dyn_tmtc_pool_main() { + let (tc_source_tx, tc_source_rx) = channel(); + let (tm_funnel_tx, tm_funnel_rx) = channel(); + let (tm_server_tx, tm_server_rx) = channel(); + // Every software component which needs to generate verification telemetry, gets a cloned + // verification reporter. + let verif_reporter = create_verification_reporter(MpscTmAsVecSender::new( + TmSenderId::PusVerification as ChannelId, + "verif_sender", + tm_funnel_tx.clone(), + )); + + let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId); + let (acs_thread_tx, acs_thread_rx) = channel::(); + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = HashMap::new(); + request_map.insert(acs_target_id, acs_thread_tx); + + let tc_source = PusTcSourceDynamic { + tc_source: tc_source_tx, + }; + + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_request_tx, event_request_rx) = mpsc::channel::(); + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new( + MpscTmAsVecSender::new( + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + tm_funnel_tx.clone(), + ), + verif_reporter.clone(), + event_request_rx, + ); + + let (pus_test_tx, pus_test_rx) = channel(); + let (pus_event_tx, pus_event_rx) = channel(); + let (pus_sched_tx, pus_sched_rx) = channel(); + let (pus_hk_tx, pus_hk_rx) = channel(); + let (pus_action_tx, pus_action_rx) = channel(); + let pus_router = PusTcMpscRouter { + test_service_receiver: pus_test_tx, + event_service_receiver: pus_event_tx, + sched_service_receiver: pus_sched_tx, + hk_service_receiver: pus_hk_tx, + action_service_receiver: pus_action_tx, + }; + + let pus_test_service = create_test_service_dynamic( + tm_funnel_tx.clone(), + verif_reporter.clone(), + event_handler.clone_event_sender(), + pus_test_rx, + ); + + let ccsds_receiver = CcsdsReceiver { tc_source }; + + let mut tmtc_task = TmtcTaskDynamic::new( + tc_source_rx, + PusReceiver::new(verif_reporter.clone(), pus_router), + ); + + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_handler: DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }, + }; + + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("TMTC and UDP".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(400)); + } + }) + .unwrap(); + + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("TCP".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } + }) + .unwrap(); +} + +fn main() { + setup_logger().expect("setting up logging with fern failed"); + println!("Running OBSW example"); + if USE_STATIC_POOLS { + static_tmtc_pool_main(); + } else { + dyn_tmtc_pool_main(); + } +} + pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) { time_provider .update_from_now() diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 08c2bf4..61161ca 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -9,11 +9,11 @@ use satrs_core::spacepackets::time::TimeWriter; use satrs_example::{tmtc_err, CustomPusServiceId}; use std::sync::mpsc::Sender; -pub mod stack; pub mod action; pub mod event; pub mod hk; pub mod scheduler; +pub mod stack; pub mod test; pub struct PusTcMpscRouter { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 6d2b32b..ed89397 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,7 +1,6 @@ use std::sync::mpsc; use std::time::Duration; -use crate::tmtc::PusTcSource; use log::{error, info, warn}; use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr}; use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; @@ -15,17 +14,19 @@ use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::ChannelId; use satrs_example::{TcReceiverId, TmSenderId, PUS_APID}; +use crate::tmtc::PusTcSourceStaticPool; + pub struct Pus11Wrapper { pub pus_11_handler: PusService11SchedHandler, pub sched_tc_pool: StaticMemoryPool, - pub tc_source_wrapper: PusTcSource, + pub tc_source_wrapper: PusTcSourceStaticPool, } pub fn create_scheduler_service( shared_tm_store: SharedTmStore, tm_funnel_tx: mpsc::Sender, verif_reporter: VerificationReporterWithSender, - tc_source_wrapper: PusTcSource, + tc_source_wrapper: PusTcSourceStaticPool, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, ) -> Pus11Wrapper { diff --git a/satrs-example/src/pus/stack.rs b/satrs-example/src/pus/stack.rs index e54cb47..cd27856 100644 --- a/satrs-example/src/pus/stack.rs +++ b/satrs-example/src/pus/stack.rs @@ -1,3 +1,5 @@ +use satrs_core::pus::EcssTcInSharedStoreConverter; + use super::{ action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, test::Service17CustomWrapper, @@ -8,7 +10,7 @@ pub struct PusStack { hk_srv: Pus3Wrapper, action_srv: Pus8Wrapper, schedule_srv: Pus11Wrapper, - test_srv: Service17CustomWrapper, + test_srv: Service17CustomWrapper, } impl PusStack { @@ -17,14 +19,14 @@ impl PusStack { event_srv: Pus5Wrapper, action_srv: Pus8Wrapper, schedule_srv: Pus11Wrapper, - test_srv: Service17CustomWrapper, + test_srv: Service17CustomWrapper, ) -> Self { Self { event_srv, action_srv, schedule_srv, test_srv, - hk_srv + hk_srv, } } diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 8bb2514..2c0a608 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -4,8 +4,8 @@ use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs_core::pus::test::PusService17TestHandler; use satrs_core::pus::verification::{FailParams, VerificationReporterWithSender}; use satrs_core::pus::{ - EcssTcAndToken, EcssTcInMemConverter, MpscTcReceiver, MpscTmInStoreSender, - PusPacketHandlerResult, PusServiceHelper, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, + MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper, }; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; @@ -17,14 +17,14 @@ use satrs_core::{events::EventU32, pus::EcssTcInSharedStoreConverter}; use satrs_example::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID, TEST_EVENT}; use std::sync::mpsc::{self, Sender}; -pub fn create_test_service( +pub fn create_test_service_static( shared_tm_store: SharedTmStore, tm_funnel_tx: mpsc::Sender, verif_reporter: VerificationReporterWithSender, tc_pool: SharedStaticMemoryPool, event_sender: mpsc::Sender<(EventU32, Option)>, pus_test_rx: mpsc::Receiver, -) -> Service17CustomWrapper { +) -> Service17CustomWrapper { let test_srv_tm_sender = MpscTmInStoreSender::new( TmSenderId::PusTest as ChannelId, "PUS_17_TM_SENDER", @@ -49,12 +49,41 @@ pub fn create_test_service( } } -pub struct Service17CustomWrapper { - pub pus17_handler: PusService17TestHandler, +pub fn create_test_service_dynamic( + tm_funnel_tx: mpsc::Sender>, + verif_reporter: VerificationReporterWithSender, + event_sender: mpsc::Sender<(EventU32, Option)>, + pus_test_rx: mpsc::Receiver, +) -> Service17CustomWrapper { + let test_srv_tm_sender = MpscTmAsVecSender::new( + TmSenderId::PusTest as ChannelId, + "PUS_17_TM_SENDER", + tm_funnel_tx.clone(), + ); + let test_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusTest as ChannelId, + "PUS_17_TC_RECV", + pus_test_rx, + ); + let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( + Box::new(test_srv_receiver), + Box::new(test_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + )); + Service17CustomWrapper { + pus17_handler, + test_srv_event_sender: event_sender, + } +} + +pub struct Service17CustomWrapper { + pub pus17_handler: PusService17TestHandler, pub test_srv_event_sender: Sender<(EventU32, Option)>, } -impl Service17CustomWrapper { +impl Service17CustomWrapper { pub fn handle_next_packet(&mut self) -> bool { let res = self.pus17_handler.handle_one_tc(); if res.is_err() { diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs index 8b7feff..7ec010f 100644 --- a/satrs-example/src/tcp.rs +++ b/satrs-example/src/tcp.rs @@ -71,20 +71,21 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } -pub struct TcpTask { +pub struct TcpTask { server: TcpSpacepacketsServer< (), - CcsdsError, + CcsdsError, SyncTcpTmSource, - CcsdsDistributor, + CcsdsDistributor, >, + phantom: std::marker::PhantomData, } -impl TcpTask { +impl TcpTask { pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, + tc_receiver: CcsdsDistributor, ) -> Result { Ok(Self { server: TcpSpacepacketsServer::new( @@ -93,6 +94,7 @@ impl TcpTask { tc_receiver, Box::new(PACKET_ID_LOOKUP), )?, + phantom: std::marker::PhantomData, }) } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 4ffcefc..853d538 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,7 +1,7 @@ use log::warn; use satrs_core::pus::{EcssTcAndToken, ReceivesEcssPusTc}; use satrs_core::spacepackets::SpHeader; -use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; +use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; @@ -18,13 +18,13 @@ pub struct TmArgs { } pub struct TcArgs { - pub tc_source: PusTcSource, + pub tc_source: PusTcSourceStaticPool, pub tc_receiver: Receiver, } impl TcArgs { #[allow(dead_code)] - fn split(self) -> (PusTcSource, Receiver) { + fn split(self) -> (PusTcSourceStaticPool, Receiver) { (self.tc_source, self.tc_receiver) } } @@ -54,19 +54,19 @@ impl SharedTcPool { } #[derive(Clone)] -pub struct PusTcSource { +pub struct PusTcSourceStaticPool { pub tc_source: Sender, pub tc_store: SharedTcPool, } -impl PusTcSource { +impl PusTcSourceStaticPool { #[allow(dead_code)] pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { self.tc_store.pool.clone() } } -impl ReceivesEcssPusTc for PusTcSource { +impl ReceivesEcssPusTc for PusTcSourceStaticPool { type Error = MpscStoreAndSendError; fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { @@ -76,7 +76,7 @@ impl ReceivesEcssPusTc for PusTcSource { } } -impl ReceivesCcsdsTc for PusTcSource { +impl ReceivesCcsdsTc for PusTcSourceStaticPool { type Error = MpscStoreAndSendError; fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -88,13 +88,36 @@ impl ReceivesCcsdsTc for PusTcSource { } } -pub struct TmtcTask { +#[derive(Clone)] +pub struct PusTcSourceDynamic { + pub tc_source: Sender>, +} + +impl ReceivesEcssPusTc for PusTcSourceDynamic { + type Error = SendError>; + + fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { + self.tc_source.send(pus_tc.raw_data().to_vec())?; + Ok(()) + } +} + +impl ReceivesCcsdsTc for PusTcSourceDynamic { + type Error = mpsc::SendError>; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.tc_source.send(tc_raw.to_vec())?; + Ok(()) + } +} + +pub struct TmtcTaskStatic { tc_args: TcArgs, tc_buf: [u8; 4096], pus_receiver: PusReceiver, } -impl TmtcTask { +impl TmtcTaskStatic { pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self { Self { tc_args, @@ -148,3 +171,50 @@ impl TmtcTask { } } } + +pub struct TmtcTaskDynamic { + pub tc_receiver: Receiver>, + pus_receiver: PusReceiver, +} + +impl TmtcTaskDynamic { + pub fn new(tc_receiver: Receiver>, pus_receiver: PusReceiver) -> Self { + Self { + tc_receiver, + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_receiver.try_recv() { + Ok(tc) => match PusTcReader::new(&tc) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs_core::pus::TcInMemory::Vec(tc.clone()), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + warn!("error creating PUS TC from raw data: {e}"); + warn!("raw data: {:x?}", tc); + true + } + }, + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs index 3853fd3..7c09ad5 100644 --- a/satrs-example/src/udp.rs +++ b/satrs-example/src/udp.rs @@ -1,4 +1,7 @@ -use std::{net::SocketAddr, sync::mpsc::Receiver}; +use std::{ + net::{SocketAddr, UdpSocket}, + sync::mpsc::{self, Receiver}, +}; use log::{info, warn}; use satrs_core::{ @@ -7,45 +10,17 @@ use satrs_core::{ tmtc::CcsdsError, }; -use crate::tmtc::MpscStoreAndSendError; +pub trait UdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); +} -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer>, +pub struct StaticUdpTmHandler { pub tm_rx: Receiver, pub tm_store: SharedStaticMemoryPool, } -impl UdpTmtcServer { - pub fn periodic_operation(&mut self) { - while self.poll_tc_server() {} - if let Some(recv_addr) = self.udp_tc_server.last_sender() { - self.send_tm_to_udp_client(&recv_addr); - } - } - fn poll_tc_server(&mut self) -> bool { - match self.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true - } - CcsdsError::CustomError(e) => { - warn!("mpsc store and send error {e:?}"); - true - } - }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false - } - ReceiveResult::NothingReceived => false, - }, - } - } - - fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { +impl UdpTmHandler for StaticUdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, &recv_addr: &SocketAddr) { while let Ok(addr) = self.tm_rx.try_recv() { let store_lock = self.tm_store.write(); if store_lock.is_err() { @@ -67,10 +42,72 @@ impl UdpTmtcServer { } else { info!("Sending PUS TM"); } - let result = self.udp_tc_server.socket.send_to(buf, recv_addr); + let result = socket.send_to(buf, recv_addr); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } } } } + +pub struct DynamicUdpTmHandler { + pub tm_rx: Receiver>, +} + +impl UdpTmHandler for DynamicUdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { + while let Ok(tm) = self.tm_rx.try_recv() { + if tm.len() > 9 { + let service = tm[7]; + let subservice = tm[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = socket.send_to(&tm, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } +} + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer>, + pub tm_handler: TmHandler, +} + +impl + UdpTmtcServer +{ + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.tm_handler + .send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr); + } + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc custom error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } + } +}