From 3f4c0e7d513e890e61672a9332fb7924ef7a539c Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 27 Nov 2025 18:11:14 +0100 Subject: [PATCH] try to make it compile again --- satrs-example/src/eps/pcdu.rs | 28 ++-- satrs-example/src/events.rs | 4 +- satrs-example/src/interface/udp.rs | 95 +++++++----- satrs-example/src/lib.rs | 20 ++- satrs-example/src/main.rs | 224 +++------------------------- satrs-example/src/tmtc/sender.rs | 20 +-- satrs-example/src/tmtc/tc_source.rs | 47 ++++-- satrs-example/src/tmtc/tm_sink.rs | 14 +- 8 files changed, 164 insertions(+), 288 deletions(-) diff --git a/satrs-example/src/eps/pcdu.rs b/satrs-example/src/eps/pcdu.rs index 8613c56..d1326dd 100644 --- a/satrs-example/src/eps/pcdu.rs +++ b/satrs-example/src/eps/pcdu.rs @@ -32,12 +32,7 @@ use satrs_minisim::{ }; use serde::{Deserialize, Serialize}; -use crate::{ - hk::PusHkHelper, - //pus::hk::{HkReply, HkReplyVariant}, - requests::CompositeRequest, - tmtc::sender::TmTcSender, -}; +use crate::{hk::PusHkHelper, requests::CompositeRequest}; pub trait SerialInterface { type Error: core::fmt::Debug; @@ -213,7 +208,7 @@ pub struct PcduHandler { //hk_reply_tx: mpsc::SyncSender>, hk_tx: std::sync::mpsc::SyncSender, switch_request_rx: mpsc::Receiver>, - tm_sender: TmTcSender, + tm_tx: mpsc::SyncSender, pub com_interface: ComInterface, shared_switch_map: Arc>, #[new(value = "PusHkHelper::new(id)")] @@ -274,7 +269,7 @@ impl PcduHandler { } } - pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) { + pub fn handle_hk_request(&mut self, _requestor_info: &MessageMetadata, hk_request: &HkRequest) { match hk_request.variant { HkRequestVariant::OneShot => { if hk_request.unique_id == SetId::SwitcherSet as u32 { @@ -301,11 +296,11 @@ impl PcduHandler { }, &mut self.tm_buf, ) { - self.tm_sender - .send_tm(self.id.id(), PusTmVariant::Direct(hk_tm)) - .expect("failed to send HK TM"); // TODO: Fix /* + self.tm_sender + .send(self.id.id(), PusTmVariant::Direct(hk_tm)) + .expect("failed to send HK TM"); self.hk_reply_tx .send(GenericMessage::new( *requestor_info, @@ -511,9 +506,7 @@ mod tests { use std::sync::mpsc; use arbitrary_int::u21; - use satrs::{ - mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage, tmtc::PacketAsVec, - }; + use satrs::{mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage}; use satrs_example::ids::{self, Apid}; use satrs_minisim::eps::SwitchMapBinary; @@ -561,7 +554,7 @@ mod tests { pub composite_request_tx: mpsc::Sender>, //pub hk_reply_rx: mpsc::Receiver>, pub hk_rx: std::sync::mpsc::Receiver, - pub tm_rx: mpsc::Receiver, + pub tm_rx: mpsc::Receiver, pub switch_request_tx: mpsc::Sender>, pub handler: PcduHandler, } @@ -574,7 +567,7 @@ mod tests { let mode_node = ModeRequestHandlerMpscBounded::new(PCDU.into(), mode_request_rx); let (composite_request_tx, composite_request_rx) = mpsc::channel(); let (hk_tx, hk_rx) = mpsc::sync_channel(10); - let (tm_tx, tm_rx) = mpsc::sync_channel::(5); + let (tm_tx, tm_rx) = mpsc::sync_channel(5); let (switch_request_tx, switch_reqest_rx) = mpsc::channel(); let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default())); let mut handler = PcduHandler::new( @@ -584,7 +577,8 @@ mod tests { composite_request_rx, hk_tx, switch_reqest_rx, - TmTcSender::Heap(tm_tx.clone()), + tm_tx.clone(), + //TmTcSender::Normal(tm_tx.clone()), SerialInterfaceTest::default(), shared_switch_map, ); diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs index bc91add..3e0b6fb 100644 --- a/satrs-example/src/events.rs +++ b/satrs-example/src/events.rs @@ -1,6 +1,6 @@ use std::sync::mpsc::{self}; -use crate::pus::create_verification_reporter; +//use crate::pus::create_verification_reporter; use arbitrary_int::traits::Integer as _; use arbitrary_int::u11; use satrs::event_man_legacy::{EventMessageU32, EventRoutingError}; @@ -34,6 +34,7 @@ impl EventTmHook for EventApidSetter { } } +/* /// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event /// packets. It also handles the verification completion of PUS event service requests. pub struct PusEventHandler { @@ -292,3 +293,4 @@ mod tests { // TODO: Add test. } } +*/ diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index 70efe78..63668c3 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -1,22 +1,22 @@ #![allow(dead_code)] +use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; -use std::sync::mpsc; +use std::sync::{mpsc, Arc, Mutex}; use log::{info, warn}; use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use satrs::pus::HandlingStatus; use satrs::queue::GenericSendError; -use satrs::tmtc::PacketAsVec; -use satrs::pool::{PoolProviderWithGuards, SharedStaticMemoryPool}; -use satrs::tmtc::PacketInPool; +use satrs_example::CcsdsTmPacketOwned; use crate::tmtc::sender::TmTcSender; -pub trait UdpTmHandler { +pub trait UdpTmHandlerProvider { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); } +/* pub struct StaticUdpTmHandler { pub tm_rx: mpsc::Receiver, pub tm_store: SharedStaticMemoryPool, @@ -45,22 +45,17 @@ impl UdpTmHandler for StaticUdpTmHandler { } } } +*/ -pub struct DynamicUdpTmHandler { - pub tm_rx: mpsc::Receiver, +pub struct UdpTmHandlerWithChannel { + pub tm_rx: mpsc::Receiver, } -impl UdpTmHandler for DynamicUdpTmHandler { +impl UdpTmHandlerProvider for UdpTmHandlerWithChannel { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { while let Ok(tm) = self.tm_rx.try_recv() { - if tm.packet.len() > 9 { - let service = tm.packet[7]; - let subservice = tm.packet[8]; - info!("Sending PUS TM[{service},{subservice}]") - } else { - info!("Sending PUS TM"); - } - let result = socket.send_to(&tm.packet, recv_addr); + info!("Sending PUS TM with header {:?}", tm.tm_header); + let result = socket.send_to(&tm.to_vec(), recv_addr); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } @@ -68,12 +63,49 @@ impl UdpTmHandler for DynamicUdpTmHandler { } } -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer, - pub tm_handler: TmHandler, +#[derive(Default, Debug, Clone)] +pub struct TestTmHandler { + addrs_to_send_to: Arc>>, } -impl UdpTmtcServer { +impl UdpTmHandlerProvider for TestTmHandler { + fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) { + self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr); + } +} + +pub enum UdpTmHandler { + Normal(UdpTmHandlerWithChannel), + Test(TestTmHandler), +} + +impl From for UdpTmHandler { + fn from(handler: UdpTmHandlerWithChannel) -> Self { + UdpTmHandler::Normal(handler) + } +} + +impl From for UdpTmHandler { + fn from(handler: TestTmHandler) -> Self { + UdpTmHandler::Test(handler) + } +} + +impl UdpTmHandlerProvider for UdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { + match self { + UdpTmHandler::Normal(handler) => handler.send_tm_to_udp_client(socket, recv_addr), + UdpTmHandler::Test(handler) => handler.send_tm_to_udp_client(socket, recv_addr), + } + } +} + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer, + pub tm_handler: UdpTmHandler, +} + +impl UdpTmtcServer { pub fn periodic_operation(&mut self) { loop { if self.poll_tc_server() == HandlingStatus::Empty { @@ -107,12 +139,8 @@ impl UdpTmtcServer { #[cfg(test)] mod tests { + use std::net::IpAddr; use std::net::Ipv4Addr; - use std::{ - collections::VecDeque, - net::IpAddr, - sync::{Arc, Mutex}, - }; use arbitrary_int::traits::Integer as _; use arbitrary_int::u14; @@ -127,23 +155,12 @@ mod tests { use satrs_example::config::OBSW_SERVER_ADDR; use satrs_example::ids; - use crate::tmtc::sender::{MockSender, TmTcSender}; + use crate::tmtc::sender::MockSender; use super::*; const UDP_SERVER_ID: ComponentId = 0x05; - #[derive(Default, Debug, Clone)] - pub struct TestTmHandler { - addrs_to_send_to: Arc>>, - } - - impl UdpTmHandler for TestTmHandler { - fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) { - self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr); - } - } - #[test] fn test_basic() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); @@ -154,7 +171,7 @@ mod tests { let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { udp_tc_server, - tm_handler, + tm_handler: tm_handler.into(), }; udp_dyn_server.periodic_operation(); let queue = udp_dyn_server @@ -179,7 +196,7 @@ mod tests { let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { udp_tc_server, - tm_handler, + tm_handler: tm_handler.into(), }; let sph = SpHeader::new_for_unseg_tc(ids::Apid::GenericPus.raw_value(), u14::ZERO, 0); let ping_tc = PusTcCreator::new_simple( diff --git a/satrs-example/src/lib.rs b/satrs-example/src/lib.rs index f22db2b..98b041c 100644 --- a/satrs-example/src/lib.rs +++ b/satrs-example/src/lib.rs @@ -42,6 +42,20 @@ pub struct CcsdsTmPacketOwned { pub payload: alloc::vec::Vec, } +/// Simple type modelling packet stored in the heap. This structure is intended to +/// be used when sending a packet via a message queue, so it also contains the sender ID. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct PacketAsVec { + pub sender_id: ComponentId, + pub packet: Vec, +} + +impl PacketAsVec { + pub fn new(sender_id: ComponentId, packet: Vec) -> Self { + Self { sender_id, packet } + } +} + #[derive(Debug, thiserror::Error)] pub enum CcsdsCreationError { #[error("CCSDS packet creation error: {0}")] @@ -76,11 +90,11 @@ impl CcsdsTmPacketOwned { .unwrap() } - pub fn to_vec(&self) -> Result, CcsdsCreationError> { + pub fn to_vec(&self) -> alloc::vec::Vec { let mut buf = alloc::vec![0u8; self.len_written()]; - let len = self.write_to_bytes(&mut buf)?; + let len = self.write_to_bytes(&mut buf).unwrap(); buf.truncate(len); - Ok(buf) + buf } } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index a1a55ba..acf8845 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -5,12 +5,10 @@ use std::{ time::Duration, }; -use acs::mgm::{MgmHandlerLis3Mdl, SpiDummyInterface, SpiSimInterface, SpiSimInterfaceWrapper}; use eps::{ pcdu::{PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper}, PowerSwitchHelper, }; -use events::EventHandler; use interface::{ sim_client_udp::create_sim_client, tcp::{SyncTcpTmSource, TcpTask}, @@ -18,31 +16,17 @@ use interface::{ }; use log::info; use logger::setup_logger; -/* -use pus::{ - action::create_action_service, - event::create_event_service, - hk::create_hk_service, - mode::create_mode_service, - scheduler::{create_scheduler_service, TcReleaser}, - stack::PusStack, - test::create_test_service, - PusTcDistributor, PusTcMpscRouter, -}; -*/ use requests::GenericRequestRouter; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}, - mode_tree::connect_mode_nodes, - pus::{event_man::EventRequestWithToken, EcssTcCacher, HandlingStatus}, + pus::{event_man::EventRequestWithToken, HandlingStatus}, request::{GenericMessage, MessageMetadata}, spacepackets::time::cds::CdsTime, }; use satrs_example::{ config::{ components::NO_SENDER, - pool::create_sched_tc_pool, tasks::{FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS}, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, }, @@ -56,23 +40,7 @@ use satrs_example::{ use tmtc::sender::TmTcSender; use tmtc::{tc_source::TcSourceTask, tm_sink::TmSink}; -cfg_if::cfg_if! { - if #[cfg(feature = "heap_tmtc")] { - use interface::udp::DynamicUdpTmHandler; - use satrs::pus::EcssTcVecCacher; - use tmtc::{tc_source::TcSourceTaskDynamic, tm_sink::TmSinkDynamic}; - } else { - use std::sync::RwLock; - use interface::udp::StaticUdpTmHandler; - use satrs::pus::EcssTcInSharedPoolCacher; - use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; - use satrs_example::config::pool::create_static_pools; - use tmtc::{ - tc_source::TcSourceTaskStatic, - tm_sink::TmSinkStatic, - }; - } -} +use crate::{interface::udp::UdpTmHandlerWithChannel, tmtc::tc_source::CcsdsDistributor}; mod acs; mod eps; @@ -89,33 +57,13 @@ fn main() { setup_logger().expect("setting up logging with fern failed"); println!("Running OBSW example"); - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let (tm_pool, tc_pool) = create_static_pools(); - let shared_tm_pool = Arc::new(RwLock::new(tm_pool)); - let shared_tc_pool = Arc::new(RwLock::new(tc_pool)); - let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); - let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); - } - } - let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let tm_sender = TmTcSender::Static( - PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()) - ); - } else if #[cfg(feature = "heap_tmtc")] { - let tm_sender = TmTcSender::Heap(tm_sink_tx.clone()); - } - } - let (sim_request_tx, sim_request_rx) = mpsc::channel(); - let (mgm_0_sim_reply_tx, mgm_0_sim_reply_rx) = mpsc::channel(); - let (mgm_1_sim_reply_tx, mgm_1_sim_reply_rx) = mpsc::channel(); + //let (mgm_0_sim_reply_tx, mgm_0_sim_reply_rx) = mpsc::channel(); + //let (mgm_1_sim_reply_tx, mgm_1_sim_reply_rx) = mpsc::channel(); let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); let mut opt_sim_client = create_sim_client(sim_request_rx); @@ -138,141 +86,35 @@ fn main() { .composite_router_map .insert(PCDU.id(), pcdu_handler_composite_tx); - // This helper structure is used by all telecommand providers which need to send telecommands - // to the TC source. - /* - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let tc_sender_with_shared_pool = - PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); - let tc_in_mem_converter = - EcssTcCacher::Static(EcssTcInSharedPoolCacher::new(shared_tc_pool, 4096)); - } else if #[cfg(feature = "heap_tmtc")] { - let tc_in_mem_converter = EcssTcCacher::Heap(EcssTcVecCacher::default()); - } - } - */ - // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. - let (event_tx, event_rx) = mpsc::sync_channel(100); + //let (event_tx, event_rx) = mpsc::sync_channel(100); let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); + //let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); - let (pus_test_tx, pus_test_rx) = mpsc::sync_channel(20); - let (pus_event_tx, pus_event_rx) = mpsc::sync_channel(10); - let (pus_sched_tx, pus_sched_rx) = mpsc::sync_channel(50); - let (pus_hk_tx, pus_hk_rx) = mpsc::sync_channel(50); - let (pus_action_tx, pus_action_rx) = mpsc::sync_channel(50); - let (pus_mode_tx, pus_mode_rx) = mpsc::sync_channel(50); + //let (pus_test_tx, pus_test_rx) = mpsc::sync_channel(20); + //let (pus_event_tx, pus_event_rx) = mpsc::sync_channel(10); + //let (pus_sched_tx, pus_sched_rx) = mpsc::sync_channel(50); + //let (pus_hk_tx, pus_hk_rx) = mpsc::sync_channel(50); + //let (pus_action_tx, pus_action_rx) = mpsc::sync_channel(50); + //let (pus_mode_tx, pus_mode_rx) = mpsc::sync_channel(50); - let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + //let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::sync_channel(50); - let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(30); + //let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(30); - /* - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let tc_releaser = TcReleaser::Static(tc_sender_with_shared_pool.clone()); - } else if #[cfg(feature = "heap_tmtc")] { - let tc_releaser = TcReleaser::Heap(tc_source_tx.clone()); - } - } - */ - /* - let pus_router = PusTcMpscRouter { - test_tc_sender: pus_test_tx, - event_tc_sender: pus_event_tx, - sched_tc_sender: pus_sched_tx, - hk_tc_sender: pus_hk_tx, - action_tc_sender: pus_action_tx, - mode_tc_sender: pus_mode_tx, - }; - let pus_test_service = create_test_service( - tm_sender.clone(), - tc_in_mem_converter.clone(), - event_tx.clone(), - pus_test_rx, - ); - let pus_scheduler_service = create_scheduler_service( - tm_sender.clone(), - tc_in_mem_converter.clone(), - tc_releaser, - pus_sched_rx, - create_sched_tc_pool(), - ); - let pus_event_service = create_event_service( - tm_sender.clone(), - tc_in_mem_converter.clone(), - pus_event_rx, - event_request_tx, - ); - let pus_action_service = create_action_service( - tm_sender.clone(), - tc_in_mem_converter.clone(), - pus_action_rx, - request_map.clone(), - pus_action_reply_rx, - ); - let pus_hk_service = create_hk_service( - tm_sender.clone(), - tc_in_mem_converter.clone(), - pus_hk_rx, - request_map.clone(), - pus_hk_reply_rx, - ); - let pus_mode_service = create_mode_service( - tm_sender.clone(), - tc_in_mem_converter.clone(), - pus_mode_rx, - request_map, - pus_mode_reply_rx, - ); - let mut pus_stack = PusStack::new( - pus_test_service, - pus_hk_service, - pus_event_service, - pus_action_service, - pus_scheduler_service, - pus_mode_service, - ); - */ - - /* - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let mut tmtc_task = TcSourceTask::Static(TcSourceTaskStatic::new( - shared_tc_pool_wrapper.clone(), - tc_source_rx, - PusTcDistributor::new(tm_sender.clone(), pus_router), - )); - let tc_sender = TmTcSender::Static(tc_sender_with_shared_pool); - let udp_tm_handler = StaticUdpTmHandler { - tm_rx: tm_server_rx, - tm_store: shared_tm_pool.clone(), - }; - } else if #[cfg(feature = "heap_tmtc")] { - let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new( - tc_source_rx, - PusTcDistributor::new(tm_sender.clone(), pus_router), - )); - let tc_sender = TmTcSender::Heap(tc_source_tx.clone()); - let udp_tm_handler = DynamicUdpTmHandler { - tm_rx: tm_server_rx, - }; - } - } - */ + let mut ccsds_distributor = CcsdsDistributor::default(); let mut tmtc_task = TcSourceTask::new( tc_source_rx, + ccsds_distributor, //PusTcDistributor::new(tm_sender.clone(), pus_router), ); - let tc_sender = TmTcSender::Heap(tc_source_tx.clone()); - let udp_tm_handler = DynamicUdpTmHandler { + let tc_sender = TmTcSender::Normal(tc_source_tx.clone()); + let udp_tm_handler = UdpTmHandlerWithChannel { tm_rx: tm_server_rx, }; @@ -281,7 +123,7 @@ fn main() { .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, - tm_handler: udp_tm_handler, + tm_handler: udp_tm_handler.into(), }; let tcp_server_cfg = ServerConfig::new( @@ -300,32 +142,14 @@ fn main() { ) .expect("tcp server creation failed"); - /* - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let mut tm_sink = TmSink::Static(TmSinkStatic::new( - shared_tm_pool_wrapper, - sync_tm_tcp_source, - tm_sink_rx, - tm_server_tx, - )); - } else if #[cfg(feature = "heap_tmtc")] { - let mut tm_sink = TmSink::Heap(TmSinkDynamic::new( - sync_tm_tcp_source, - tm_sink_rx, - tm_server_tx, - )); - } - } - */ let mut tm_sink = TmSink::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx); let shared_switch_set = Arc::new(Mutex::default()); let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); - let shared_mgm_0_set = Arc::default(); - let shared_mgm_1_set = Arc::default(); + //let shared_mgm_0_set = Arc::default(); + //let shared_mgm_1_set = Arc::default(); let mgm_0_mode_node = ModeRequestHandlerMpscBounded::new(MGM0.into(), mgm_0_handler_mode_rx); let mgm_1_mode_node = ModeRequestHandlerMpscBounded::new(MGM1.into(), mgm_1_handler_mode_rx); /* @@ -405,7 +229,7 @@ fn main() { pcdu_handler_composite_rx, pus_hk_reply_tx, switch_request_rx, - tm_sender.clone(), + tm_sink_tx.clone(), pcdu_serial_interface, shared_switch_set, ); @@ -507,8 +331,8 @@ fn main() { let jh_pus_handler = thread::Builder::new() .name("sat-rs pus".to_string()) .spawn(move || loop { - event_handler.periodic_operation(); - pus_stack.periodic_operation(); + //event_handler.periodic_operation(); + //pus_stack.periodic_operation(); thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); diff --git a/satrs-example/src/tmtc/sender.rs b/satrs-example/src/tmtc/sender.rs index 4d243b7..34f477e 100644 --- a/satrs-example/src/tmtc/sender.rs +++ b/satrs-example/src/tmtc/sender.rs @@ -1,10 +1,8 @@ use std::{cell::RefCell, collections::VecDeque, sync::mpsc}; use satrs::{ - pus::EcssTmSender, queue::GenericSendError, - spacepackets::ecss::WritablePusPacket, - tmtc::{PacketAsVec, PacketHandler, PacketSenderWithSharedPool}, + tmtc::{PacketAsVec, PacketHandler}, ComponentId, }; @@ -14,8 +12,7 @@ pub struct MockSender(pub RefCell>); #[allow(dead_code)] #[derive(Debug, Clone)] pub enum TmTcSender { - Static(PacketSenderWithSharedPool), - Heap(mpsc::SyncSender), + Normal(mpsc::SyncSender), Mock(MockSender), } @@ -29,6 +26,7 @@ impl TmTcSender { } } +/* impl EcssTmSender for TmTcSender { fn send_tm( &self, @@ -36,7 +34,7 @@ impl EcssTmSender for TmTcSender { tm: satrs::pus::PusTmVariant, ) -> Result<(), satrs::pus::EcssTmtcError> { match self { - TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm), + //TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm), TmTcSender::Heap(sync_sender) => match tm { satrs::pus::PusTmVariant::InStore(_) => panic!("can not send TM in store"), satrs::pus::PusTmVariant::Direct(pus_tm_creator) => sync_sender @@ -47,19 +45,15 @@ impl EcssTmSender for TmTcSender { } } } +*/ impl PacketHandler for TmTcSender { type Error = GenericSendError; fn handle_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { match self { - TmTcSender::Static(packet_sender_with_shared_pool) => { - if let Err(e) = packet_sender_with_shared_pool.handle_packet(sender_id, packet) { - log::error!("Error sending packet via Static TM/TC sender: {:?}", e); - } - } - TmTcSender::Heap(sync_sender) => { - if let Err(e) = sync_sender.handle_packet(sender_id, packet) { + TmTcSender::Normal(sync_sender) => { + if let Err(e) = sync_sender.send(PacketAsVec::new(sender_id, packet.to_vec())) { log::error!("Error sending packet via Heap TM/TC sender: {:?}", e); } } diff --git a/satrs-example/src/tmtc/tc_source.rs b/satrs-example/src/tmtc/tc_source.rs index e100df0..1de2aa6 100644 --- a/satrs-example/src/tmtc/tc_source.rs +++ b/satrs-example/src/tmtc/tc_source.rs @@ -1,5 +1,9 @@ -use satrs::pus::HandlingStatus; -use satrs_example::{CcsdsTcPacketOwned, ComponentId}; +use satrs::{ + pus::HandlingStatus, + spacepackets::{CcsdsPacketReader, ChecksumType}, + tmtc::PacketAsVec, +}; +use satrs_example::{CcsdsTcPacketOwned, ComponentId, TcHeader}; use std::{ collections::HashMap, sync::mpsc::{self, TryRecvError}, @@ -69,14 +73,14 @@ pub type CcsdsDistributor = HashMap, + pub tc_receiver: mpsc::Receiver, ccsds_distributor: CcsdsDistributor, } //#[allow(dead_code)] impl TcSourceTask { pub fn new( - tc_receiver: mpsc::Receiver, + tc_receiver: mpsc::Receiver, ccsds_distributor: CcsdsDistributor, ) -> Self { Self { @@ -94,14 +98,39 @@ impl TcSourceTask { // If packets like CFDP are expected, we might have to check the APID first. match self.tc_receiver.try_recv() { Ok(packet) => { - if let Some(sender) = self.ccsds_distributor.get(&packet.tc_header.target_id) { - sender.send(packet).ok(); - } else { + let ccsds_tc_reader_result = + CcsdsPacketReader::new(&packet.packet, Some(ChecksumType::WithCrc16)); + if ccsds_tc_reader_result.is_err() { log::warn!( - "no TC handler for target ID {:?}", - packet.tc_header.target_id + "received invalid CCSDS TC packet: {:?}", + ccsds_tc_reader_result.err() ); // TODO: Send a dedicated TM packet. + return HandlingStatus::HandledOne; + } + let ccsds_tc_reader = ccsds_tc_reader_result.unwrap(); + let tc_header_result = + postcard::take_from_bytes::(ccsds_tc_reader.user_data()); + if tc_header_result.is_err() { + log::warn!( + "received CCSDS TC packet with invalid TC header: {:?}", + tc_header_result.err() + ); + // TODO: Send a dedicated TM packet. + return HandlingStatus::HandledOne; + } + let (tc_header, payload) = tc_header_result.unwrap(); + if let Some(sender) = self.ccsds_distributor.get(&tc_header.target_id) { + sender + .send(CcsdsTcPacketOwned { + sp_header: *ccsds_tc_reader.sp_header(), + tc_header, + payload: payload.to_vec(), + }) + .ok(); + } else { + log::warn!("no TC handler for target ID {:?}", tc_header.target_id); + // TODO: Send a dedicated TM packet. } HandlingStatus::HandledOne } diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index 293b3c7..83627a0 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -5,11 +5,13 @@ use std::{ use arbitrary_int::{u11, u14}; use log::info; -use satrs::spacepackets::{ - ecss::{tm::PusTmZeroCopyWriter, PusPacket}, - seq_count::SequenceCounter, - seq_count::SequenceCounterCcsdsSimple, - CcsdsPacket, +use satrs::{ + spacepackets::{ + ecss::{tm::PusTmZeroCopyWriter, PusPacket}, + seq_count::{SequenceCounter, SequenceCounterCcsdsSimple}, + CcsdsPacket, + }, + tmtc::PacketAsVec, }; use satrs_example::CcsdsTmPacketOwned; @@ -165,7 +167,7 @@ impl TmSink { .expect("Creating TM zero copy writer failed"); self.common.apply_packet_processing(zero_copy_writer); */ - self.common.sync_tm_tcp_source.add_tm(&tm.to_vec().unwrap()); + self.common.sync_tm_tcp_source.add_tm(&tm.to_vec()); self.tm_server_tx .send(tm) .expect("Sending TM to server failed");