diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 2619dca..fa1dc88 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -20,6 +20,7 @@ thiserror = "2" lazy_static = "1" strum = { version = "0.26", features = ["derive"] } derive-new = "0.7" +cfg-if = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/satrs-example/src/acs/mgm.rs b/satrs-example/src/acs/mgm.rs index d6563cb..5ef17ee 100644 --- a/satrs-example/src/acs/mgm.rs +++ b/satrs-example/src/acs/mgm.rs @@ -25,6 +25,7 @@ use satrs_example::config::components::{NO_SENDER, PUS_MODE_SERVICE}; use crate::hk::PusHkHelper; use crate::pus::hk::{HkReply, HkReplyVariant}; use crate::requests::CompositeRequest; +use crate::spi::SpiInterface; use crate::tm_sender::TmSender; use serde::{Deserialize, Serialize}; @@ -50,11 +51,6 @@ pub enum TransitionState { Done, } -pub trait SpiInterface { - type Error: Debug; - fn transfer(&mut self, tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error>; -} - #[derive(Default)] pub struct SpiDummyInterface { pub dummy_values: MgmLis3RawValues, @@ -160,6 +156,7 @@ impl Default for ModeHelpers { #[derive(new)] #[allow(clippy::too_many_arguments)] pub struct MgmHandlerLis3Mdl< + ComInterface: SpiInterface, SwitchHelper: PowerSwitchInfo + PowerSwitcherCommandSender, > { id: UniqueApidTargetId, @@ -169,7 +166,7 @@ pub struct MgmHandlerLis3Mdl< hk_reply_tx: mpsc::Sender>, switch_helper: SwitchHelper, tm_sender: TmSender, - pub com_interface: SpiSimInterfaceWrapper, + pub com_interface: ComInterface, shared_mgm_set: Arc>, #[new(value = "PusHkHelper::new(id)")] hk_helper: PusHkHelper, @@ -181,8 +178,10 @@ pub struct MgmHandlerLis3Mdl< stamp_helper: TimestampHelper, } -impl + PowerSwitcherCommandSender> - MgmHandlerLis3Mdl +impl< + ComInterface: SpiInterface, + SwitchHelper: PowerSwitchInfo + PowerSwitcherCommandSender, + > MgmHandlerLis3Mdl { pub fn periodic_operation(&mut self) { self.stamp_helper.update_from_now(); @@ -356,16 +355,20 @@ impl + PowerSwitcherCommandSender + PowerSwitcherCommandSender> - ModeProvider for MgmHandlerLis3Mdl +impl< + ComInterface: SpiInterface, + SwitchHelper: PowerSwitchInfo + PowerSwitcherCommandSender, + > ModeProvider for MgmHandlerLis3Mdl { fn mode_and_submode(&self) -> ModeAndSubmode { self.mode_helpers.current } } -impl + PowerSwitcherCommandSender> - ModeRequestHandler for MgmHandlerLis3Mdl +impl< + ComInterface: SpiInterface, + SwitchHelper: PowerSwitchInfo + PowerSwitcherCommandSender, + > ModeRequestHandler for MgmHandlerLis3Mdl { type Error = ModeError; @@ -452,8 +455,9 @@ impl + PowerSwitcherCommandSender + PowerSwitcherCommandSender, - > ModeNode for MgmHandlerLis3Mdl + > ModeNode for MgmHandlerLis3Mdl { fn id(&self) -> satrs::ComponentId { self.id.into() @@ -461,8 +465,9 @@ impl< } impl< + ComInterface: SpiInterface, SwitchHelper: PowerSwitchInfo + PowerSwitcherCommandSender, - > ModeChild for MgmHandlerLis3Mdl + > ModeChild for MgmHandlerLis3Mdl { type Sender = mpsc::SyncSender>; @@ -522,7 +527,7 @@ mod tests { pub composite_request_tx: mpsc::Sender>, pub hk_reply_rx: mpsc::Receiver>, pub tm_rx: mpsc::Receiver, - pub handler: MgmHandlerLis3Mdl, + pub handler: MgmHandlerLis3Mdl, } #[derive(Default)] diff --git a/satrs-example/src/interface/tcp.rs b/satrs-example/src/interface/tcp.rs index 021ad31..0a0755f 100644 --- a/satrs-example/src/interface/tcp.rs +++ b/satrs-example/src/interface/tcp.rs @@ -1,17 +1,16 @@ use std::time::Duration; use std::{ collections::{HashSet, VecDeque}, - fmt::Debug, - marker::PhantomData, sync::{Arc, Mutex}, }; use log::{info, warn}; +use satrs::tmtc::{StoreAndSendError, TcSender}; use satrs::{ encoding::ccsds::{SpValidity, SpacePacketValidator}, hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, spacepackets::{CcsdsPacket, PacketId}, - tmtc::{PacketSenderRaw, PacketSource}, + tmtc::PacketSource, }; #[derive(Default)] @@ -111,31 +110,23 @@ pub type TcpServer = TcpSpacepacketsServer< SendError, >; -pub struct TcpTask, SendError: Debug + 'static>( - pub TcpServer, - PhantomData, -); +pub struct TcpTask(pub TcpServer); -impl, SendError: Debug + 'static> - TcpTask -{ +impl TcpTask { pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, tc_sender: TcSender, valid_ids: HashSet, ) -> Result { - Ok(Self( - TcpSpacepacketsServer::new( - cfg, - tm_source, - tc_sender, - SimplePacketValidator { valid_ids }, - ConnectionFinishedHandler::default(), - None, - )?, - PhantomData, - )) + Ok(Self(TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_sender, + SimplePacketValidator { valid_ids }, + ConnectionFinishedHandler::default(), + None, + )?)) } pub fn periodic_operation(&mut self) { diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index e7720bb..1f13a03 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -1,10 +1,9 @@ -use core::fmt::Debug; use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; use satrs::pus::HandlingStatus; -use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderRaw}; +use satrs::tmtc::{PacketAsVec, PacketInPool, StoreAndSendError, TcSender}; use satrs::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, pool::{PoolProviderWithGuards, SharedStaticMemoryPool}, @@ -65,21 +64,12 @@ impl UdpTmHandler for DynamicUdpTmHandler { } } -pub struct UdpTmtcServer< - TcSender: PacketSenderRaw, - TmHandler: UdpTmHandler, - SendError, -> { - pub udp_tc_server: UdpTcServer, +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer, pub tm_handler: TmHandler, } -impl< - TcSender: PacketSenderRaw, - TmHandler: UdpTmHandler, - SendError: Debug + 'static, - > UdpTmtcServer -{ +impl UdpTmtcServer { pub fn periodic_operation(&mut self) { loop { if self.poll_tc_server() == HandlingStatus::Empty { diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 2adf59d..2c6938b 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,5 +1,77 @@ +#[cfg(not(feature = "heap_tmtc"))] +use std::sync::RwLock; +use std::{ + net::{IpAddr, SocketAddr}, + sync::{mpsc, Arc, Mutex}, + thread, + time::Duration, +}; + +use acs::mgm::{MgmHandlerLis3Mdl, SpiDummyInterface, SpiSimInterface, SpiSimInterfaceWrapper}; +use eps::{ + pcdu::{PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper}, + PowerSwitchHelper, +}; +use events::EventHandler; +#[cfg(feature = "heap_tmtc")] +use interface::udp::DynamicUdpTmHandler; +#[cfg(not(feature = "heap_tmtc"))] +use interface::udp::StaticUdpTmHandler; +use interface::{ + sim_client_udp::create_sim_client, + tcp::{SyncTcpTmSource, TcpTask}, + udp::UdpTmtcServer, +}; +use log::info; use logger::setup_logger; -use satrs::spacepackets::time::{cds::CdsTime, TimeWriter}; +use pus::{ + action::create_action_service, + event::create_event_service, + hk::create_hk_service, + mode::create_mode_service, + scheduler::{create_scheduler_service, TcReleaser}, + stack::PusStack, + test::create_test_service, + PusTcDistributor, PusTcMpscRouter, +}; +use requests::GenericRequestRouter; +#[cfg(not(feature = "heap_tmtc"))] +use satrs::pus::EcssTcInSharedPoolConverter; +#[cfg(feature = "heap_tmtc")] +use satrs::pus::EcssTcInVecConverter; +#[cfg(not(feature = "heap_tmtc"))] +use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; +use satrs::{ + hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, + mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}, + mode_tree::connect_mode_nodes, + pus::{event_man::EventRequestWithToken, EcssTcInMemConverter, HandlingStatus}, + request::{GenericMessage, MessageMetadata}, + spacepackets::time::{cds::CdsTime, TimeWriter}, + tmtc::TcSender, +}; +#[cfg(not(feature = "heap_tmtc"))] +use satrs_example::config::pool::create_static_pools; +use satrs_example::{ + config::{ + components::{MGM_HANDLER_0, NO_SENDER, PCDU_HANDLER, TCP_SERVER, UDP_SERVER}, + pool::create_sched_tc_pool, + tasks::{FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS}, + OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, + }, + DeviceMode, +}; +use tm_sender::TmSender; +#[cfg(feature = "heap_tmtc")] +use tmtc::tc_source::TcSourceTaskDynamic; +#[cfg(not(feature = "heap_tmtc"))] +use tmtc::tc_source::TcSourceTaskStatic; +#[cfg(feature = "heap_tmtc")] +use tmtc::tm_sink::TmSinkDynamic; +#[cfg(not(feature = "heap_tmtc"))] +use tmtc::tm_sink::TmSinkStatic; +use tmtc::{tc_source::TcSourceTask, tm_sink::TmSink}; +mod spi; mod tm_sender; mod main_heap_tmtc; @@ -18,10 +90,390 @@ mod tmtc; fn main() { setup_logger().expect("setting up logging with fern failed"); println!("Running OBSW example"); - #[cfg(not(feature = "heap_tmtc"))] - main_static_tmtc::main_with_static_tmtc_pool(); - #[cfg(feature = "heap_tmtc")] - main_heap_tmtc::main_with_heap_backed_tmtc_pool(); + + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let (tm_pool, tc_pool) = create_static_pools(); + let shared_tm_pool = Arc::new(RwLock::new(tm_pool)); + let shared_tc_pool = Arc::new(RwLock::new(tc_pool)); + let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); + let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); + } + } + + let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); + let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); + let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); + + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let tm_sink_tx_sender = + PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()); + let tm_sender = TmSender::new_static(tm_sink_tx_sender.clone()); + } else if #[cfg(feature = "heap_tmtc")] { + let tm_sender = TmSender::new_heap(tm_sink_tx.clone()); + } + } + + let (sim_request_tx, sim_request_rx) = mpsc::channel(); + let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); + let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); + let mut opt_sim_client = create_sim_client(sim_request_rx); + + let (mgm_handler_composite_tx, mgm_handler_composite_rx) = mpsc::sync_channel(10); + let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = mpsc::sync_channel(30); + let (mgm_handler_mode_tx, mgm_handler_mode_rx) = mpsc::sync_channel(5); + let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = mpsc::sync_channel(5); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = GenericRequestRouter::default(); + request_map + .composite_router_map + .insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx); + request_map + .composite_router_map + .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); + + // This helper structure is used by all telecommand providers which need to send telecommands + // to the TC source. + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let tc_sender_with_shared_pool = + PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); + let tc_in_mem_converter = + EcssTcInMemConverter::new_static(EcssTcInSharedPoolConverter::new(shared_tc_pool, 4096)); + } else if #[cfg(feature = "heap_tmtc")] { + let tc_in_mem_converter = EcssTcInMemConverter::new_heap(EcssTcInVecConverter::default()); + } + } + + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); + let (event_request_tx, event_request_rx) = mpsc::channel::(); + + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); + + let (pus_test_tx, pus_test_rx) = mpsc::channel(); + let (pus_event_tx, pus_event_rx) = mpsc::channel(); + let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); + + let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); + let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(5); + + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let tc_releaser = TcReleaser::Static(tc_sender_with_shared_pool.clone()); + } else if #[cfg(feature = "heap_tmtc")] { + let tc_releaser = TcReleaser::Heap(tc_source_tx.clone()); + } + } + let pus_router = PusTcMpscRouter { + test_tc_sender: pus_test_tx, + event_tc_sender: pus_event_tx, + sched_tc_sender: pus_sched_tx, + hk_tc_sender: pus_hk_tx, + action_tc_sender: pus_action_tx, + mode_tc_sender: pus_mode_tx, + }; + let pus_test_service = create_test_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + event_tx.clone(), + pus_test_rx, + ); + let pus_scheduler_service = create_scheduler_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + tc_releaser, + pus_sched_rx, + create_sched_tc_pool(), + ); + let pus_event_service = create_event_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + pus_event_rx, + event_request_tx, + ); + let pus_action_service = create_action_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + pus_action_rx, + request_map.clone(), + pus_action_reply_rx, + ); + let pus_hk_service = create_hk_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + pus_hk_rx, + request_map.clone(), + pus_hk_reply_rx, + ); + let pus_mode_service = create_mode_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + pus_mode_rx, + request_map, + pus_mode_reply_rx, + ); + let mut pus_stack = PusStack::new( + pus_test_service, + pus_hk_service, + pus_event_service, + pus_action_service, + pus_scheduler_service, + pus_mode_service, + ); + + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let mut tmtc_task = TcSourceTask::Static(TcSourceTaskStatic::new( + shared_tc_pool_wrapper.clone(), + tc_source_rx, + PusTcDistributor::new(tm_sender.clone(), pus_router), + )); + let tc_sender = TcSender::StaticStore(tc_sender_with_shared_pool); + let udp_tm_handler = StaticUdpTmHandler { + tm_rx: tm_server_rx, + tm_store: shared_tm_pool.clone(), + }; + } else if #[cfg(feature = "heap_tmtc")] { + let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new( + tc_source_rx, + PusTcDistributor::new(tm_sender.clone(), pus_router), + )); + let tc_sender = TcSender::HeapStore(tc_source_tx.clone()); + let udp_tm_handler = DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }; + } + } + + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_sender.clone()) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_handler: udp_tm_handler, + }; + + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tc_sender, + PACKET_ID_VALIDATOR.clone(), + ) + .expect("tcp server creation failed"); + + cfg_if::cfg_if! { + if #[cfg(not(feature = "heap_tmtc"))] { + let mut tm_sink = TmSink::Static(TmSinkStatic::new( + shared_tm_pool_wrapper, + sync_tm_tcp_source, + tm_sink_rx, + tm_server_tx, + )); + } else if #[cfg(feature = "heap_tmtc")] { + let mut tm_sink = TmSink::Heap(TmSinkDynamic::new( + sync_tm_tcp_source, + tm_sink_rx, + tm_server_tx, + )); + } + } + + let shared_switch_set = Arc::new(Mutex::default()); + let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); + let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); + + let shared_mgm_set = Arc::default(); + let mgm_mode_node = + ModeRequestHandlerMpscBounded::new(MGM_HANDLER_0.into(), mgm_handler_mode_rx); + let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx); + SpiSimInterfaceWrapper::Sim(SpiSimInterface { + sim_request_tx: sim_request_tx.clone(), + sim_reply_rx: mgm_sim_reply_rx, + }) + } else { + SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()) + }; + let mut mgm_handler = MgmHandlerLis3Mdl::new( + MGM_HANDLER_0, + "MGM_0", + mgm_mode_node, + mgm_handler_composite_rx, + pus_hk_reply_tx.clone(), + switch_helper.clone(), + tm_sender, + mgm_spi_interface, + shared_mgm_set, + ); + // Connect PUS service to device handler. + connect_mode_nodes( + &mut pus_stack.mode_srv, + mgm_handler_mode_tx, + &mut mgm_handler, + pus_mode_reply_tx.clone(), + ); + + let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); + SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new( + sim_request_tx.clone(), + pcdu_sim_reply_rx, + )) + } else { + SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default()) + }; + let pcdu_mode_node = + ModeRequestHandlerMpscBounded::new(PCDU_HANDLER.into(), pcdu_handler_mode_rx); + let mut pcdu_handler = PcduHandler::new( + PCDU_HANDLER, + "PCDU", + pcdu_mode_node, + pcdu_handler_composite_rx, + pus_hk_reply_tx, + switch_request_rx, + tm_sink_tx, + pcdu_serial_interface, + shared_switch_set, + ); + connect_mode_nodes( + &mut pus_stack.mode_srv, + pcdu_handler_mode_tx.clone(), + &mut pcdu_handler, + pus_mode_reply_tx, + ); + + // The PCDU is a critical component which should be in normal mode immediately. + pcdu_handler_mode_tx + .send(GenericMessage::new( + MessageMetadata::new(0, NO_SENDER), + ModeRequest::SetMode { + mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as Mode, 0), + forced: false, + }, + )) + .expect("sending initial mode request failed"); + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("SATRS tmtc-udp".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("sat-rs tcp".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } + }) + .unwrap(); + + info!("Starting TM funnel task"); + let jh_tm_funnel = thread::Builder::new() + .name("tm sink".to_string()) + .spawn(move || loop { + tm_sink.operation(); + }) + .unwrap(); + + let mut opt_jh_sim_client = None; + if let Some(mut sim_client) = opt_sim_client { + info!("Starting UDP sim client task"); + opt_jh_sim_client = Some( + thread::Builder::new() + .name("sat-rs sim adapter".to_string()) + .spawn(move || loop { + if sim_client.operation() == HandlingStatus::Empty { + std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS)); + } + }) + .unwrap(), + ); + } + + info!("Starting AOCS thread"); + let jh_aocs = thread::Builder::new() + .name("sat-rs aocs".to_string()) + .spawn(move || loop { + mgm_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); + }) + .unwrap(); + + info!("Starting EPS thread"); + let jh_eps = thread::Builder::new() + .name("sat-rs eps".to_string()) + .spawn(move || loop { + // TODO: We should introduce something like a fixed timeslot helper to allow a more + // declarative API. It would also be very useful for the AOCS task. + // + // TODO: The fixed timeslot handler exists.. use it. + pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::RegularOp); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(300)); + }) + .unwrap(); + + info!("Starting PUS handler thread"); + let jh_pus_handler = thread::Builder::new() + .name("sat-rs pus".to_string()) + .spawn(move || loop { + event_handler.periodic_operation(); + pus_stack.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); + }) + .unwrap(); + + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); + jh_tm_funnel + .join() + .expect("Joining TM Funnel thread failed"); + if let Some(jh_sim_client) = opt_jh_sim_client { + jh_sim_client + .join() + .expect("Joining SIM client thread failed"); + } + jh_aocs.join().expect("Joining AOCS thread failed"); + jh_eps.join().expect("Joining EPS thread failed"); + jh_pus_handler + .join() + .expect("Joining PUS handler thread failed"); } pub fn update_time(time_provider: &mut CdsTime, timestamp: &mut [u8]) { diff --git a/satrs-example/src/main_heap_tmtc.rs b/satrs-example/src/main_heap_tmtc.rs index b1bd86a..52d113d 100644 --- a/satrs-example/src/main_heap_tmtc.rs +++ b/satrs-example/src/main_heap_tmtc.rs @@ -5,16 +5,17 @@ use crate::eps::{self, PowerSwitchHelper}; use crate::events::EventHandler; use crate::interface::udp::DynamicUdpTmHandler; use crate::pus::stack::PusStack; -use crate::pus::test::create_test_service_dynamic; +use crate::pus::test::create_test_service; use crate::tm_sender::TmSender; -use crate::tmtc::tc_source::TcSourceTaskDynamic; +use crate::tmtc::tc_source::{TcSourceTask, TcSourceTaskDynamic}; use crate::tmtc::tm_sink::TmSinkDynamic; use log::info; use satrs::hal::std::tcp_server::ServerConfig; use satrs::hal::std::udp_server::UdpTcServer; use satrs::mode_tree::connect_mode_nodes; -use satrs::pus::HandlingStatus; +use satrs::pus::{EcssTcInMemConverter, EcssTcInVecConverter, HandlingStatus}; use satrs::request::{GenericMessage, MessageMetadata}; +use satrs::tmtc::TcSender; use satrs_example::config::pool::create_sched_tc_pool; use satrs_example::config::tasks::{ FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS, @@ -28,13 +29,13 @@ use crate::acs::mgm::{ use crate::interface::sim_client_udp::create_sim_client; use crate::interface::tcp::{SyncTcpTmSource, TcpTask}; use crate::interface::udp::UdpTmtcServer; -use crate::pus::action::create_action_service_dynamic; -use crate::pus::event::create_event_service_dynamic; -use crate::pus::hk::create_hk_service_dynamic; -use crate::pus::mode::create_mode_service_dynamic; -use crate::pus::scheduler::create_scheduler_service_dynamic; +use crate::pus::action::create_action_service; +use crate::pus::event::create_event_service; +use crate::pus::hk::create_hk_service; +use crate::pus::mode::create_mode_service; +use crate::pus::scheduler::{create_scheduler_service, TcReleaser}; use crate::pus::{PusTcDistributor, PusTcMpscRouter}; -use crate::requests::{CompositeRequest, GenericRequestRouter}; +use crate::requests::GenericRequestRouter; use satrs::mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}; use satrs::pus::event_man::EventRequestWithToken; use satrs_example::config::components::{ @@ -48,9 +49,9 @@ use std::time::Duration; #[allow(dead_code)] pub fn main_with_heap_backed_tmtc_pool() { - let (tc_source_tx, tc_source_rx) = mpsc::channel(); - let (tm_sink_tx, tm_sink_rx) = mpsc::channel(); - let (tm_server_tx, tm_server_rx) = mpsc::channel(); + let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); + let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); + let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); let (sim_request_tx, sim_request_rx) = mpsc::channel(); let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); @@ -58,14 +59,10 @@ pub fn main_with_heap_backed_tmtc_pool() { let mut opt_sim_client = create_sim_client(sim_request_rx); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. - let (mgm_handler_composite_tx, mgm_handler_composite_rx) = - mpsc::sync_channel::>(5); - let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = - mpsc::sync_channel::>(10); - let (mgm_handler_mode_tx, mgm_handler_mode_rx) = - mpsc::sync_channel::>(5); - let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = - mpsc::sync_channel::>(10); + let (mgm_handler_composite_tx, mgm_handler_composite_rx) = mpsc::sync_channel(5); + let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = mpsc::sync_channel(10); + let (mgm_handler_mode_tx, mgm_handler_mode_rx) = mpsc::sync_channel(5); + let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = mpsc::sync_channel(10); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = GenericRequestRouter::default(); @@ -104,32 +101,46 @@ pub fn main_with_heap_backed_tmtc_pool() { action_tc_sender: pus_action_tx, mode_tc_sender: pus_mode_tx, }; - - let pus_test_service = - create_test_service_dynamic(tm_sink_tx.clone(), event_tx.clone(), pus_test_rx); - let pus_scheduler_service = create_scheduler_service_dynamic( - tm_sink_tx.clone(), - tc_source_tx.clone(), + let tm_sender = TmSender::new_heap(tm_sink_tx.clone()); + let tc_releaser = TcReleaser::new_heap(tc_source_tx.clone()); + let tc_in_mem_converter = EcssTcInMemConverter::new_heap(EcssTcInVecConverter::default()); + let pus_test_service = create_test_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + event_tx.clone(), + pus_test_rx, + ); + let pus_scheduler_service = create_scheduler_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + tc_releaser, pus_sched_rx, create_sched_tc_pool(), ); - let pus_event_service = - create_event_service_dynamic(tm_sink_tx.clone(), pus_event_rx, event_request_tx); - let pus_action_service = create_action_service_dynamic( - tm_sink_tx.clone(), + let pus_event_service = create_event_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + pus_event_rx, + event_request_tx, + ); + let pus_action_service = create_action_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); - let pus_hk_service = create_hk_service_dynamic( - tm_sink_tx.clone(), + let pus_hk_service = create_hk_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); - let pus_mode_service = create_mode_service_dynamic( - tm_sink_tx.clone(), + let pus_mode_service = create_mode_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_mode_rx, request_map, pus_mode_reply_rx, @@ -143,13 +154,14 @@ pub fn main_with_heap_backed_tmtc_pool() { pus_mode_service, ); - let mut tmtc_task = TcSourceTaskDynamic::new( + let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new( tc_source_rx, - PusTcDistributor::new(tm_sink_tx.clone(), pus_router), - ); + PusTcDistributor::new(tm_sender.clone(), pus_router), + )); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()) + let tc_sender = TcSender::HeapStore(tc_source_tx.clone()); + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_sender.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, @@ -169,7 +181,7 @@ pub fn main_with_heap_backed_tmtc_pool() { let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), - tc_source_tx.clone(), + tc_sender.clone(), PACKET_ID_VALIDATOR.clone(), ) .expect("tcp server creation failed"); diff --git a/satrs-example/src/main_static_tmtc.rs b/satrs-example/src/main_static_tmtc.rs index 2e7152a..cc78c16 100644 --- a/satrs-example/src/main_static_tmtc.rs +++ b/satrs-example/src/main_static_tmtc.rs @@ -4,15 +4,16 @@ use crate::eps::pcdu::{ use crate::eps::PowerSwitchHelper; use crate::events::EventHandler; use crate::pus::stack::PusStack; +use crate::tm_sender::TmSender; use crate::tmtc::tc_source::TcSourceTaskStatic; use crate::tmtc::tm_sink::TmSinkStatic; use log::info; use satrs::hal::std::tcp_server::ServerConfig; use satrs::hal::std::udp_server::UdpTcServer; use satrs::mode_tree::connect_mode_nodes; -use satrs::pus::HandlingStatus; +use satrs::pus::{EcssTcInMemConverter, EcssTcInSharedPoolConverter, HandlingStatus}; use satrs::request::{GenericMessage, MessageMetadata}; -use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; +use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool, TcSender}; use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; use satrs_example::config::tasks::{ FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS, @@ -26,12 +27,12 @@ use crate::acs::mgm::{ use crate::interface::sim_client_udp::create_sim_client; use crate::interface::tcp::{SyncTcpTmSource, TcpTask}; use crate::interface::udp::{StaticUdpTmHandler, UdpTmtcServer}; -use crate::pus::action::create_action_service_static; -use crate::pus::event::create_event_service_static; -use crate::pus::hk::create_hk_service_static; -use crate::pus::mode::create_mode_service_static; -use crate::pus::scheduler::create_scheduler_service_static; -use crate::pus::test::create_test_service_static; +use crate::pus::action::create_action_service; +use crate::pus::event::create_event_service; +use crate::pus::hk::create_hk_service; +use crate::pus::mode::create_mode_service; +use crate::pus::scheduler::{create_scheduler_service, TcReleaser}; +use crate::pus::test::create_test_service; use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::requests::{CompositeRequest, GenericRequestRouter}; use satrs::mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}; @@ -58,6 +59,7 @@ pub fn main_with_static_tmtc_pool() { let tm_sink_tx_sender = PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()); + let tm_sender = TmSender::new_static(tm_sink_tx_sender.clone()); let (sim_request_tx, sim_request_rx) = mpsc::channel(); let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); @@ -82,13 +84,13 @@ pub fn main_with_static_tmtc_pool() { request_map .composite_router_map .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); - request_map - .mode_router_map - .insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx.clone()); // This helper structure is used by all telecommand providers which need to send telecommands // to the TC source. - let tc_source = PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); + let tc_source_tx_with_pool = + PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); + let tc_in_mem_converter = + EcssTcInMemConverter::new_static(EcssTcInSharedPoolConverter::new(shared_tc_pool, 4096)); // Create event handling components // These sender handles are used to send event requests, for example to enable or disable @@ -111,6 +113,7 @@ pub fn main_with_static_tmtc_pool() { let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(5); + let tc_releaser = TcReleaser::new_static(tc_source_tx_with_pool.clone()); let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, event_tc_sender: pus_event_tx, @@ -119,41 +122,42 @@ pub fn main_with_static_tmtc_pool() { action_tc_sender: pus_action_tx, mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), + let pus_test_service = create_test_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), event_tx.clone(), pus_test_rx, ); - let pus_scheduler_service = create_scheduler_service_static( - tm_sink_tx_sender.clone(), - tc_source.clone(), + let pus_scheduler_service = create_scheduler_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), + tc_releaser, pus_sched_rx, create_sched_tc_pool(), ); - let pus_event_service = create_event_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), + let pus_event_service = create_event_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_event_rx, event_request_tx, ); - let pus_action_service = create_action_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), + let pus_action_service = create_action_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); - let pus_hk_service = create_hk_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), + let pus_hk_service = create_hk_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); - let pus_mode_service = create_mode_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), + let pus_mode_service = create_mode_service( + tm_sender.clone(), + tc_in_mem_converter.clone(), pus_mode_rx, request_map, pus_mode_reply_rx, @@ -170,11 +174,12 @@ pub fn main_with_static_tmtc_pool() { let mut tmtc_task = TcSourceTaskStatic::new( shared_tc_pool_wrapper.clone(), tc_source_rx, - PusTcDistributor::new(tm_sink_tx_sender, pus_router), + PusTcDistributor::new(tm_sender.clone(), pus_router), ); + let tc_sender = TcSender::StaticStore(tc_source_tx_with_pool); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source.clone()) + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_sender.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, @@ -195,7 +200,7 @@ pub fn main_with_static_tmtc_pool() { let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), - tc_source.clone(), + tc_sender, PACKET_ID_VALIDATOR.clone(), ) .expect("tcp server creation failed"); @@ -230,7 +235,7 @@ pub fn main_with_static_tmtc_pool() { mgm_handler_composite_rx, pus_hk_reply_tx.clone(), switch_helper.clone(), - tm_sink_tx.clone(), + tm_sender, mgm_spi_interface, shared_mgm_set, ); diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 238f1c5..7215dcb 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -1,6 +1,5 @@ use log::warn; use satrs::action::{ActionRequest, ActionRequestVariant}; -use satrs::pool::SharedStaticMemoryPool; use satrs::pus::action::{ ActionReplyPus, ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, }; @@ -10,21 +9,20 @@ use satrs::pus::verification::{ VerificationReportingProvider, VerificationToken, }; use satrs::pus::{ - ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, - EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, MpscTcReceiver, - MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler, PusServiceHelper, - PusTcToRequestConverter, + ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTmSender, EcssTmtcError, + GenericConversionError, MpscTcReceiver, PusPacketHandlingError, PusReplyHandler, + PusServiceHelper, PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket, PusServiceId}; -use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use satrs_example::config::components::PUS_ACTION_SERVICE; use satrs_example::config::tmtc_err; use std::sync::mpsc; use std::time::Duration; use crate::requests::GenericRequestRouter; +use crate::tm_sender::TmSender; use super::{ create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, @@ -207,20 +205,20 @@ impl PusTcToRequestConverter for Actio } } -pub fn create_action_service_static( - tm_sender: PacketSenderWithSharedPool, - tc_pool: SharedStaticMemoryPool, +pub fn create_action_service( + tm_sender: TmSender, + tc_in_mem_converter: EcssTcInMemConverter, pus_action_rx: mpsc::Receiver, action_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, -) -> ActionServiceWrapper { +) -> ActionServiceWrapper { let action_request_handler = PusTargetedRequestService::new( PusServiceHelper::new( PUS_ACTION_SERVICE.id(), pus_action_rx, tm_sender, create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid), - EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048), + tc_in_mem_converter, ), ActionRequestConverter::default(), // TODO: Implementation which does not use run-time allocation? Maybe something like @@ -235,36 +233,11 @@ pub fn create_action_service_static( } } -pub fn create_action_service_dynamic( - tm_funnel_tx: mpsc::Sender, - pus_action_rx: mpsc::Receiver, - action_router: GenericRequestRouter, - reply_receiver: mpsc::Receiver>, -) -> ActionServiceWrapper { - let action_request_handler = PusTargetedRequestService::new( - PusServiceHelper::new( - PUS_ACTION_SERVICE.id(), - pus_action_rx, - tm_funnel_tx, - create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid), - EcssTcInVecConverter::default(), - ), - ActionRequestConverter::default(), - DefaultActiveActionRequestMap::default(), - ActionReplyHandler::default(), - action_router, - reply_receiver, - ); - ActionServiceWrapper { - service: action_request_handler, - } -} - -pub struct ActionServiceWrapper { +pub struct ActionServiceWrapper { pub(crate) service: PusTargetedRequestService< MpscTcReceiver, TmSender, - TcInMemConverter, + EcssTcInMemConverter, VerificationReporter, ActionRequestConverter, ActionReplyHandler, @@ -275,9 +248,7 @@ pub struct ActionServiceWrapper, } -impl TargetedPusService - for ActionServiceWrapper -{ +impl TargetedPusService for ActionServiceWrapper { const SERVICE_ID: u8 = PusServiceId::Action as u8; const SERVICE_STR: &'static str = "action"; @@ -303,9 +274,10 @@ mod tests { use satrs::pus::test_util::{ TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, }; - use satrs::pus::verification; use satrs::pus::verification::test_util::TestVerificationReporter; + use satrs::pus::{verification, EcssTcInVecConverter}; use satrs::request::MessageMetadata; + use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use satrs::{ res_code::ResultU16, diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index caecdd9..a0d9af7 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -1,34 +1,32 @@ use std::sync::mpsc; use crate::pus::create_verification_reporter; -use satrs::pool::SharedStaticMemoryPool; +use crate::tm_sender::TmSender; use satrs::pus::event_man::EventRequestWithToken; use satrs::pus::event_srv::PusEventServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ - DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter, - EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver, - MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper, + DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter, MpscTcReceiver, + PartialPusHandlingError, PusServiceHelper, }; use satrs::spacepackets::ecss::PusServiceId; -use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use satrs_example::config::components::PUS_EVENT_MANAGEMENT; use super::{DirectPusService, HandlingStatus}; -pub fn create_event_service_static( - tm_sender: PacketSenderWithSharedPool, - tc_pool: SharedStaticMemoryPool, +pub fn create_event_service( + tm_sender: TmSender, + tm_in_pool_converter: EcssTcInMemConverter, pus_event_rx: mpsc::Receiver, event_request_tx: mpsc::Sender, -) -> EventServiceWrapper { +) -> EventServiceWrapper { let pus_5_handler = PusEventServiceHandler::new( PusServiceHelper::new( PUS_EVENT_MANAGEMENT.id(), pus_event_rx, tm_sender, create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), - EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048), + tm_in_pool_converter, ), event_request_tx, ); @@ -37,34 +35,16 @@ pub fn create_event_service_static( } } -pub fn create_event_service_dynamic( - tm_funnel_tx: mpsc::Sender, - pus_event_rx: mpsc::Receiver, - event_request_tx: mpsc::Sender, -) -> EventServiceWrapper { - let pus_5_handler = PusEventServiceHandler::new( - PusServiceHelper::new( - PUS_EVENT_MANAGEMENT.id(), - pus_event_rx, - tm_funnel_tx, - create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid), - EcssTcInVecConverter::default(), - ), - event_request_tx, - ); - EventServiceWrapper { - handler: pus_5_handler, - } +pub struct EventServiceWrapper { + pub handler: PusEventServiceHandler< + MpscTcReceiver, + TmSender, + EcssTcInMemConverter, + VerificationReporter, + >, } -pub struct EventServiceWrapper { - pub handler: - PusEventServiceHandler, -} - -impl DirectPusService - for EventServiceWrapper -{ +impl DirectPusService for EventServiceWrapper { const SERVICE_ID: u8 = PusServiceId::Event as u8; const SERVICE_STR: &'static str = "events"; diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index c0d21bf..43f3453 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -1,21 +1,18 @@ use derive_new::new; use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId}; -use satrs::pool::SharedStaticMemoryPool; use satrs::pus::verification::{ FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, VerificationReportingProvider, VerificationToken, }; use satrs::pus::{ ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken, - EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, - EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender, + EcssTcInMemConverter, EcssTmSender, EcssTmtcError, GenericConversionError, MpscTcReceiver, PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::res_code::ResultU16; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{hk, PusPacket, PusServiceId}; -use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use satrs_example::config::components::PUS_HK_SERVICE; use satrs_example::config::{hk_err, tmtc_err}; use std::sync::mpsc; @@ -23,6 +20,7 @@ use std::time::Duration; use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler}; use crate::requests::GenericRequestRouter; +use crate::tm_sender::TmSender; use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService}; @@ -242,20 +240,20 @@ impl PusTcToRequestConverter for HkRequestConver } } -pub fn create_hk_service_static( - tm_sender: PacketSenderWithSharedPool, - tc_pool: SharedStaticMemoryPool, +pub fn create_hk_service( + tm_sender: TmSender, + tc_in_mem_converter: EcssTcInMemConverter, pus_hk_rx: mpsc::Receiver, request_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, -) -> HkServiceWrapper { +) -> HkServiceWrapper { let pus_3_handler = PusTargetedRequestService::new( PusServiceHelper::new( PUS_HK_SERVICE.id(), pus_hk_rx, tm_sender, create_verification_reporter(PUS_HK_SERVICE.id(), PUS_HK_SERVICE.apid), - EcssTcInSharedStoreConverter::new(tc_pool, 2048), + tc_in_mem_converter, ), HkRequestConverter::default(), DefaultActiveRequestMap::default(), @@ -268,36 +266,11 @@ pub fn create_hk_service_static( } } -pub fn create_hk_service_dynamic( - tm_funnel_tx: mpsc::Sender, - pus_hk_rx: mpsc::Receiver, - request_router: GenericRequestRouter, - reply_receiver: mpsc::Receiver>, -) -> HkServiceWrapper { - let pus_3_handler = PusTargetedRequestService::new( - PusServiceHelper::new( - PUS_HK_SERVICE.id(), - pus_hk_rx, - tm_funnel_tx, - create_verification_reporter(PUS_HK_SERVICE.id(), PUS_HK_SERVICE.apid), - EcssTcInVecConverter::default(), - ), - HkRequestConverter::default(), - DefaultActiveRequestMap::default(), - HkReplyHandler::default(), - request_router, - reply_receiver, - ); - HkServiceWrapper { - service: pus_3_handler, - } -} - -pub struct HkServiceWrapper { +pub struct HkServiceWrapper { pub(crate) service: PusTargetedRequestService< MpscTcReceiver, TmSender, - TcInMemConverter, + EcssTcInMemConverter, VerificationReporter, HkRequestConverter, HkReplyHandler, @@ -308,9 +281,7 @@ pub struct HkServiceWrapper, } -impl TargetedPusService - for HkServiceWrapper -{ +impl TargetedPusService for HkServiceWrapper { const SERVICE_ID: u8 = PusServiceId::Housekeeping as u8; const SERVICE_STR: &'static str = "housekeeping"; diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 3ce5533..a565713 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,4 +1,5 @@ use crate::requests::GenericRequestRouter; +use crate::tm_sender::TmSender; use log::warn; use satrs::pool::PoolAddr; use satrs::pus::verification::{ @@ -6,7 +7,7 @@ use satrs::pus::verification::{ VerificationReporterCfg, VerificationReportingProvider, VerificationToken, }; use satrs::pus::{ - ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, + ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConversionProvider, EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, HandlingStatus, PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, @@ -49,7 +50,7 @@ pub struct PusTcMpscRouter { pub mode_tc_sender: Sender, } -pub struct PusTcDistributor { +pub struct PusTcDistributor { #[allow(dead_code)] pub id: ComponentId, pub tm_sender: TmSender, @@ -58,7 +59,7 @@ pub struct PusTcDistributor { stamp_helper: TimestampHelper, } -impl PusTcDistributor { +impl PusTcDistributor { pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { Self { id: PUS_ROUTING_SERVICE.raw(), @@ -269,7 +270,7 @@ pub trait DirectPusService { pub struct PusTargetedRequestService< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -291,7 +292,7 @@ pub struct PusTargetedRequestService< impl< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, diff --git a/satrs-example/src/pus/mode.rs b/satrs-example/src/pus/mode.rs index 181680c..867c542 100644 --- a/satrs-example/src/pus/mode.rs +++ b/satrs-example/src/pus/mode.rs @@ -1,16 +1,14 @@ use derive_new::new; use satrs::mode_tree::{ModeNode, ModeParent}; -use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use std::sync::mpsc; use std::time::Duration; use crate::requests::GenericRequestRouter; -use satrs::pool::SharedStaticMemoryPool; +use crate::tm_sender::TmSender; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ - DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, - EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError, - PusServiceHelper, + DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, MpscTcReceiver, + PusPacketHandlingError, PusServiceHelper, }; use satrs::request::GenericMessage; use satrs::{ @@ -210,20 +208,20 @@ impl PusTcToRequestConverter for ModeRequestCo } } -pub fn create_mode_service_static( - tm_sender: PacketSenderWithSharedPool, - tc_pool: SharedStaticMemoryPool, +pub fn create_mode_service( + tm_sender: TmSender, + tc_in_mem_converter: EcssTcInMemConverter, pus_action_rx: mpsc::Receiver, mode_router: GenericRequestRouter, reply_receiver: mpsc::Receiver>, -) -> ModeServiceWrapper { +) -> ModeServiceWrapper { let mode_request_handler = PusTargetedRequestService::new( PusServiceHelper::new( PUS_MODE_SERVICE.id(), pus_action_rx, tm_sender, create_verification_reporter(PUS_MODE_SERVICE.id(), PUS_MODE_SERVICE.apid), - EcssTcInSharedStoreConverter::new(tc_pool, 2048), + tc_in_mem_converter, ), ModeRequestConverter::default(), DefaultActiveRequestMap::default(), @@ -236,36 +234,11 @@ pub fn create_mode_service_static( } } -pub fn create_mode_service_dynamic( - tm_funnel_tx: mpsc::Sender, - pus_action_rx: mpsc::Receiver, - mode_router: GenericRequestRouter, - reply_receiver: mpsc::Receiver>, -) -> ModeServiceWrapper { - let mode_request_handler = PusTargetedRequestService::new( - PusServiceHelper::new( - PUS_MODE_SERVICE.id(), - pus_action_rx, - tm_funnel_tx, - create_verification_reporter(PUS_MODE_SERVICE.id(), PUS_MODE_SERVICE.apid), - EcssTcInVecConverter::default(), - ), - ModeRequestConverter::default(), - DefaultActiveRequestMap::default(), - ModeReplyHandler::new(PUS_MODE_SERVICE.id()), - mode_router, - reply_receiver, - ); - ModeServiceWrapper { - service: mode_request_handler, - } -} - -pub struct ModeServiceWrapper { +pub struct ModeServiceWrapper { pub(crate) service: PusTargetedRequestService< MpscTcReceiver, TmSender, - TcInMemConverter, + EcssTcInMemConverter, VerificationReporter, ModeRequestConverter, ModeReplyHandler, @@ -276,17 +249,13 @@ pub struct ModeServiceWrapper, } -impl ModeNode - for ModeServiceWrapper -{ +impl ModeNode for ModeServiceWrapper { fn id(&self) -> ComponentId { self.service.service_helper.id() } } -impl ModeParent - for ModeServiceWrapper -{ +impl ModeParent for ModeServiceWrapper { type Sender = mpsc::SyncSender>; fn add_mode_child(&mut self, id: ComponentId, request_sender: Self::Sender) { @@ -297,9 +266,7 @@ impl ModeParent } } -impl TargetedPusService - for ModeServiceWrapper -{ +impl TargetedPusService for ModeServiceWrapper { const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8; const SERVICE_STR: &'static str = "mode"; diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index eaa03c4..a6d05ea 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -2,15 +2,15 @@ use std::sync::mpsc; use std::time::Duration; use crate::pus::create_verification_reporter; +use crate::tm_sender::TmSender; use log::info; use satrs::pool::{PoolProvider, StaticMemoryPool}; use satrs::pus::scheduler::{PusScheduler, TcInfo}; use satrs::pus::scheduler_srv::PusSchedServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ - DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter, - EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver, - MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper, + DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter, MpscTcReceiver, + PartialPusHandlingError, PusServiceHelper, }; use satrs::spacepackets::ecss::PusServiceId; use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool}; @@ -19,11 +19,11 @@ use satrs_example::config::components::PUS_SCHED_SERVICE; use super::{DirectPusService, HandlingStatus}; -pub trait TcReleaser { +pub trait TcReleaseProvider { fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; } -impl TcReleaser for PacketSenderWithSharedPool { +impl TcReleaseProvider for PacketSenderWithSharedPool { fn release( &mut self, sender_id: ComponentId, @@ -48,7 +48,7 @@ impl TcReleaser for PacketSenderWithSharedPool { } } -impl TcReleaser for mpsc::Sender { +impl TcReleaseProvider for mpsc::SyncSender { fn release( &mut self, sender_id: ComponentId, @@ -65,23 +65,43 @@ impl TcReleaser for mpsc::Sender { } } -pub struct SchedulingServiceWrapper -{ +pub enum TcReleaser { + Static(PacketSenderWithSharedPool), + Heap(mpsc::SyncSender), +} + +impl TcReleaser { + pub fn new_static(sender: PacketSenderWithSharedPool) -> Self { + Self::Static(sender) + } + pub fn new_heap(sender: mpsc::SyncSender) -> Self { + Self::Heap(sender) + } +} + +impl TcReleaseProvider for TcReleaser { + fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool { + match self { + TcReleaser::Static(sender) => sender.release(sender_id, enabled, info, tc), + TcReleaser::Heap(sender) => sender.release(sender_id, enabled, info, tc), + } + } +} + +pub struct SchedulingServiceWrapper { pub pus_11_handler: PusSchedServiceHandler< MpscTcReceiver, TmSender, - TcInMemConverter, + EcssTcInMemConverter, VerificationReporter, PusScheduler, >, pub sched_tc_pool: StaticMemoryPool, pub releaser_buf: [u8; 4096], - pub tc_releaser: Box, + pub tc_releaser: TcReleaser, } -impl DirectPusService - for SchedulingServiceWrapper -{ +impl DirectPusService for SchedulingServiceWrapper { const SERVICE_ID: u8 = PusServiceId::Verification as u8; const SERVICE_STR: &'static str = "verification"; @@ -134,9 +154,7 @@ impl DirectPusSe } } -impl - SchedulingServiceWrapper -{ +impl SchedulingServiceWrapper { pub fn release_tcs(&mut self) { let id = self.pus_11_handler.service_helper.id(); let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { @@ -162,12 +180,13 @@ impl } } -pub fn create_scheduler_service_static( - tm_sender: PacketSenderWithSharedPool, - tc_releaser: PacketSenderWithSharedPool, +pub fn create_scheduler_service( + tm_sender: TmSender, + tc_in_mem_converter: EcssTcInMemConverter, + tc_releaser: TcReleaser, pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, -) -> SchedulingServiceWrapper { +) -> SchedulingServiceWrapper { let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusSchedServiceHandler::new( @@ -176,7 +195,7 @@ pub fn create_scheduler_service_static( pus_sched_rx, tm_sender, create_verification_reporter(PUS_SCHED_SERVICE.id(), PUS_SCHED_SERVICE.apid), - EcssTcInSharedStoreConverter::new(tc_releaser.shared_packet_store().0.clone(), 2048), + tc_in_mem_converter, ), scheduler, ); @@ -184,34 +203,6 @@ pub fn create_scheduler_service_static( pus_11_handler, sched_tc_pool, releaser_buf: [0; 4096], - tc_releaser: Box::new(tc_releaser), - } -} - -pub fn create_scheduler_service_dynamic( - tm_funnel_tx: mpsc::Sender, - tc_source_sender: mpsc::Sender, - pus_sched_rx: mpsc::Receiver, - sched_tc_pool: StaticMemoryPool, -) -> SchedulingServiceWrapper { - //let sched_srv_receiver = - //MpscTcReceiver::new(PUS_SCHED_SERVICE.raw(), "PUS_11_TC_RECV", pus_sched_rx); - let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) - .expect("Creating PUS Scheduler failed"); - let pus_11_handler = PusSchedServiceHandler::new( - PusServiceHelper::new( - PUS_SCHED_SERVICE.id(), - pus_sched_rx, - tm_funnel_tx, - create_verification_reporter(PUS_SCHED_SERVICE.id(), PUS_SCHED_SERVICE.apid), - EcssTcInVecConverter::default(), - ), - scheduler, - ); - SchedulingServiceWrapper { - pus_11_handler, - sched_tc_pool, - releaser_buf: [0; 4096], - tc_releaser: Box::new(tc_source_sender), + tc_releaser, } } diff --git a/satrs-example/src/pus/stack.rs b/satrs-example/src/pus/stack.rs index 67c3b57..c6d0b1b 100644 --- a/satrs-example/src/pus/stack.rs +++ b/satrs-example/src/pus/stack.rs @@ -1,9 +1,6 @@ use crate::pus::mode::ModeServiceWrapper; use derive_new::new; -use satrs::{ - pus::{EcssTcInMemConverter, EcssTmSender}, - spacepackets::time::{cds, TimeWriter}, -}; +use satrs::spacepackets::time::{cds, TimeWriter}; use super::{ action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, @@ -11,21 +8,17 @@ use super::{ HandlingStatus, TargetedPusService, }; -// TODO: For better extensibility, we could create 2 vectors: One for direct PUS services and one -// for targeted services.. #[derive(new)] -pub struct PusStack { - pub test_srv: TestCustomServiceWrapper, - pub hk_srv_wrapper: HkServiceWrapper, - pub event_srv: EventServiceWrapper, - pub action_srv_wrapper: ActionServiceWrapper, - pub schedule_srv: SchedulingServiceWrapper, - pub mode_srv: ModeServiceWrapper, +pub struct PusStack { + pub test_srv: TestCustomServiceWrapper, + pub hk_srv_wrapper: HkServiceWrapper, + pub event_srv: EventServiceWrapper, + pub action_srv_wrapper: ActionServiceWrapper, + pub schedule_srv: SchedulingServiceWrapper, + pub mode_srv: ModeServiceWrapper, } -impl - PusStack -{ +impl PusStack { pub fn periodic_operation(&mut self) { // Release all telecommands which reached their release time before calling the service // handlers. diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index 473dc3e..253af98 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,35 +1,34 @@ use crate::pus::create_verification_reporter; +use crate::tm_sender::TmSender; use log::info; use satrs::event_man::{EventMessage, EventMessageU32}; -use satrs::pool::SharedStaticMemoryPool; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; +use satrs::pus::PartialPusHandlingError; use satrs::pus::{ - DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, - EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PusServiceHelper, + DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInMemConversionProvider, + EcssTcInMemConverter, MpscTcReceiver, PusServiceHelper, }; -use satrs::pus::{EcssTcInSharedStoreConverter, PartialPusHandlingError}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{PusPacket, PusServiceId}; -use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool}; use satrs_example::config::components::PUS_TEST_SERVICE; use satrs_example::config::{tmtc_err, TEST_EVENT}; use std::sync::mpsc; use super::{DirectPusService, HandlingStatus}; -pub fn create_test_service_static( - tm_sender: PacketSenderWithSharedPool, - tc_pool: SharedStaticMemoryPool, +pub fn create_test_service( + tm_sender: TmSender, + tc_in_mem_converter: EcssTcInMemConverter, event_sender: mpsc::SyncSender, pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper { +) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( PUS_TEST_SERVICE.id(), pus_test_rx, tm_sender, create_verification_reporter(PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.apid), - EcssTcInSharedStoreConverter::new(tc_pool, 2048), + tc_in_mem_converter, )); TestCustomServiceWrapper { handler: pus17_handler, @@ -37,34 +36,17 @@ pub fn create_test_service_static( } } -pub fn create_test_service_dynamic( - tm_funnel_tx: mpsc::Sender, - event_sender: mpsc::SyncSender, - pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper { - let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( - PUS_TEST_SERVICE.id(), - pus_test_rx, - tm_funnel_tx, - create_verification_reporter(PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.apid), - EcssTcInVecConverter::default(), - )); - TestCustomServiceWrapper { - handler: pus17_handler, - event_tx: event_sender, - } -} - -pub struct TestCustomServiceWrapper -{ - pub handler: - PusService17TestHandler, +pub struct TestCustomServiceWrapper { + pub handler: PusService17TestHandler< + MpscTcReceiver, + TmSender, + EcssTcInMemConverter, + VerificationReporter, + >, pub event_tx: mpsc::SyncSender, } -impl DirectPusService - for TestCustomServiceWrapper -{ +impl DirectPusService for TestCustomServiceWrapper { const SERVICE_ID: u8 = PusServiceId::Test as u8; const SERVICE_STR: &'static str = "test"; diff --git a/satrs-example/src/spi.rs b/satrs-example/src/spi.rs new file mode 100644 index 0000000..165e6da --- /dev/null +++ b/satrs-example/src/spi.rs @@ -0,0 +1,6 @@ +use core::fmt::Debug; + +pub trait SpiInterface { + type Error: Debug; + fn transfer(&mut self, tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error>; +} diff --git a/satrs-example/src/tm_sender.rs b/satrs-example/src/tm_sender.rs index fb7e5eb..70f9cb7 100644 --- a/satrs-example/src/tm_sender.rs +++ b/satrs-example/src/tm_sender.rs @@ -7,9 +7,10 @@ use satrs::{ tmtc::{PacketAsVec, PacketSenderWithSharedPool}, }; +#[derive(Debug, Clone)] pub enum TmSender { Static(PacketSenderWithSharedPool), - Heap(mpsc::Sender), + Heap(mpsc::SyncSender), } impl TmSender { @@ -17,7 +18,7 @@ impl TmSender { TmSender::Static(sender) } - pub fn new_heap(sender: mpsc::Sender) -> Self { + pub fn new_heap(sender: mpsc::SyncSender) -> Self { TmSender::Heap(sender) } } diff --git a/satrs-example/src/tmtc/tc_source.rs b/satrs-example/src/tmtc/tc_source.rs index 94b642c..88f4e51 100644 --- a/satrs-example/src/tmtc/tc_source.rs +++ b/satrs-example/src/tmtc/tc_source.rs @@ -1,12 +1,10 @@ use satrs::{ pool::PoolProvider, pus::HandlingStatus, - tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool, SharedPacketPool}, + tmtc::{PacketAsVec, PacketInPool, SharedPacketPool}, }; use std::sync::mpsc::{self, TryRecvError}; -use satrs::pus::MpscTmAsVecSender; - use crate::pus::PusTcDistributor; // TC source components where static pools are the backing memory of the received telecommands. @@ -14,14 +12,14 @@ pub struct TcSourceTaskStatic { shared_tc_pool: SharedPacketPool, tc_receiver: mpsc::Receiver, tc_buf: [u8; 4096], - pus_distributor: PusTcDistributor, + pus_distributor: PusTcDistributor, } impl TcSourceTaskStatic { pub fn new( shared_tc_pool: SharedPacketPool, tc_receiver: mpsc::Receiver, - pus_receiver: PusTcDistributor, + pus_receiver: PusTcDistributor, ) -> Self { Self { shared_tc_pool, @@ -67,14 +65,11 @@ impl TcSourceTaskStatic { // TC source components where the heap is the backing memory of the received telecommands. pub struct TcSourceTaskDynamic { pub tc_receiver: mpsc::Receiver, - pus_distributor: PusTcDistributor, + pus_distributor: PusTcDistributor, } impl TcSourceTaskDynamic { - pub fn new( - tc_receiver: mpsc::Receiver, - pus_receiver: PusTcDistributor, - ) -> Self { + pub fn new(tc_receiver: mpsc::Receiver, pus_receiver: PusTcDistributor) -> Self { Self { tc_receiver, pus_distributor: pus_receiver, @@ -105,3 +100,18 @@ impl TcSourceTaskDynamic { } } } + +#[allow(dead_code)] +pub enum TcSourceTask { + Static(TcSourceTaskStatic), + Heap(TcSourceTaskDynamic), +} + +impl TcSourceTask { + pub fn periodic_operation(&mut self) { + match self { + TcSourceTask::Static(task) => task.periodic_operation(), + TcSourceTask::Heap(task) => task.periodic_operation(), + } + } +} diff --git a/satrs-example/src/tmtc/tm_sink.rs b/satrs-example/src/tmtc/tm_sink.rs index 054ff9c..9e1b314 100644 --- a/satrs-example/src/tmtc/tm_sink.rs +++ b/satrs-example/src/tmtc/tm_sink.rs @@ -132,14 +132,14 @@ impl TmSinkStatic { pub struct TmSinkDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_server_tx: mpsc::SyncSender, } impl TmSinkDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_server_tx: mpsc::SyncSender, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), @@ -162,3 +162,18 @@ impl TmSinkDynamic { } } } + +#[allow(dead_code)] +pub enum TmSink { + Static(TmSinkStatic), + Heap(TmSinkDynamic), +} + +impl TmSink { + pub fn operation(&mut self) { + match self { + TmSink::Static(static_sink) => static_sink.operation(), + TmSink::Heap(dynamic_sink) => dynamic_sink.operation(), + } + } +} diff --git a/satrs/src/pool.rs b/satrs/src/pool.rs index 86e4b75..45dc84b 100644 --- a/satrs/src/pool.rs +++ b/satrs/src/pool.rs @@ -822,7 +822,7 @@ mod alloc_mod { /// if the next fitting subpool is full. This is useful to ensure the pool remains useful /// for all data sizes as long as possible. However, an undesirable side-effect might be /// the chocking of larger subpools by underdimensioned smaller subpools. - #[derive(Clone)] + #[derive(Debug, Clone)] pub struct StaticPoolConfig { cfg: Vec, spill_to_higher_subpools: bool, @@ -881,6 +881,7 @@ mod alloc_mod { /// [address][PoolAddr] type. Adding any data to the pool will yield a store address. /// Modification and read operations are done using a reference to a store address. Deletion /// will consume the store address. + #[derive(Debug)] pub struct StaticMemoryPool { pool_cfg: StaticPoolConfig, pool: Vec>, diff --git a/satrs/src/pus/event_srv.rs b/satrs/src/pus/event_srv.rs index cb1bcb5..dbf2861 100644 --- a/satrs/src/pus/event_srv.rs +++ b/satrs/src/pus/event_srv.rs @@ -9,14 +9,14 @@ use std::sync::mpsc::Sender; use super::verification::VerificationReportingProvider; use super::{ - EcssTcInMemConverter, EcssTcReceiver, EcssTmSender, GenericConversionError, + EcssTcInMemConversionProvider, EcssTcReceiver, EcssTmSender, GenericConversionError, GenericRoutingError, HandlingStatus, PusServiceHelper, }; pub struct PusEventServiceHandler< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, > { pub service_helper: @@ -27,7 +27,7 @@ pub struct PusEventServiceHandler< impl< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, > PusEventServiceHandler { @@ -170,7 +170,7 @@ mod tests { event_man::EventRequestWithToken, tests::PusServiceHandlerWithSharedStoreCommon, verification::{TcStateAccepted, VerificationToken}, - DirectPusPacketHandlerResult, EcssTcInSharedStoreConverter, PusPacketHandlingError, + DirectPusPacketHandlerResult, EcssTcInSharedPoolConverter, PusPacketHandlingError, }, }; @@ -183,7 +183,7 @@ mod tests { handler: PusEventServiceHandler< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, >, } diff --git a/satrs/src/pus/mod.rs b/satrs/src/pus/mod.rs index 8a63454..b254138 100644 --- a/satrs/src/pus/mod.rs +++ b/satrs/src/pus/mod.rs @@ -947,7 +947,7 @@ pub mod std_mod { } } - pub trait EcssTcInMemConverter { + pub trait EcssTcInMemConversionProvider { fn cache(&mut self, possible_packet: &TcInMemory) -> Result<(), PusTcFromMemError>; fn tc_slice_raw(&self) -> &[u8]; @@ -980,7 +980,7 @@ pub mod std_mod { pub pus_tc_raw: Option>, } - impl EcssTcInMemConverter for EcssTcInVecConverter { + impl EcssTcInMemConversionProvider for EcssTcInVecConverter { fn cache(&mut self, tc_in_memory: &TcInMemory) -> Result<(), PusTcFromMemError> { self.pus_tc_raw = None; match tc_in_memory { @@ -1011,24 +1011,25 @@ pub mod std_mod { /// [SharedStaticMemoryPool] structure. This is useful if run-time allocation for these /// packets should be avoided. Please note that this structure is not able to convert TCs which /// are stored as a `Vec`. - pub struct EcssTcInSharedStoreConverter { + #[derive(Clone)] + pub struct EcssTcInSharedPoolConverter { sender_id: Option, - shared_tc_store: SharedStaticMemoryPool, + shared_tc_pool: SharedStaticMemoryPool, pus_buf: Vec, } - impl EcssTcInSharedStoreConverter { + impl EcssTcInSharedPoolConverter { pub fn new(shared_tc_store: SharedStaticMemoryPool, max_expected_tc_size: usize) -> Self { Self { sender_id: None, - shared_tc_store, + shared_tc_pool: shared_tc_store, pus_buf: alloc::vec![0; max_expected_tc_size], } } pub fn copy_tc_to_buf(&mut self, addr: PoolAddr) -> Result<(), PusTcFromMemError> { // Keep locked section as short as possible. - let mut tc_pool = self.shared_tc_store.write().map_err(|_| { + let mut tc_pool = self.shared_tc_pool.write().map_err(|_| { PusTcFromMemError::EcssTmtc(EcssTmtcError::Store(PoolError::LockError)) })?; let tc_size = tc_pool.len_of_data(&addr).map_err(EcssTmtcError::Store)?; @@ -1048,7 +1049,7 @@ pub mod std_mod { } } - impl EcssTcInMemConverter for EcssTcInSharedStoreConverter { + impl EcssTcInMemConversionProvider for EcssTcInSharedPoolConverter { fn cache(&mut self, tc_in_memory: &TcInMemory) -> Result<(), PusTcFromMemError> { match tc_in_memory { super::TcInMemory::Pool(packet_in_pool) => { @@ -1071,6 +1072,44 @@ pub mod std_mod { } } + // TODO: alloc feature flag? + #[derive(Clone)] + pub enum EcssTcInMemConverter { + Static(EcssTcInSharedPoolConverter), + Heap(EcssTcInVecConverter), + } + + impl EcssTcInMemConverter { + pub fn new_static(static_store_converter: EcssTcInSharedPoolConverter) -> Self { + EcssTcInMemConverter::Static(static_store_converter) + } + + pub fn new_heap(heap_converter: EcssTcInVecConverter) -> Self { + EcssTcInMemConverter::Heap(heap_converter) + } + } + + impl EcssTcInMemConversionProvider for EcssTcInMemConverter { + fn cache(&mut self, tc_in_memory: &TcInMemory) -> Result<(), PusTcFromMemError> { + match self { + EcssTcInMemConverter::Static(converter) => converter.cache(tc_in_memory), + EcssTcInMemConverter::Heap(converter) => converter.cache(tc_in_memory), + } + } + fn tc_slice_raw(&self) -> &[u8] { + match self { + EcssTcInMemConverter::Static(converter) => converter.tc_slice_raw(), + EcssTcInMemConverter::Heap(converter) => converter.tc_slice_raw(), + } + } + fn sender_id(&self) -> Option { + match self { + EcssTcInMemConverter::Static(converter) => converter.sender_id(), + EcssTcInMemConverter::Heap(converter) => converter.sender_id(), + } + } + } + pub struct PusServiceBase< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, @@ -1094,7 +1133,7 @@ pub mod std_mod { pub struct PusServiceHelper< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, > { pub common: PusServiceBase, @@ -1104,7 +1143,7 @@ pub mod std_mod { impl< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, > PusServiceHelper { @@ -1347,7 +1386,7 @@ pub mod tests { pub type PusServiceHelperStatic = PusServiceHelper< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, >; @@ -1374,8 +1413,7 @@ pub mod tests { VerificationReporter::new(TEST_COMPONENT_ID_0.id(), &verif_cfg); let test_srv_tm_sender = PacketSenderWithSharedPool::new(tm_tx, shared_tm_pool_wrapper.clone()); - let in_store_converter = - EcssTcInSharedStoreConverter::new(shared_tc_pool.clone(), 2048); + let in_store_converter = EcssTcInSharedPoolConverter::new(shared_tc_pool.clone(), 2048); ( Self { pus_buf: RefCell::new([0; 2048]), diff --git a/satrs/src/pus/scheduler_srv.rs b/satrs/src/pus/scheduler_srv.rs index f84a0c6..922ab97 100644 --- a/satrs/src/pus/scheduler_srv.rs +++ b/satrs/src/pus/scheduler_srv.rs @@ -1,7 +1,7 @@ use super::scheduler::PusSchedulerProvider; use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::{ - DirectPusPacketHandlerResult, EcssTcInMemConverter, EcssTcInSharedStoreConverter, + DirectPusPacketHandlerResult, EcssTcInMemConversionProvider, EcssTcInSharedPoolConverter, EcssTcInVecConverter, EcssTcReceiver, EcssTmSender, HandlingStatus, MpscTcReceiver, PartialPusHandlingError, PusServiceHelper, }; @@ -24,7 +24,7 @@ use std::sync::mpsc; pub struct PusSchedServiceHandler< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, PusScheduler: PusSchedulerProvider, > { @@ -36,7 +36,7 @@ pub struct PusSchedServiceHandler< impl< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, Scheduler: PusSchedulerProvider, > @@ -229,7 +229,7 @@ pub type PusService11SchedHandlerDynWithBoundedMpsc = PusSchedServ pub type PusService11SchedHandlerStaticWithMpsc = PusSchedServiceHandler< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, PusScheduler, >; @@ -238,7 +238,7 @@ pub type PusService11SchedHandlerStaticWithMpsc = PusSchedServiceH pub type PusService11SchedHandlerStaticWithBoundedMpsc = PusSchedServiceHandler< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, PusScheduler, >; @@ -253,7 +253,7 @@ mod tests { scheduler::{self, PusSchedulerProvider, TcInfo}, tests::PusServiceHandlerWithSharedStoreCommon, verification::{RequestId, TcStateAccepted, VerificationToken}, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, }; use crate::pus::{DirectPusPacketHandlerResult, MpscTcReceiver, PusPacketHandlingError}; use crate::tmtc::PacketSenderWithSharedPool; @@ -276,7 +276,7 @@ mod tests { handler: PusSchedServiceHandler< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, TestScheduler, >, diff --git a/satrs/src/pus/test.rs b/satrs/src/pus/test.rs index 5094be9..8b00453 100644 --- a/satrs/src/pus/test.rs +++ b/satrs/src/pus/test.rs @@ -9,8 +9,9 @@ use std::sync::mpsc; use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::{ - EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver, - EcssTmSender, GenericConversionError, HandlingStatus, MpscTcReceiver, PusServiceHelper, + EcssTcInMemConversionProvider, EcssTcInSharedPoolConverter, EcssTcInVecConverter, + EcssTcReceiver, EcssTmSender, GenericConversionError, HandlingStatus, MpscTcReceiver, + PusServiceHelper, }; /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. @@ -18,7 +19,7 @@ use super::{ pub struct PusService17TestHandler< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, > { pub service_helper: @@ -28,7 +29,7 @@ pub struct PusService17TestHandler< impl< TcReceiver: EcssTcReceiver, TmSender: EcssTmSender, - TcInMemConverter: EcssTcInMemConverter, + TcInMemConverter: EcssTcInMemConversionProvider, VerificationReporter: VerificationReportingProvider, > PusService17TestHandler { @@ -127,7 +128,7 @@ pub type PusService17TestHandlerDynWithBoundedMpsc = PusService17TestHandler< pub type PusService17TestHandlerStaticWithBoundedMpsc = PusService17TestHandler< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, >; @@ -142,7 +143,7 @@ mod tests { }; use crate::pus::verification::{TcStateAccepted, VerificationToken}; use crate::pus::{ - DirectPusPacketHandlerResult, EcssTcInSharedStoreConverter, EcssTcInVecConverter, + DirectPusPacketHandlerResult, EcssTcInSharedPoolConverter, EcssTcInVecConverter, GenericConversionError, HandlingStatus, MpscTcReceiver, MpscTmAsVecSender, PartialPusHandlingError, PusPacketHandlingError, }; @@ -162,7 +163,7 @@ mod tests { handler: PusService17TestHandler< MpscTcReceiver, PacketSenderWithSharedPool, - EcssTcInSharedStoreConverter, + EcssTcInSharedPoolConverter, VerificationReporter, >, } diff --git a/satrs/src/tmtc/mod.rs b/satrs/src/tmtc/mod.rs index 498c1bd..50c97d6 100644 --- a/satrs/src/tmtc/mod.rs +++ b/satrs/src/tmtc/mod.rs @@ -14,6 +14,7 @@ use crate::{ }; #[cfg(feature = "std")] pub use alloc_mod::*; +use core::fmt::Debug; #[cfg(feature = "alloc")] use downcast_rs::{impl_downcast, Downcast}; use spacepackets::{ @@ -170,7 +171,7 @@ where } /// Helper trait for any generic (static) store which allows storing raw or CCSDS packets. -pub trait CcsdsPacketPool { +pub trait CcsdsPacketPool: Debug { fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result { self.add_raw_tc(tc_raw) } @@ -190,7 +191,7 @@ pub trait PusTmPool { } /// Generic trait for any sender component able to send packets stored inside a pool structure. -pub trait PacketInPoolSender: Send { +pub trait PacketInPoolSender: Debug + Send { fn send_packet( &self, sender_id: ComponentId, @@ -235,7 +236,7 @@ pub mod std_mod { /// Newtype wrapper around the [SharedStaticMemoryPool] to enable extension helper traits on /// top of the regular shared memory pool API. - #[derive(Clone)] + #[derive(Debug, Clone)] pub struct SharedPacketPool(pub SharedStaticMemoryPool); impl SharedPacketPool { @@ -287,7 +288,6 @@ pub mod std_mod { } } - #[cfg(feature = "std")] impl PacketSenderRaw for mpsc::Sender { type Error = GenericSendError; @@ -297,7 +297,6 @@ pub mod std_mod { } } - #[cfg(feature = "std")] impl PacketSenderRaw for mpsc::SyncSender { type Error = GenericSendError; @@ -362,7 +361,7 @@ pub mod std_mod { /// This is the primary structure used to send packets stored in a dedicated memory pool /// structure. - #[derive(Clone)] + #[derive(Debug, Clone)] pub struct PacketSenderWithSharedPool< Sender: PacketInPoolSender = mpsc::SyncSender, PacketPool: CcsdsPacketPool = SharedPacketPool, @@ -478,6 +477,27 @@ pub mod std_mod { } } } + + #[derive(Debug, Clone)] + pub enum TcSender { + StaticStore(PacketSenderWithSharedPool), + HeapStore(mpsc::SyncSender), + } + + impl PacketSenderRaw for TcSender { + type Error = StoreAndSendError; + + fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { + match self { + TcSender::StaticStore(packet_sender_with_shared_pool) => { + packet_sender_with_shared_pool.send_packet(sender_id, packet) + } + TcSender::HeapStore(sync_sender) => sync_sender + .send_packet(sender_id, packet) + .map_err(StoreAndSendError::Send), + } + } + } } #[cfg(test)]