diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 93b1906..936903c 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -23,6 +23,7 @@ derive-new = "0.7" cfg-if = "1" arbitrary-int = "2" bitbybit = "1.4" +postcard = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -31,8 +32,8 @@ satrs-minisim = { path = "../satrs-minisim" } satrs-mib = { path = "../satrs-mib" } [features] -default = ["heap_tmtc"] -heap_tmtc = [] +# default = ["heap_tmtc"] +# heap_tmtc = [] [dev-dependencies] env_logger = "0.11" diff --git a/satrs-example/src/acs/mgm.rs b/satrs-example/src/acs/mgm.rs index 5bb18e8..a434e3a 100644 --- a/satrs-example/src/acs/mgm.rs +++ b/satrs-example/src/acs/mgm.rs @@ -1,3 +1,4 @@ +/* use derive_new::new; use satrs::hk::{HkRequest, HkRequestVariant}; use satrs::mode_tree::{ModeChild, ModeNode}; @@ -717,3 +718,4 @@ mod tests { assert!(mgm_set.valid); } } +*/ diff --git a/satrs-example/src/eps/pcdu.rs b/satrs-example/src/eps/pcdu.rs index d595d78..8613c56 100644 --- a/satrs-example/src/eps/pcdu.rs +++ b/satrs-example/src/eps/pcdu.rs @@ -22,7 +22,7 @@ use satrs::{ use satrs_example::{ config::components::NO_SENDER, ids::{eps::PCDU, generic_pus::PUS_MODE}, - DeviceMode, TimestampHelper, + CcsdsTmPacketOwned, DeviceMode, TimestampHelper, }; use satrs_minisim::{ eps::{ @@ -34,7 +34,7 @@ use serde::{Deserialize, Serialize}; use crate::{ hk::PusHkHelper, - pus::hk::{HkReply, HkReplyVariant}, + //pus::hk::{HkReply, HkReplyVariant}, requests::CompositeRequest, tmtc::sender::TmTcSender, }; @@ -210,7 +210,8 @@ pub struct PcduHandler { dev_str: &'static str, mode_node: ModeRequestHandlerMpscBounded, composite_request_rx: mpsc::Receiver>, - hk_reply_tx: mpsc::SyncSender>, + //hk_reply_tx: mpsc::SyncSender>, + hk_tx: std::sync::mpsc::SyncSender, switch_request_rx: mpsc::Receiver>, tm_sender: TmTcSender, pub com_interface: ComInterface, @@ -303,12 +304,15 @@ impl PcduHandler { self.tm_sender .send_tm(self.id.id(), PusTmVariant::Direct(hk_tm)) .expect("failed to send HK TM"); + // TODO: Fix + /* self.hk_reply_tx .send(GenericMessage::new( *requestor_info, HkReply::new(hk_request.unique_id, HkReplyVariant::Ack), )) .expect("failed to send HK reply"); + */ } } } @@ -555,7 +559,8 @@ mod tests { pub mode_reply_rx_to_pus: mpsc::Receiver>, pub mode_reply_rx_to_parent: mpsc::Receiver>, pub composite_request_tx: mpsc::Sender>, - pub hk_reply_rx: mpsc::Receiver>, + //pub hk_reply_rx: mpsc::Receiver>, + pub hk_rx: std::sync::mpsc::Receiver, pub tm_rx: mpsc::Receiver, pub switch_request_tx: mpsc::Sender>, pub handler: PcduHandler, @@ -568,7 +573,7 @@ mod tests { let (mode_reply_tx_to_parent, mode_reply_rx_to_parent) = mpsc::sync_channel(5); let mode_node = ModeRequestHandlerMpscBounded::new(PCDU.into(), mode_request_rx); let (composite_request_tx, composite_request_rx) = mpsc::channel(); - let (hk_reply_tx, hk_reply_rx) = mpsc::sync_channel(10); + let (hk_tx, hk_rx) = mpsc::sync_channel(10); let (tm_tx, tm_rx) = mpsc::sync_channel::(5); let (switch_request_tx, switch_reqest_rx) = mpsc::channel(); let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default())); @@ -577,7 +582,7 @@ mod tests { "TEST_PCDU", mode_node, composite_request_rx, - hk_reply_tx, + hk_tx, switch_reqest_rx, TmTcSender::Heap(tm_tx.clone()), SerialInterfaceTest::default(), @@ -590,7 +595,7 @@ mod tests { mode_reply_rx_to_pus, mode_reply_rx_to_parent, composite_request_tx, - hk_reply_rx, + hk_rx, tm_rx, switch_request_tx, handler, diff --git a/satrs-example/src/lib.rs b/satrs-example/src/lib.rs index ff51a19..f22db2b 100644 --- a/satrs-example/src/lib.rs +++ b/satrs-example/src/lib.rs @@ -1,8 +1,115 @@ -use satrs::spacepackets::time::cds::CdsTime; +extern crate alloc; + +use satrs::spacepackets::{ + ccsds_packet_len_for_user_data_len_with_checksum, time::cds::CdsTime, CcsdsPacketCreationError, + CcsdsPacketCreatorWithReservedData, CcsdsPacketIdAndPsc, SpacePacketHeader, +}; pub mod config; pub mod ids; +#[derive( + Debug, + Copy, + Clone, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + num_enum::TryFromPrimitive, + num_enum::IntoPrimitive, +)] +#[repr(u64)] +pub enum ComponentId { + Pcdu, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum MessageType { + Ping, + Mode, + Hk, + Action, + Verification, +} + +/// Unserialized owned TM packet which can be cloned and sent around. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CcsdsTmPacketOwned { + pub sp_header: SpacePacketHeader, + pub tm_header: TmHeader, + pub payload: alloc::vec::Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum CcsdsCreationError { + #[error("CCSDS packet creation error: {0}")] + CcsdsPacketCreation(#[from] CcsdsPacketCreationError), + #[error("postcard error: {0}")] + Postcard(#[from] postcard::Error), + #[error("timestamp generation error")] + Time, +} + +impl CcsdsTmPacketOwned { + pub fn write_to_bytes(&self, buf: &mut [u8]) -> Result { + let response_len = + postcard::experimental::serialized_size(&self.tm_header)? + self.payload.len(); + let mut ccsds_tm = CcsdsPacketCreatorWithReservedData::new_tm_with_checksum( + self.sp_header, + response_len, + buf, + )?; + let user_data = ccsds_tm.packet_data_mut(); + let ser_len = postcard::to_slice(&self.tm_header, user_data)?.len(); + user_data[ser_len..ser_len + self.payload.len()].copy_from_slice(&self.payload); + let ccsds_packet_len = ccsds_tm.finish(); + Ok(ccsds_packet_len) + } + + pub fn len_written(&self) -> usize { + ccsds_packet_len_for_user_data_len_with_checksum( + postcard::experimental::serialized_size(&self.tm_header).unwrap() as usize + + postcard::experimental::serialized_size(&self.payload).unwrap() as usize, + ) + .unwrap() + } + + pub fn to_vec(&self) -> Result, CcsdsCreationError> { + let mut buf = alloc::vec![0u8; self.len_written()]; + let len = self.write_to_bytes(&mut buf)?; + buf.truncate(len); + Ok(buf) + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] +pub struct TmHeader { + pub sender_id: ComponentId, + pub target_id: ComponentId, + pub message_type: MessageType, + /// Telemetry can either be sent unsolicited, or as a response to telecommands. + pub tc_id: Option, + /// Raw CDS short timestamp. + pub timestamp: Option<[u8; 7]>, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] +pub struct TcHeader { + pub target_id: ComponentId, + pub request_type: MessageType, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CcsdsTcPacketOwned { + pub sp_header: SpacePacketHeader, + pub tc_header: TcHeader, + pub payload: alloc::vec::Vec, +} + #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum DeviceMode { Off = 0, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 8486081..a1a55ba 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -140,6 +140,7 @@ fn main() { // This helper structure is used by all telecommand providers which need to send telecommands // to the TC source. + /* cfg_if::cfg_if! { if #[cfg(not(feature = "heap_tmtc"))] { let tc_sender_with_shared_pool = @@ -150,6 +151,7 @@ fn main() { let tc_in_mem_converter = EcssTcCacher::Heap(EcssTcVecCacher::default()); } } + */ // Create event handling components // These sender handles are used to send event requests, for example to enable or disable @@ -172,6 +174,7 @@ fn main() { let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::sync_channel(50); let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(30); + /* cfg_if::cfg_if! { if #[cfg(not(feature = "heap_tmtc"))] { let tc_releaser = TcReleaser::Static(tc_sender_with_shared_pool.clone()); @@ -179,6 +182,7 @@ fn main() { let tc_releaser = TcReleaser::Heap(tc_source_tx.clone()); } } + */ /* let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, @@ -238,29 +242,39 @@ fn main() { ); */ - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let mut tmtc_task = TcSourceTask::Static(TcSourceTaskStatic::new( - shared_tc_pool_wrapper.clone(), - tc_source_rx, - PusTcDistributor::new(tm_sender.clone(), pus_router), - )); - let tc_sender = TmTcSender::Static(tc_sender_with_shared_pool); - let udp_tm_handler = StaticUdpTmHandler { - tm_rx: tm_server_rx, - tm_store: shared_tm_pool.clone(), - }; - } else if #[cfg(feature = "heap_tmtc")] { - let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new( - tc_source_rx, - PusTcDistributor::new(tm_sender.clone(), pus_router), - )); - let tc_sender = TmTcSender::Heap(tc_source_tx.clone()); - let udp_tm_handler = DynamicUdpTmHandler { - tm_rx: tm_server_rx, - }; + /* + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let mut tmtc_task = TcSourceTask::Static(TcSourceTaskStatic::new( + shared_tc_pool_wrapper.clone(), + tc_source_rx, + PusTcDistributor::new(tm_sender.clone(), pus_router), + )); + let tc_sender = TmTcSender::Static(tc_sender_with_shared_pool); + let udp_tm_handler = StaticUdpTmHandler { + tm_rx: tm_server_rx, + tm_store: shared_tm_pool.clone(), + }; + } else if #[cfg(feature = "heap_tmtc")] { + let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new( + tc_source_rx, + PusTcDistributor::new(tm_sender.clone(), pus_router), + )); + let tc_sender = TmTcSender::Heap(tc_source_tx.clone()); + let udp_tm_handler = DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }; + } } - } + */ + let mut tmtc_task = TcSourceTask::new( + tc_source_rx, + //PusTcDistributor::new(tm_sender.clone(), pus_router), + ); + let tc_sender = TmTcSender::Heap(tc_source_tx.clone()); + let udp_tm_handler = DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }; let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_sender.clone()) @@ -286,22 +300,25 @@ fn main() { ) .expect("tcp server creation failed"); - cfg_if::cfg_if! { - if #[cfg(not(feature = "heap_tmtc"))] { - let mut tm_sink = TmSink::Static(TmSinkStatic::new( - shared_tm_pool_wrapper, - sync_tm_tcp_source, - tm_sink_rx, - tm_server_tx, - )); - } else if #[cfg(feature = "heap_tmtc")] { - let mut tm_sink = TmSink::Heap(TmSinkDynamic::new( - sync_tm_tcp_source, - tm_sink_rx, - tm_server_tx, - )); + /* + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let mut tm_sink = TmSink::Static(TmSinkStatic::new( + shared_tm_pool_wrapper, + sync_tm_tcp_source, + tm_sink_rx, + tm_server_tx, + )); + } else if #[cfg(feature = "heap_tmtc")] { + let mut tm_sink = TmSink::Heap(TmSinkDynamic::new( + sync_tm_tcp_source, + tm_sink_rx, + tm_server_tx, + )); + } } - } + */ + let mut tm_sink = TmSink::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx); let shared_switch_set = Arc::new(Mutex::default()); let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); @@ -311,63 +328,65 @@ fn main() { let shared_mgm_1_set = Arc::default(); let mgm_0_mode_node = ModeRequestHandlerMpscBounded::new(MGM0.into(), mgm_0_handler_mode_rx); let mgm_1_mode_node = ModeRequestHandlerMpscBounded::new(MGM1.into(), mgm_1_handler_mode_rx); - let (mgm_0_spi_interface, mgm_1_spi_interface) = - if let Some(sim_client) = opt_sim_client.as_mut() { - sim_client - .add_reply_recipient(satrs_minisim::SimComponent::Mgm0Lis3Mdl, mgm_0_sim_reply_tx); - sim_client - .add_reply_recipient(satrs_minisim::SimComponent::Mgm1Lis3Mdl, mgm_1_sim_reply_tx); - ( - SpiSimInterfaceWrapper::Sim(SpiSimInterface { - sim_request_tx: sim_request_tx.clone(), - sim_reply_rx: mgm_0_sim_reply_rx, - }), - SpiSimInterfaceWrapper::Sim(SpiSimInterface { - sim_request_tx: sim_request_tx.clone(), - sim_reply_rx: mgm_1_sim_reply_rx, - }), - ) - } else { - ( - SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()), - SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()), - ) - }; - let mut mgm_0_handler = MgmHandlerLis3Mdl::new( - MGM0, - "MGM_0", - mgm_0_mode_node, - mgm_0_handler_composite_rx, - pus_hk_reply_tx.clone(), - switch_helper.clone(), - tm_sender.clone(), - mgm_0_spi_interface, - shared_mgm_0_set, - ); - let mut mgm_1_handler = MgmHandlerLis3Mdl::new( - MGM1, - "MGM_1", - mgm_1_mode_node, - mgm_1_handler_composite_rx, - pus_hk_reply_tx.clone(), - switch_helper.clone(), - tm_sender.clone(), - mgm_1_spi_interface, - shared_mgm_1_set, - ); - // Connect PUS service to device handlers. - connect_mode_nodes( - &mut pus_stack.mode_srv, - mgm_0_handler_mode_tx, - &mut mgm_0_handler, - pus_mode_reply_tx.clone(), - ); - connect_mode_nodes( - &mut pus_stack.mode_srv, - mgm_1_handler_mode_tx, - &mut mgm_1_handler, - pus_mode_reply_tx.clone(), - ); + /* + let (mgm_0_spi_interface, mgm_1_spi_interface) = + if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client + .add_reply_recipient(satrs_minisim::SimComponent::Mgm0Lis3Mdl, mgm_0_sim_reply_tx); + sim_client + .add_reply_recipient(satrs_minisim::SimComponent::Mgm1Lis3Mdl, mgm_1_sim_reply_tx); + ( + SpiSimInterfaceWrapper::Sim(SpiSimInterface { + sim_request_tx: sim_request_tx.clone(), + sim_reply_rx: mgm_0_sim_reply_rx, + }), + SpiSimInterfaceWrapper::Sim(SpiSimInterface { + sim_request_tx: sim_request_tx.clone(), + sim_reply_rx: mgm_1_sim_reply_rx, + }), + ) + } else { + ( + SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()), + SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()), + ) + }; + let mut mgm_0_handler = MgmHandlerLis3Mdl::new( + MGM0, + "MGM_0", + mgm_0_mode_node, + mgm_0_handler_composite_rx, + pus_hk_reply_tx.clone(), + switch_helper.clone(), + tm_sender.clone(), + mgm_0_spi_interface, + shared_mgm_0_set, + ); + let mut mgm_1_handler = MgmHandlerLis3Mdl::new( + MGM1, + "MGM_1", + mgm_1_mode_node, + mgm_1_handler_composite_rx, + pus_hk_reply_tx.clone(), + switch_helper.clone(), + tm_sender.clone(), + mgm_1_spi_interface, + shared_mgm_1_set, + ); + // Connect PUS service to device handlers. + connect_mode_nodes( + &mut pus_stack.mode_srv, + mgm_0_handler_mode_tx, + &mut mgm_0_handler, + pus_mode_reply_tx.clone(), + ); + connect_mode_nodes( + &mut pus_stack.mode_srv, + mgm_1_handler_mode_tx, + &mut mgm_1_handler, + pus_mode_reply_tx.clone(), + ); + */ let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); @@ -390,12 +409,14 @@ fn main() { pcdu_serial_interface, shared_switch_set, ); + /* connect_mode_nodes( &mut pus_stack.mode_srv, pcdu_handler_mode_tx.clone(), &mut pcdu_handler, pus_mode_reply_tx, ); + */ // The PCDU is a critical component which should be in normal mode immediately. pcdu_handler_mode_tx @@ -459,8 +480,8 @@ fn main() { let jh_aocs = thread::Builder::new() .name("sat-rs aocs".to_string()) .spawn(move || loop { - mgm_0_handler.periodic_operation(); - mgm_1_handler.periodic_operation(); + //mgm_0_handler.periodic_operation(); + //mgm_1_handler.periodic_operation(); thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); }) .unwrap(); diff --git a/satrs-example/src/tmtc/tc_source.rs b/satrs-example/src/tmtc/tc_source.rs index 41e3560..e100df0 100644 --- a/satrs-example/src/tmtc/tc_source.rs +++ b/satrs-example/src/tmtc/tc_source.rs @@ -1,17 +1,12 @@ -use satrs::{ - pool::PoolProvider, - pus::HandlingStatus, - tmtc::{PacketAsVec, PacketInPool, SharedPacketPool}, - ComponentId, -}; +use satrs::pus::HandlingStatus; +use satrs_example::{CcsdsTcPacketOwned, ComponentId}; use std::{ collections::HashMap, sync::mpsc::{self, TryRecvError}, }; -use crate::pus::PusTcDistributor; - // TC source components where static pools are the backing memory of the received telecommands. +/* pub struct TcSourceTaskStatic { shared_tc_pool: SharedPacketPool, tc_receiver: mpsc::Receiver, @@ -68,21 +63,21 @@ impl TcSourceTaskStatic { } } } +*/ -pub type CcsdsDistributorDyn = HashMap>; -pub type CcsdsDistributorStatic = HashMap>; +pub type CcsdsDistributor = HashMap>; // TC source components where the heap is the backing memory of the received telecommands. -pub struct TcSourceTaskDynamic { - pub tc_receiver: mpsc::Receiver, - ccsds_distributor: CcsdsDistributorDyn, +pub struct TcSourceTask { + pub tc_receiver: mpsc::Receiver, + ccsds_distributor: CcsdsDistributor, } -#[allow(dead_code)] -impl TcSourceTaskDynamic { +//#[allow(dead_code)] +impl TcSourceTask { pub fn new( - tc_receiver: mpsc::Receiver, - ccsds_distributor: CcsdsDistributorDyn, + tc_receiver: mpsc::Receiver, + ccsds_distributor: CcsdsDistributor, ) -> Self { Self { tc_receiver, @@ -98,10 +93,16 @@ impl TcSourceTaskDynamic { // Right now, we only expect ECSS PUS packets. // If packets like CFDP are expected, we might have to check the APID first. match self.tc_receiver.try_recv() { - Ok(packet_as_vec) => { - self.pus_distributor - .handle_tc_packet_vec(packet_as_vec) - .ok(); + Ok(packet) => { + if let Some(sender) = self.ccsds_distributor.get(&packet.tc_header.target_id) { + sender.send(packet).ok(); + } else { + log::warn!( + "no TC handler for target ID {:?}", + packet.tc_header.target_id + ); + // TODO: Send a dedicated TM packet. + } HandlingStatus::HandledOne } Err(e) => match e { @@ -114,18 +115,3 @@ impl TcSourceTaskDynamic { } } } - -#[allow(dead_code)] -pub enum TcSourceTask { - Static(TcSourceTaskStatic), - Heap(TcSourceTaskDynamic), -} - -impl TcSourceTask { - pub fn periodic_operation(&mut self) { - match self { - TcSourceTask::Static(task) => task.periodic_operation(), - TcSourceTask::Heap(task) => task.periodic_operation(), - } - } -} diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index 78d22d8..293b3c7 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -5,17 +5,13 @@ use std::{ use arbitrary_int::{u11, u14}; use log::info; -use satrs::{ - pool::PoolProvider, - spacepackets::{ - ecss::{tm::PusTmZeroCopyWriter, PusPacket}, - seq_count::SequenceCounter, - seq_count::SequenceCounterCcsdsSimple, - time::cds::MIN_CDS_FIELD_LEN, - CcsdsPacket, - }, - tmtc::{PacketAsVec, PacketInPool, SharedPacketPool}, +use satrs::spacepackets::{ + ecss::{tm::PusTmZeroCopyWriter, PusPacket}, + seq_count::SequenceCounter, + seq_count::SequenceCounterCcsdsSimple, + CcsdsPacket, }; +use satrs_example::CcsdsTmPacketOwned; use crate::interface::tcp::SyncTcpTmSource; @@ -83,6 +79,7 @@ impl TmFunnelCommon { } } +/* pub struct TmSinkStatic { common: TmFunnelCommon, shared_tm_store: SharedPacketPool, @@ -130,19 +127,20 @@ impl TmSinkStatic { } } } +*/ -pub struct TmSinkDynamic { +pub struct TmSink { common: TmFunnelCommon, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::SyncSender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, } -#[allow(dead_code)] -impl TmSinkDynamic { +//#[allow(dead_code)] +impl TmSink { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::SyncSender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), @@ -153,13 +151,21 @@ impl TmSinkDynamic { pub fn operation(&mut self) { if let Ok(mut tm) = self.tm_funnel_rx.recv() { + //let tm_raw = tm.to_vec(); + tm.sp_header.set_seq_count( + self.common + .seq_counter_map + .get_and_increment(tm.sp_header.apid()), + ); // Read the TM, set sequence counter and message counter, and finally update // the CRC. + /* let zero_copy_writer = - PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN, true) + PusTmZeroCopyWriter::new(&mut tm_raw, MIN_CDS_FIELD_LEN, true) .expect("Creating TM zero copy writer failed"); self.common.apply_packet_processing(zero_copy_writer); - self.common.sync_tm_tcp_source.add_tm(&tm.packet); + */ + self.common.sync_tm_tcp_source.add_tm(&tm.to_vec().unwrap()); self.tm_server_tx .send(tm) .expect("Sending TM to server failed"); @@ -167,12 +173,13 @@ impl TmSinkDynamic { } } -#[allow(dead_code)] -pub enum TmSink { - Static(TmSinkStatic), - Heap(TmSinkDynamic), -} +//#[allow(dead_code)] +//pub enum TmSink { +//Static(TmSinkStatic), +//Heap(TmSinkDynamic), +//} +/* impl TmSink { pub fn operation(&mut self) { match self { @@ -181,3 +188,4 @@ impl TmSink { } } } +*/