diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index aeb0379..2619dca 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -35,8 +35,8 @@ version = "0.1.1" path = "../satrs-mib" [features] -dyn_tmtc = [] -default = ["dyn_tmtc"] +heap_tmtc = [] +default = ["heap_tmtc"] [dev-dependencies] env_logger = "0.11" diff --git a/satrs-example/README.md b/satrs-example/README.md index b661423..87a8e13 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -14,7 +14,7 @@ You can run the application using `cargo run`. # Features -The example has the `dyn_tmtc` feature which is enabled by default. With this feature enabled, +The example has the `heap_tmtc` feature which is enabled by default. With this feature enabled, TMTC packets are exchanged using the heap as the backing memory instead of pre-allocated static stores. diff --git a/satrs-example/src/config.rs b/satrs-example/src/config.rs index ff1e2c8..5b4a8dd 100644 --- a/satrs-example/src/config.rs +++ b/satrs-example/src/config.rs @@ -155,6 +155,7 @@ pub mod components { #[derive(Copy, Clone, PartialEq, Eq)] pub enum EpsId { Pcdu = 0, + Subsystem = 1, } #[derive(Copy, Clone, PartialEq, Eq)] @@ -181,6 +182,8 @@ pub mod components { UniqueApidTargetId::new(Apid::Acs as u16, AcsId::Assembly as u32); pub const MGM_HANDLER_0: UniqueApidTargetId = UniqueApidTargetId::new(Apid::Acs as u16, AcsId::Mgm0 as u32); + pub const EPS_SUBSYSTEM: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::Eps as u16, EpsId::Subsystem as u32); pub const PCDU_HANDLER: UniqueApidTargetId = UniqueApidTargetId::new(Apid::Eps as u16, EpsId::Pcdu as u32); pub const UDP_SERVER: UniqueApidTargetId = diff --git a/satrs-example/src/eps/pcdu.rs b/satrs-example/src/eps/pcdu.rs index efcb55b..473cbdc 100644 --- a/satrs-example/src/eps/pcdu.rs +++ b/satrs-example/src/eps/pcdu.rs @@ -8,7 +8,11 @@ use derive_new::new; use num_enum::{IntoPrimitive, TryFromPrimitive}; use satrs::{ hk::{HkRequest, HkRequestVariant}, - mode::{ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequestHandler}, + mode::{ + ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequestHandler, + ModeRequestHandlerMpscBounded, + }, + mode_tree::{ModeChild, ModeNode}, power::SwitchRequest, pus::{EcssTmSender, PusTmVariant}, queue::GenericSendError, @@ -16,7 +20,7 @@ use satrs::{ spacepackets::ByteConversionError, }; use satrs_example::{ - config::components::{NO_SENDER, PUS_MODE_SERVICE}, + config::components::{NO_SENDER, PCDU_HANDLER, PUS_MODE_SERVICE}, DeviceMode, TimestampHelper, }; use satrs_minisim::{ @@ -28,7 +32,6 @@ use satrs_minisim::{ use serde::{Deserialize, Serialize}; use crate::{ - acs::mgm::MpscModeLeafInterface, hk::PusHkHelper, pus::hk::{HkReply, HkReplyVariant}, requests::CompositeRequest, @@ -203,7 +206,7 @@ pub type SharedSwitchSet = Arc>; pub struct PcduHandler { id: UniqueApidTargetId, dev_str: &'static str, - mode_interface: MpscModeLeafInterface, + mode_node: ModeRequestHandlerMpscBounded, composite_request_rx: mpsc::Receiver>, hk_reply_tx: mpsc::Sender>, switch_request_rx: mpsc::Receiver>, @@ -324,25 +327,30 @@ impl PcduHandler { - let result = self.handle_mode_request(msg); - // TODO: Trigger event? - if result.is_err() { - log::warn!( - "{}: mode request failed with error {:?}", - self.dev_str, - result.err().unwrap() - ); - } - } - Err(e) => { - if e != mpsc::TryRecvError::Empty { - log::warn!("{}: failed to receive mode request: {:?}", self.dev_str, e); + match self.mode_node.try_recv_mode_request() { + Ok(opt_msg) => { + if let Some(msg) = opt_msg { + let result = self.handle_mode_request(msg); + // TODO: Trigger event? + if result.is_err() { + log::warn!( + "{}: mode request failed with error {:?}", + self.dev_str, + result.err().unwrap() + ); + } } else { break; } } + Err(e) => match e { + satrs::queue::GenericReceiveError::Empty => { + break; + } + satrs::queue::GenericReceiveError::TxDisconnected(_) => { + log::warn!("{}: failed to receive mode request: {:?}", self.dev_str, e); + } + }, } } } @@ -467,9 +475,8 @@ impl ModeRequestHandler requestor.sender_id() ); } - self.mode_interface - .reply_to_pus_tx - .send(GenericMessage::new(requestor, reply)) + self.mode_node + .send_mode_reply(requestor, reply) .map_err(|_| GenericSendError::RxDisconnected)?; Ok(()) } @@ -483,6 +490,24 @@ impl ModeRequestHandler } } +impl ModeNode + for PcduHandler +{ + fn id(&self) -> satrs::ComponentId { + PCDU_HANDLER.into() + } +} + +impl ModeChild + for PcduHandler +{ + type Sender = mpsc::SyncSender>; + + fn add_mode_parent(&mut self, id: satrs::ComponentId, reply_sender: Self::Sender) { + self.mode_node.add_message_target(id, reply_sender); + } +} + #[cfg(test)] mod tests { use std::sync::mpsc; @@ -490,7 +515,7 @@ mod tests { use satrs::{ mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage, tmtc::PacketAsVec, }; - use satrs_example::config::components::{Apid, MGM_HANDLER_0}; + use satrs_example::config::components::{Apid, EPS_SUBSYSTEM, MGM_HANDLER_0, PCDU_HANDLER}; use satrs_minisim::eps::SwitchMapBinary; use super::*; @@ -531,7 +556,7 @@ mod tests { } pub struct PcduTestbench { - pub mode_request_tx: mpsc::Sender>, + pub mode_request_tx: mpsc::SyncSender>, pub mode_reply_rx_to_pus: mpsc::Receiver>, pub mode_reply_rx_to_parent: mpsc::Receiver>, pub composite_request_tx: mpsc::Sender>, @@ -543,19 +568,29 @@ mod tests { impl PcduTestbench { pub fn new() -> Self { - let (mode_request_tx, mode_request_rx) = mpsc::channel(); - let (mode_reply_tx_to_pus, mode_reply_rx_to_pus) = mpsc::channel(); + let (mode_request_tx, mode_request_rx) = mpsc::sync_channel(5); + let (mode_reply_tx_to_pus, mode_reply_rx_to_pus) = mpsc::sync_channel(5); let (mode_reply_tx_to_parent, mode_reply_rx_to_parent) = mpsc::sync_channel(5); - let mode_interface = MpscModeLeafInterface { - request_rx: mode_request_rx, - reply_to_pus_tx: mode_reply_tx_to_pus, - reply_to_parent_tx: mode_reply_tx_to_parent, - }; + let mode_node = + ModeRequestHandlerMpscBounded::new(PCDU_HANDLER.into(), mode_request_rx); let (composite_request_tx, composite_request_rx) = mpsc::channel(); let (hk_reply_tx, hk_reply_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel::(); let (switch_request_tx, switch_reqest_rx) = mpsc::channel(); let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default())); + let mut handler = PcduHandler::new( + UniqueApidTargetId::new(Apid::Eps as u16, 0), + "TEST_PCDU", + mode_node, + composite_request_rx, + hk_reply_tx, + switch_reqest_rx, + tm_tx, + SerialInterfaceTest::default(), + shared_switch_map, + ); + handler.add_mode_parent(EPS_SUBSYSTEM.into(), mode_reply_tx_to_parent); + handler.add_mode_parent(PUS_MODE_SERVICE.into(), mode_reply_tx_to_pus); Self { mode_request_tx, mode_reply_rx_to_pus, @@ -564,17 +599,7 @@ mod tests { hk_reply_rx, tm_rx, switch_request_tx, - handler: PcduHandler::new( - UniqueApidTargetId::new(Apid::Eps as u16, 0), - "TEST_PCDU", - mode_interface, - composite_request_rx, - hk_reply_tx, - switch_reqest_rx, - tm_tx, - SerialInterfaceTest::default(), - shared_switch_map, - ), + handler, } } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 5d7fcbf..b7657dd 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,3 +1,9 @@ +use logger::setup_logger; +use satrs::spacepackets::time::{cds::CdsTime, TimeWriter}; + +mod main_heap_tmtc; +mod main_static_tmtc; + mod acs; mod eps; mod events; @@ -8,719 +14,13 @@ mod pus; mod requests; mod tmtc; -use crate::eps::pcdu::{ - PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper, -}; -use crate::eps::PowerSwitchHelper; -use crate::events::EventHandler; -use crate::interface::udp::DynamicUdpTmHandler; -use crate::pus::stack::PusStack; -use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic}; -use crate::tmtc::tm_sink::{TmSinkDynamic, TmSinkStatic}; -use log::info; -use pus::test::create_test_service_dynamic; -use satrs::hal::std::tcp_server::ServerConfig; -use satrs::hal::std::udp_server::UdpTcServer; -use satrs::mode_tree::connect_mode_nodes; -use satrs::pus::HandlingStatus; -use satrs::request::{GenericMessage, MessageMetadata}; -use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; -use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; -use satrs_example::config::tasks::{ - FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS, -}; -use satrs_example::config::{OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT}; -use satrs_example::DeviceMode; - -use crate::acs::mgm::{ - MgmHandlerLis3Mdl, MpscModeLeafInterface, SpiDummyInterface, SpiSimInterface, - SpiSimInterfaceWrapper, -}; -use crate::interface::sim_client_udp::create_sim_client; -use crate::interface::tcp::{SyncTcpTmSource, TcpTask}; -use crate::interface::udp::{StaticUdpTmHandler, UdpTmtcServer}; -use crate::logger::setup_logger; -use crate::pus::action::{create_action_service_dynamic, create_action_service_static}; -use crate::pus::event::{create_event_service_dynamic, create_event_service_static}; -use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static}; -use crate::pus::mode::{create_mode_service_dynamic, create_mode_service_static}; -use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static}; -use crate::pus::test::create_test_service_static; -use crate::pus::{PusTcDistributor, PusTcMpscRouter}; -use crate::requests::{CompositeRequest, GenericRequestRouter}; -use satrs::mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}; -use satrs::pus::event_man::EventRequestWithToken; -use satrs::spacepackets::{time::cds::CdsTime, time::TimeWriter}; -use satrs_example::config::components::{ - MGM_HANDLER_0, NO_SENDER, PCDU_HANDLER, TCP_SERVER, UDP_SERVER, -}; -use std::net::{IpAddr, SocketAddr}; -use std::sync::{mpsc, Mutex}; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::Duration; - -#[allow(dead_code)] -fn static_tmtc_pool_main() { - let (tm_pool, tc_pool) = create_static_pools(); - let shared_tm_pool = Arc::new(RwLock::new(tm_pool)); - let shared_tc_pool = Arc::new(RwLock::new(tc_pool)); - let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); - let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); - let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); - let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); - let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); - - let tm_sink_tx_sender = - PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()); - - let (sim_request_tx, sim_request_rx) = mpsc::channel(); - let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); - let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); - let mut opt_sim_client = create_sim_client(sim_request_rx); - - let (mgm_handler_composite_tx, mgm_handler_composite_rx) = - mpsc::sync_channel::>(10); - let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = - mpsc::sync_channel::>(30); - - let (mgm_handler_mode_tx, mgm_handler_mode_rx) = - mpsc::sync_channel::>(5); - let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = - mpsc::sync_channel::>(5); - - // Some request are targetable. This map is used to retrieve sender handles based on a target ID. - let mut request_map = GenericRequestRouter::default(); - request_map - .composite_router_map - .insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx); - request_map - .composite_router_map - .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); - request_map - .mode_router_map - .insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx.clone()); - - // This helper structure is used by all telecommand providers which need to send telecommands - // to the TC source. - let tc_source = PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); - - // Create event handling components - // These sender handles are used to send event requests, for example to enable or disable - // certain events. - let (event_tx, event_rx) = mpsc::sync_channel(100); - let (event_request_tx, event_request_rx) = mpsc::channel::(); - - // The event task is the core handler to perform the event routing and TM handling as specified - // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); - - let (pus_test_tx, pus_test_rx) = mpsc::channel(); - let (pus_event_tx, pus_event_rx) = mpsc::channel(); - let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); - let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); - let (pus_action_tx, pus_action_rx) = mpsc::channel(); - let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); - - let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); - let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); - let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); - - let pus_router = PusTcMpscRouter { - test_tc_sender: pus_test_tx, - event_tc_sender: pus_event_tx, - sched_tc_sender: pus_sched_tx, - hk_tc_sender: pus_hk_tx, - action_tc_sender: pus_action_tx, - mode_tc_sender: pus_mode_tx, - }; - let pus_test_service = create_test_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), - event_tx.clone(), - pus_test_rx, - ); - let pus_scheduler_service = create_scheduler_service_static( - tm_sink_tx_sender.clone(), - tc_source.clone(), - pus_sched_rx, - create_sched_tc_pool(), - ); - let pus_event_service = create_event_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), - pus_event_rx, - event_request_tx, - ); - let pus_action_service = create_action_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), - pus_action_rx, - request_map.clone(), - pus_action_reply_rx, - ); - let pus_hk_service = create_hk_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), - pus_hk_rx, - request_map.clone(), - pus_hk_reply_rx, - ); - let pus_mode_service = create_mode_service_static( - tm_sink_tx_sender.clone(), - shared_tc_pool.clone(), - pus_mode_rx, - request_map, - pus_mode_reply_rx, - ); - let mut pus_stack = PusStack::new( - pus_test_service, - pus_hk_service, - pus_event_service, - pus_action_service, - pus_scheduler_service, - pus_mode_service, - ); - - let mut tmtc_task = TcSourceTaskStatic::new( - shared_tc_pool_wrapper.clone(), - tc_source_rx, - PusTcDistributor::new(tm_sink_tx_sender, pus_router), - ); - - let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source.clone()) - .expect("creating UDP TMTC server failed"); - let mut udp_tmtc_server = UdpTmtcServer { - udp_tc_server, - tm_handler: StaticUdpTmHandler { - tm_rx: tm_server_rx, - tm_store: shared_tm_pool.clone(), - }, - }; - - let tcp_server_cfg = ServerConfig::new( - TCP_SERVER.id(), - sock_addr, - Duration::from_millis(400), - 4096, - 8192, - ); - let sync_tm_tcp_source = SyncTcpTmSource::new(200); - let mut tcp_server = TcpTask::new( - tcp_server_cfg, - sync_tm_tcp_source.clone(), - tc_source.clone(), - PACKET_ID_VALIDATOR.clone(), - ) - .expect("tcp server creation failed"); - - let mut tm_sink = TmSinkStatic::new( - shared_tm_pool_wrapper, - sync_tm_tcp_source, - tm_sink_rx, - tm_server_tx, - ); - - let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = - mpsc::sync_channel(5); - - let shared_switch_set = Arc::new(Mutex::default()); - let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); - let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); - - let shared_mgm_set = Arc::default(); - let mgm_mode_node = - ModeRequestHandlerMpscBounded::new(MGM_HANDLER_0.into(), mgm_handler_mode_rx); - let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() { - sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx); - SpiSimInterfaceWrapper::Sim(SpiSimInterface { - sim_request_tx: sim_request_tx.clone(), - sim_reply_rx: mgm_sim_reply_rx, - }) - } else { - SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()) - }; - let mut mgm_handler = MgmHandlerLis3Mdl::new( - MGM_HANDLER_0, - "MGM_0", - mgm_mode_node, - mgm_handler_composite_rx, - pus_hk_reply_tx.clone(), - switch_helper.clone(), - tm_sink_tx.clone(), - mgm_spi_interface, - shared_mgm_set, - ); - // Connect PUS mode service to device handler. - connect_mode_nodes( - &mut pus_stack.mode_srv, - mgm_handler_mode_tx, - &mut mgm_handler, - mgm_handler_mode_reply_to_parent_tx, - ); - - let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) = - mpsc::sync_channel(10); - let pcdu_mode_leaf_interface = MpscModeLeafInterface { - request_rx: pcdu_handler_mode_rx, - reply_to_pus_tx: pus_mode_reply_tx, - reply_to_parent_tx: pcdu_handler_mode_reply_to_parent_tx, - }; - let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { - sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); - SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new( - sim_request_tx.clone(), - pcdu_sim_reply_rx, - )) - } else { - SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default()) - }; - - let mut pcdu_handler = PcduHandler::new( - PCDU_HANDLER, - "PCDU", - pcdu_mode_leaf_interface, - pcdu_handler_composite_rx, - pus_hk_reply_tx, - switch_request_rx, - tm_sink_tx, - pcdu_serial_interface, - shared_switch_set, - ); - // The PCDU is a critical component which should be in normal mode immediately. - pcdu_handler_mode_tx - .send(GenericMessage::new( - MessageMetadata::new(0, NO_SENDER), - ModeRequest::SetMode { - mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as Mode, 0), - forced: false, - }, - )) - .expect("sending initial mode request failed"); - - info!("Starting TMTC and UDP task"); - let jh_udp_tmtc = thread::Builder::new() - .name("SATRS tmtc-udp".to_string()) - .spawn(move || { - info!("Running UDP server on port {SERVER_PORT}"); - loop { - udp_tmtc_server.periodic_operation(); - tmtc_task.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); - } - }) - .unwrap(); - - info!("Starting TCP task"); - let jh_tcp = thread::Builder::new() - .name("sat-rs tcp".to_string()) - .spawn(move || { - info!("Running TCP server on port {SERVER_PORT}"); - loop { - tcp_server.periodic_operation(); - } - }) - .unwrap(); - - info!("Starting TM funnel task"); - let jh_tm_funnel = thread::Builder::new() - .name("tm sink".to_string()) - .spawn(move || loop { - tm_sink.operation(); - }) - .unwrap(); - - let mut opt_jh_sim_client = None; - if let Some(mut sim_client) = opt_sim_client { - info!("Starting UDP sim client task"); - opt_jh_sim_client = Some( - thread::Builder::new() - .name("sat-rs sim adapter".to_string()) - .spawn(move || loop { - if sim_client.operation() == HandlingStatus::Empty { - std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS)); - } - }) - .unwrap(), - ); - } - - info!("Starting AOCS thread"); - let jh_aocs = thread::Builder::new() - .name("sat-rs aocs".to_string()) - .spawn(move || loop { - mgm_handler.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); - }) - .unwrap(); - - info!("Starting EPS thread"); - let jh_eps = thread::Builder::new() - .name("sat-rs eps".to_string()) - .spawn(move || loop { - // TODO: We should introduce something like a fixed timeslot helper to allow a more - // declarative API. It would also be very useful for the AOCS task. - pcdu_handler.periodic_operation(eps::pcdu::OpCode::RegularOp); - thread::sleep(Duration::from_millis(50)); - pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); - thread::sleep(Duration::from_millis(50)); - pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); - thread::sleep(Duration::from_millis(300)); - }) - .unwrap(); - - info!("Starting PUS handler thread"); - let jh_pus_handler = thread::Builder::new() - .name("sat-rs pus".to_string()) - .spawn(move || loop { - event_handler.periodic_operation(); - pus_stack.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); - }) - .unwrap(); - - jh_udp_tmtc - .join() - .expect("Joining UDP TMTC server thread failed"); - jh_tcp - .join() - .expect("Joining TCP TMTC server thread failed"); - jh_tm_funnel - .join() - .expect("Joining TM Funnel thread failed"); - if let Some(jh_sim_client) = opt_jh_sim_client { - jh_sim_client - .join() - .expect("Joining SIM client thread failed"); - } - jh_aocs.join().expect("Joining AOCS thread failed"); - jh_eps.join().expect("Joining EPS thread failed"); - jh_pus_handler - .join() - .expect("Joining PUS handler thread failed"); -} - -#[allow(dead_code)] -fn dyn_tmtc_pool_main() { - let (tc_source_tx, tc_source_rx) = mpsc::channel(); - let (tm_sink_tx, tm_sink_rx) = mpsc::channel(); - let (tm_server_tx, tm_server_rx) = mpsc::channel(); - - let (sim_request_tx, sim_request_rx) = mpsc::channel(); - let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); - let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); - let mut opt_sim_client = create_sim_client(sim_request_rx); - - // Some request are targetable. This map is used to retrieve sender handles based on a target ID. - let (mgm_handler_composite_tx, mgm_handler_composite_rx) = - mpsc::sync_channel::>(5); - let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = - mpsc::sync_channel::>(10); - let (mgm_handler_mode_tx, mgm_handler_mode_rx) = - mpsc::sync_channel::>(5); - let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = - mpsc::sync_channel::>(10); - - // Some request are targetable. This map is used to retrieve sender handles based on a target ID. - let mut request_map = GenericRequestRouter::default(); - request_map - .composite_router_map - .insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx); - //request_map - //.mode_router_map - //.insert(MGM_HANDLER_0.id(), mgm_handler_mode_tx); - request_map - .composite_router_map - .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); - request_map - .mode_router_map - .insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx.clone()); - - // Create event handling components - // These sender handles are used to send event requests, for example to enable or disable - // certain events. - let (event_tx, event_rx) = mpsc::sync_channel(100); - let (event_request_tx, event_request_rx) = mpsc::channel::(); - // The event task is the core handler to perform the event routing and TM handling as specified - // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); - - let (pus_test_tx, pus_test_rx) = mpsc::channel(); - let (pus_event_tx, pus_event_rx) = mpsc::channel(); - let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); - let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); - let (pus_action_tx, pus_action_rx) = mpsc::channel(); - let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); - - let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); - let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); - let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); - - let pus_router = PusTcMpscRouter { - test_tc_sender: pus_test_tx, - event_tc_sender: pus_event_tx, - sched_tc_sender: pus_sched_tx, - hk_tc_sender: pus_hk_tx, - action_tc_sender: pus_action_tx, - mode_tc_sender: pus_mode_tx, - }; - - let pus_test_service = - create_test_service_dynamic(tm_sink_tx.clone(), event_tx.clone(), pus_test_rx); - let pus_scheduler_service = create_scheduler_service_dynamic( - tm_sink_tx.clone(), - tc_source_tx.clone(), - pus_sched_rx, - create_sched_tc_pool(), - ); - - let pus_event_service = - create_event_service_dynamic(tm_sink_tx.clone(), pus_event_rx, event_request_tx); - let pus_action_service = create_action_service_dynamic( - tm_sink_tx.clone(), - pus_action_rx, - request_map.clone(), - pus_action_reply_rx, - ); - let pus_hk_service = create_hk_service_dynamic( - tm_sink_tx.clone(), - pus_hk_rx, - request_map.clone(), - pus_hk_reply_rx, - ); - let pus_mode_service = create_mode_service_dynamic( - tm_sink_tx.clone(), - pus_mode_rx, - request_map, - pus_mode_reply_rx, - ); - let mut pus_stack = PusStack::new( - pus_test_service, - pus_hk_service, - pus_event_service, - pus_action_service, - pus_scheduler_service, - pus_mode_service, - ); - - let mut tmtc_task = TcSourceTaskDynamic::new( - tc_source_rx, - PusTcDistributor::new(tm_sink_tx.clone(), pus_router), - ); - - let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()) - .expect("creating UDP TMTC server failed"); - let mut udp_tmtc_server = UdpTmtcServer { - udp_tc_server, - tm_handler: DynamicUdpTmHandler { - tm_rx: tm_server_rx, - }, - }; - - let tcp_server_cfg = ServerConfig::new( - TCP_SERVER.id(), - sock_addr, - Duration::from_millis(400), - 4096, - 8192, - ); - let sync_tm_tcp_source = SyncTcpTmSource::new(200); - let mut tcp_server = TcpTask::new( - tcp_server_cfg, - sync_tm_tcp_source.clone(), - tc_source_tx.clone(), - PACKET_ID_VALIDATOR.clone(), - ) - .expect("tcp server creation failed"); - - let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx); - - let shared_switch_set = Arc::new(Mutex::default()); - let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); - let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); - - let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = - mpsc::sync_channel(5); - let shared_mgm_set = Arc::default(); - let mgm_mode_node = - ModeRequestHandlerMpscBounded::new(MGM_HANDLER_0.into(), mgm_handler_mode_rx); - - let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() { - sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx); - SpiSimInterfaceWrapper::Sim(SpiSimInterface { - sim_request_tx: sim_request_tx.clone(), - sim_reply_rx: mgm_sim_reply_rx, - }) - } else { - SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()) - }; - let mut mgm_handler = MgmHandlerLis3Mdl::new( - MGM_HANDLER_0, - "MGM_0", - mgm_mode_node, - mgm_handler_composite_rx, - pus_hk_reply_tx.clone(), - switch_helper.clone(), - tm_sink_tx.clone(), - mgm_spi_interface, - shared_mgm_set, - ); - connect_mode_nodes( - &mut pus_stack.mode_srv, - mgm_handler_mode_tx, - &mut mgm_handler, - mgm_handler_mode_reply_to_parent_tx, - ); - - let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) = - mpsc::sync_channel(10); - let pcdu_mode_leaf_interface = MpscModeLeafInterface { - request_rx: pcdu_handler_mode_rx, - reply_to_pus_tx: pus_mode_reply_tx, - reply_to_parent_tx: pcdu_handler_mode_reply_to_parent_tx, - }; - let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { - sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); - SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new( - sim_request_tx.clone(), - pcdu_sim_reply_rx, - )) - } else { - SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default()) - }; - let mut pcdu_handler = PcduHandler::new( - PCDU_HANDLER, - "PCDU", - pcdu_mode_leaf_interface, - pcdu_handler_composite_rx, - pus_hk_reply_tx, - switch_request_rx, - tm_sink_tx, - pcdu_serial_interface, - shared_switch_set, - ); - // The PCDU is a critical component which should be in normal mode immediately. - pcdu_handler_mode_tx - .send(GenericMessage::new( - MessageMetadata::new(0, NO_SENDER), - ModeRequest::SetMode { - mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as Mode, 0), - forced: false, - }, - )) - .expect("sending initial mode request failed"); - - info!("Starting TMTC and UDP task"); - let jh_udp_tmtc = thread::Builder::new() - .name("sat-rs tmtc-udp".to_string()) - .spawn(move || { - info!("Running UDP server on port {SERVER_PORT}"); - loop { - udp_tmtc_server.periodic_operation(); - tmtc_task.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); - } - }) - .unwrap(); - - info!("Starting TCP task"); - let jh_tcp = thread::Builder::new() - .name("sat-rs tcp".to_string()) - .spawn(move || { - info!("Running TCP server on port {SERVER_PORT}"); - loop { - tcp_server.periodic_operation(); - } - }) - .unwrap(); - - info!("Starting TM funnel task"); - let jh_tm_funnel = thread::Builder::new() - .name("sat-rs tm-sink".to_string()) - .spawn(move || loop { - tm_funnel.operation(); - }) - .unwrap(); - - let mut opt_jh_sim_client = None; - if let Some(mut sim_client) = opt_sim_client { - info!("Starting UDP sim client task"); - opt_jh_sim_client = Some( - thread::Builder::new() - .name("sat-rs sim adapter".to_string()) - .spawn(move || loop { - if sim_client.operation() == HandlingStatus::Empty { - std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS)); - } - }) - .unwrap(), - ); - } - - info!("Starting AOCS thread"); - let jh_aocs = thread::Builder::new() - .name("sat-rs aocs".to_string()) - .spawn(move || loop { - mgm_handler.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); - }) - .unwrap(); - - info!("Starting EPS thread"); - let jh_eps = thread::Builder::new() - .name("sat-rs eps".to_string()) - .spawn(move || loop { - // TODO: We should introduce something like a fixed timeslot helper to allow a more - // declarative API. It would also be very useful for the AOCS task. - pcdu_handler.periodic_operation(eps::pcdu::OpCode::RegularOp); - thread::sleep(Duration::from_millis(50)); - pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); - thread::sleep(Duration::from_millis(50)); - pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); - thread::sleep(Duration::from_millis(300)); - }) - .unwrap(); - - info!("Starting PUS handler thread"); - let jh_pus_handler = thread::Builder::new() - .name("sat-rs pus".to_string()) - .spawn(move || loop { - pus_stack.periodic_operation(); - event_handler.periodic_operation(); - thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); - }) - .unwrap(); - - jh_udp_tmtc - .join() - .expect("Joining UDP TMTC server thread failed"); - jh_tcp - .join() - .expect("Joining TCP TMTC server thread failed"); - jh_tm_funnel - .join() - .expect("Joining TM Funnel thread failed"); - if let Some(jh_sim_client) = opt_jh_sim_client { - jh_sim_client - .join() - .expect("Joining SIM client thread failed"); - } - jh_aocs.join().expect("Joining AOCS thread failed"); - jh_eps.join().expect("Joining EPS thread failed"); - jh_pus_handler - .join() - .expect("Joining PUS handler thread failed"); -} - fn main() { setup_logger().expect("setting up logging with fern failed"); println!("Running OBSW example"); - #[cfg(not(feature = "dyn_tmtc"))] - static_tmtc_pool_main(); - #[cfg(feature = "dyn_tmtc")] - dyn_tmtc_pool_main(); + #[cfg(not(feature = "heap_tmtc"))] + main_static_tmtc::main_with_static_tmtc_pool(); + #[cfg(feature = "heap_tmtc")] + main_heap_tmtc::main_with_heap_backed_tmtc_pool(); } pub fn update_time(time_provider: &mut CdsTime, timestamp: &mut [u8]) { diff --git a/satrs-example/src/main_heap_tmtc.rs b/satrs-example/src/main_heap_tmtc.rs new file mode 100644 index 0000000..e1226cf --- /dev/null +++ b/satrs-example/src/main_heap_tmtc.rs @@ -0,0 +1,354 @@ +use crate::eps::pcdu::{ + PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper, +}; +use crate::eps::PowerSwitchHelper; +use crate::events::EventHandler; +use crate::interface::udp::DynamicUdpTmHandler; +use crate::pus::stack::PusStack; +use crate::pus::test::create_test_service_dynamic; +use crate::tmtc::tc_source::TcSourceTaskDynamic; +use crate::tmtc::tm_sink::TmSinkDynamic; +use log::info; +use satrs::hal::std::tcp_server::ServerConfig; +use satrs::hal::std::udp_server::UdpTcServer; +use satrs::mode_tree::connect_mode_nodes; +use satrs::pus::HandlingStatus; +use satrs::request::{GenericMessage, MessageMetadata}; +use satrs_example::config::pool::create_sched_tc_pool; +use satrs_example::config::tasks::{ + FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS, +}; +use satrs_example::config::{OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT}; +use satrs_example::DeviceMode; + +use crate::acs::mgm::{ + MgmHandlerLis3Mdl, MpscModeLeafInterface, SpiDummyInterface, SpiSimInterface, + SpiSimInterfaceWrapper, +}; +use crate::interface::sim_client_udp::create_sim_client; +use crate::interface::tcp::{SyncTcpTmSource, TcpTask}; +use crate::interface::udp::UdpTmtcServer; +use crate::pus::action::create_action_service_dynamic; +use crate::pus::event::create_event_service_dynamic; +use crate::pus::hk::create_hk_service_dynamic; +use crate::pus::mode::create_mode_service_dynamic; +use crate::pus::scheduler::create_scheduler_service_dynamic; +use crate::pus::{PusTcDistributor, PusTcMpscRouter}; +use crate::requests::{CompositeRequest, GenericRequestRouter}; +use satrs::mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}; +use satrs::pus::event_man::EventRequestWithToken; +use satrs_example::config::components::{ + MGM_HANDLER_0, NO_SENDER, PCDU_HANDLER, TCP_SERVER, UDP_SERVER, +}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; +use std::sync::{mpsc, Mutex}; +use std::thread; +use std::time::Duration; + +#[allow(dead_code)] +pub fn main_with_heap_backed_tmtc_pool() { + let (tc_source_tx, tc_source_rx) = mpsc::channel(); + let (tm_sink_tx, tm_sink_rx) = mpsc::channel(); + let (tm_server_tx, tm_server_rx) = mpsc::channel(); + + let (sim_request_tx, sim_request_rx) = mpsc::channel(); + let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); + let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); + let mut opt_sim_client = create_sim_client(sim_request_rx); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let (mgm_handler_composite_tx, mgm_handler_composite_rx) = + mpsc::sync_channel::>(5); + let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = + mpsc::sync_channel::>(10); + let (mgm_handler_mode_tx, mgm_handler_mode_rx) = + mpsc::sync_channel::>(5); + let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = + mpsc::sync_channel::>(10); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = GenericRequestRouter::default(); + request_map + .composite_router_map + .insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx); + request_map + .composite_router_map + .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); + + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); + let (event_request_tx, event_request_rx) = mpsc::channel::(); + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); + + let (pus_test_tx, pus_test_rx) = mpsc::channel(); + let (pus_event_tx, pus_event_rx) = mpsc::channel(); + let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); + + let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); + let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); + + let pus_router = PusTcMpscRouter { + test_tc_sender: pus_test_tx, + event_tc_sender: pus_event_tx, + sched_tc_sender: pus_sched_tx, + hk_tc_sender: pus_hk_tx, + action_tc_sender: pus_action_tx, + mode_tc_sender: pus_mode_tx, + }; + + let pus_test_service = + create_test_service_dynamic(tm_sink_tx.clone(), event_tx.clone(), pus_test_rx); + let pus_scheduler_service = create_scheduler_service_dynamic( + tm_sink_tx.clone(), + tc_source_tx.clone(), + pus_sched_rx, + create_sched_tc_pool(), + ); + + let pus_event_service = + create_event_service_dynamic(tm_sink_tx.clone(), pus_event_rx, event_request_tx); + let pus_action_service = create_action_service_dynamic( + tm_sink_tx.clone(), + pus_action_rx, + request_map.clone(), + pus_action_reply_rx, + ); + let pus_hk_service = create_hk_service_dynamic( + tm_sink_tx.clone(), + pus_hk_rx, + request_map.clone(), + pus_hk_reply_rx, + ); + let pus_mode_service = create_mode_service_dynamic( + tm_sink_tx.clone(), + pus_mode_rx, + request_map, + pus_mode_reply_rx, + ); + let mut pus_stack = PusStack::new( + pus_test_service, + pus_hk_service, + pus_event_service, + pus_action_service, + pus_scheduler_service, + pus_mode_service, + ); + + let mut tmtc_task = TcSourceTaskDynamic::new( + tc_source_rx, + PusTcDistributor::new(tm_sink_tx.clone(), pus_router), + ); + + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_handler: DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }, + }; + + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tc_source_tx.clone(), + PACKET_ID_VALIDATOR.clone(), + ) + .expect("tcp server creation failed"); + + let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx); + + let shared_switch_set = Arc::new(Mutex::default()); + let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); + let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); + + let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = + mpsc::sync_channel(5); + let shared_mgm_set = Arc::default(); + let mgm_mode_node = + ModeRequestHandlerMpscBounded::new(MGM_HANDLER_0.into(), mgm_handler_mode_rx); + + let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx); + SpiSimInterfaceWrapper::Sim(SpiSimInterface { + sim_request_tx: sim_request_tx.clone(), + sim_reply_rx: mgm_sim_reply_rx, + }) + } else { + SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()) + }; + let mut mgm_handler = MgmHandlerLis3Mdl::new( + MGM_HANDLER_0, + "MGM_0", + mgm_mode_node, + mgm_handler_composite_rx, + pus_hk_reply_tx.clone(), + switch_helper.clone(), + tm_sink_tx.clone(), + mgm_spi_interface, + shared_mgm_set, + ); + connect_mode_nodes( + &mut pus_stack.mode_srv, + mgm_handler_mode_tx, + &mut mgm_handler, + mgm_handler_mode_reply_to_parent_tx, + ); + + let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) = + mpsc::sync_channel(10); + let pcdu_mode_leaf_interface = MpscModeLeafInterface { + request_rx: pcdu_handler_mode_rx, + reply_to_pus_tx: pus_mode_reply_tx, + reply_to_parent_tx: pcdu_handler_mode_reply_to_parent_tx, + }; + let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); + SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new( + sim_request_tx.clone(), + pcdu_sim_reply_rx, + )) + } else { + SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default()) + }; + let mut pcdu_handler = PcduHandler::new( + PCDU_HANDLER, + "PCDU", + pcdu_mode_leaf_interface, + pcdu_handler_composite_rx, + pus_hk_reply_tx, + switch_request_rx, + tm_sink_tx, + pcdu_serial_interface, + shared_switch_set, + ); + // The PCDU is a critical component which should be in normal mode immediately. + pcdu_handler_mode_tx + .send(GenericMessage::new( + MessageMetadata::new(0, NO_SENDER), + ModeRequest::SetMode { + mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as Mode, 0), + forced: false, + }, + )) + .expect("sending initial mode request failed"); + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("sat-rs tmtc-udp".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("sat-rs tcp".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } + }) + .unwrap(); + + info!("Starting TM funnel task"); + let jh_tm_funnel = thread::Builder::new() + .name("sat-rs tm-sink".to_string()) + .spawn(move || loop { + tm_funnel.operation(); + }) + .unwrap(); + + let mut opt_jh_sim_client = None; + if let Some(mut sim_client) = opt_sim_client { + info!("Starting UDP sim client task"); + opt_jh_sim_client = Some( + thread::Builder::new() + .name("sat-rs sim adapter".to_string()) + .spawn(move || loop { + if sim_client.operation() == HandlingStatus::Empty { + std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS)); + } + }) + .unwrap(), + ); + } + + info!("Starting AOCS thread"); + let jh_aocs = thread::Builder::new() + .name("sat-rs aocs".to_string()) + .spawn(move || loop { + mgm_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); + }) + .unwrap(); + + info!("Starting EPS thread"); + let jh_eps = thread::Builder::new() + .name("sat-rs eps".to_string()) + .spawn(move || loop { + // TODO: We should introduce something like a fixed timeslot helper to allow a more + // declarative API. It would also be very useful for the AOCS task. + pcdu_handler.periodic_operation(eps::pcdu::OpCode::RegularOp); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(300)); + }) + .unwrap(); + + info!("Starting PUS handler thread"); + let jh_pus_handler = thread::Builder::new() + .name("sat-rs pus".to_string()) + .spawn(move || loop { + pus_stack.periodic_operation(); + event_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); + }) + .unwrap(); + + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); + jh_tm_funnel + .join() + .expect("Joining TM Funnel thread failed"); + if let Some(jh_sim_client) = opt_jh_sim_client { + jh_sim_client + .join() + .expect("Joining SIM client thread failed"); + } + jh_aocs.join().expect("Joining AOCS thread failed"); + jh_eps.join().expect("Joining EPS thread failed"); + jh_pus_handler + .join() + .expect("Joining PUS handler thread failed"); +} diff --git a/satrs-example/src/main_static_tmtc.rs b/satrs-example/src/main_static_tmtc.rs new file mode 100644 index 0000000..2e7152a --- /dev/null +++ b/satrs-example/src/main_static_tmtc.rs @@ -0,0 +1,387 @@ +use crate::eps::pcdu::{ + PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper, +}; +use crate::eps::PowerSwitchHelper; +use crate::events::EventHandler; +use crate::pus::stack::PusStack; +use crate::tmtc::tc_source::TcSourceTaskStatic; +use crate::tmtc::tm_sink::TmSinkStatic; +use log::info; +use satrs::hal::std::tcp_server::ServerConfig; +use satrs::hal::std::udp_server::UdpTcServer; +use satrs::mode_tree::connect_mode_nodes; +use satrs::pus::HandlingStatus; +use satrs::request::{GenericMessage, MessageMetadata}; +use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool}; +use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; +use satrs_example::config::tasks::{ + FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS, +}; +use satrs_example::config::{OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT}; +use satrs_example::DeviceMode; + +use crate::acs::mgm::{ + MgmHandlerLis3Mdl, SpiDummyInterface, SpiSimInterface, SpiSimInterfaceWrapper, +}; +use crate::interface::sim_client_udp::create_sim_client; +use crate::interface::tcp::{SyncTcpTmSource, TcpTask}; +use crate::interface::udp::{StaticUdpTmHandler, UdpTmtcServer}; +use crate::pus::action::create_action_service_static; +use crate::pus::event::create_event_service_static; +use crate::pus::hk::create_hk_service_static; +use crate::pus::mode::create_mode_service_static; +use crate::pus::scheduler::create_scheduler_service_static; +use crate::pus::test::create_test_service_static; +use crate::pus::{PusTcDistributor, PusTcMpscRouter}; +use crate::requests::{CompositeRequest, GenericRequestRouter}; +use satrs::mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded}; +use satrs::pus::event_man::EventRequestWithToken; +use satrs_example::config::components::{ + MGM_HANDLER_0, NO_SENDER, PCDU_HANDLER, TCP_SERVER, UDP_SERVER, +}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{mpsc, Mutex}; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +#[allow(dead_code)] +pub fn main_with_static_tmtc_pool() { + let (tm_pool, tc_pool) = create_static_pools(); + let shared_tm_pool = Arc::new(RwLock::new(tm_pool)); + let shared_tc_pool = Arc::new(RwLock::new(tc_pool)); + let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool); + let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool); + let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50); + let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50); + let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50); + + let tm_sink_tx_sender = + PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone()); + + let (sim_request_tx, sim_request_rx) = mpsc::channel(); + let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); + let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); + let mut opt_sim_client = create_sim_client(sim_request_rx); + + let (mgm_handler_composite_tx, mgm_handler_composite_rx) = + mpsc::sync_channel::>(10); + let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = + mpsc::sync_channel::>(30); + + let (mgm_handler_mode_tx, mgm_handler_mode_rx) = + mpsc::sync_channel::>(5); + let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = + mpsc::sync_channel::>(5); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = GenericRequestRouter::default(); + request_map + .composite_router_map + .insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx); + request_map + .composite_router_map + .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); + request_map + .mode_router_map + .insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx.clone()); + + // This helper structure is used by all telecommand providers which need to send telecommands + // to the TC source. + let tc_source = PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone()); + + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_tx, event_rx) = mpsc::sync_channel(100); + let (event_request_tx, event_request_rx) = mpsc::channel::(); + + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); + + let (pus_test_tx, pus_test_rx) = mpsc::channel(); + let (pus_event_tx, pus_event_rx) = mpsc::channel(); + let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); + + let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); + let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(5); + + let pus_router = PusTcMpscRouter { + test_tc_sender: pus_test_tx, + event_tc_sender: pus_event_tx, + sched_tc_sender: pus_sched_tx, + hk_tc_sender: pus_hk_tx, + action_tc_sender: pus_action_tx, + mode_tc_sender: pus_mode_tx, + }; + let pus_test_service = create_test_service_static( + tm_sink_tx_sender.clone(), + shared_tc_pool.clone(), + event_tx.clone(), + pus_test_rx, + ); + let pus_scheduler_service = create_scheduler_service_static( + tm_sink_tx_sender.clone(), + tc_source.clone(), + pus_sched_rx, + create_sched_tc_pool(), + ); + let pus_event_service = create_event_service_static( + tm_sink_tx_sender.clone(), + shared_tc_pool.clone(), + pus_event_rx, + event_request_tx, + ); + let pus_action_service = create_action_service_static( + tm_sink_tx_sender.clone(), + shared_tc_pool.clone(), + pus_action_rx, + request_map.clone(), + pus_action_reply_rx, + ); + let pus_hk_service = create_hk_service_static( + tm_sink_tx_sender.clone(), + shared_tc_pool.clone(), + pus_hk_rx, + request_map.clone(), + pus_hk_reply_rx, + ); + let pus_mode_service = create_mode_service_static( + tm_sink_tx_sender.clone(), + shared_tc_pool.clone(), + pus_mode_rx, + request_map, + pus_mode_reply_rx, + ); + let mut pus_stack = PusStack::new( + pus_test_service, + pus_hk_service, + pus_event_service, + pus_action_service, + pus_scheduler_service, + pus_mode_service, + ); + + let mut tmtc_task = TcSourceTaskStatic::new( + shared_tc_pool_wrapper.clone(), + tc_source_rx, + PusTcDistributor::new(tm_sink_tx_sender, pus_router), + ); + + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source.clone()) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_handler: StaticUdpTmHandler { + tm_rx: tm_server_rx, + tm_store: shared_tm_pool.clone(), + }, + }; + + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tc_source.clone(), + PACKET_ID_VALIDATOR.clone(), + ) + .expect("tcp server creation failed"); + + let mut tm_sink = TmSinkStatic::new( + shared_tm_pool_wrapper, + sync_tm_tcp_source, + tm_sink_rx, + tm_server_tx, + ); + + let shared_switch_set = Arc::new(Mutex::default()); + let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); + let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); + + let shared_mgm_set = Arc::default(); + let mgm_mode_node = + ModeRequestHandlerMpscBounded::new(MGM_HANDLER_0.into(), mgm_handler_mode_rx); + let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx); + SpiSimInterfaceWrapper::Sim(SpiSimInterface { + sim_request_tx: sim_request_tx.clone(), + sim_reply_rx: mgm_sim_reply_rx, + }) + } else { + SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default()) + }; + let mut mgm_handler = MgmHandlerLis3Mdl::new( + MGM_HANDLER_0, + "MGM_0", + mgm_mode_node, + mgm_handler_composite_rx, + pus_hk_reply_tx.clone(), + switch_helper.clone(), + tm_sink_tx.clone(), + mgm_spi_interface, + shared_mgm_set, + ); + // Connect PUS service to device handler. + connect_mode_nodes( + &mut pus_stack.mode_srv, + mgm_handler_mode_tx, + &mut mgm_handler, + pus_mode_reply_tx.clone(), + ); + + let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); + SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new( + sim_request_tx.clone(), + pcdu_sim_reply_rx, + )) + } else { + SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default()) + }; + let pcdu_mode_node = + ModeRequestHandlerMpscBounded::new(PCDU_HANDLER.into(), pcdu_handler_mode_rx); + let mut pcdu_handler = PcduHandler::new( + PCDU_HANDLER, + "PCDU", + pcdu_mode_node, + pcdu_handler_composite_rx, + pus_hk_reply_tx, + switch_request_rx, + tm_sink_tx, + pcdu_serial_interface, + shared_switch_set, + ); + connect_mode_nodes( + &mut pus_stack.mode_srv, + pcdu_handler_mode_tx.clone(), + &mut pcdu_handler, + pus_mode_reply_tx, + ); + + // The PCDU is a critical component which should be in normal mode immediately. + pcdu_handler_mode_tx + .send(GenericMessage::new( + MessageMetadata::new(0, NO_SENDER), + ModeRequest::SetMode { + mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as Mode, 0), + forced: false, + }, + )) + .expect("sending initial mode request failed"); + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("SATRS tmtc-udp".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("sat-rs tcp".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } + }) + .unwrap(); + + info!("Starting TM funnel task"); + let jh_tm_funnel = thread::Builder::new() + .name("tm sink".to_string()) + .spawn(move || loop { + tm_sink.operation(); + }) + .unwrap(); + + let mut opt_jh_sim_client = None; + if let Some(mut sim_client) = opt_sim_client { + info!("Starting UDP sim client task"); + opt_jh_sim_client = Some( + thread::Builder::new() + .name("sat-rs sim adapter".to_string()) + .spawn(move || loop { + if sim_client.operation() == HandlingStatus::Empty { + std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS)); + } + }) + .unwrap(), + ); + } + + info!("Starting AOCS thread"); + let jh_aocs = thread::Builder::new() + .name("sat-rs aocs".to_string()) + .spawn(move || loop { + mgm_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); + }) + .unwrap(); + + info!("Starting EPS thread"); + let jh_eps = thread::Builder::new() + .name("sat-rs eps".to_string()) + .spawn(move || loop { + // TODO: We should introduce something like a fixed timeslot helper to allow a more + // declarative API. It would also be very useful for the AOCS task. + // + // TODO: The fixed timeslot handler exists.. use it. + pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::RegularOp); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(300)); + }) + .unwrap(); + + info!("Starting PUS handler thread"); + let jh_pus_handler = thread::Builder::new() + .name("sat-rs pus".to_string()) + .spawn(move || loop { + event_handler.periodic_operation(); + pus_stack.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); + }) + .unwrap(); + + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); + jh_tm_funnel + .join() + .expect("Joining TM Funnel thread failed"); + if let Some(jh_sim_client) = opt_jh_sim_client { + jh_sim_client + .join() + .expect("Joining SIM client thread failed"); + } + jh_aocs.join().expect("Joining AOCS thread failed"); + jh_eps.join().expect("Joining EPS thread failed"); + jh_pus_handler + .join() + .expect("Joining PUS handler thread failed"); +}