diff --git a/satrs-example/src/acs/mgm.rs b/satrs-example/src/acs/mgm.rs index 4e4fd07..aa85dad 100644 --- a/satrs-example/src/acs/mgm.rs +++ b/satrs-example/src/acs/mgm.rs @@ -134,7 +134,7 @@ pub struct MgmData { pub struct MpscModeLeafInterface { pub request_rx: mpsc::Receiver>, pub reply_to_pus_tx: mpsc::Sender>, - pub reply_to_parent_tx: mpsc::Sender>, + pub reply_to_parent_tx: mpsc::SyncSender>, } #[derive(Default)] diff --git a/satrs-example/src/eps/mod.rs b/satrs-example/src/eps/mod.rs index 9ff3f7d..351cf76 100644 --- a/satrs-example/src/eps/mod.rs +++ b/satrs-example/src/eps/mod.rs @@ -1,16 +1,9 @@ use derive_new::new; -use std::{ - borrow::BorrowMut, - cell::RefCell, - collections::VecDeque, - sync::mpsc, - time::{Duration, Instant}, -}; +use std::{cell::RefCell, collections::VecDeque, sync::mpsc, time::Duration}; use satrs::{ power::{ - PowerSwitchInfo, PowerSwitcherCommandSender, SwitchId, SwitchRequest, SwitchState, - SwitchStateBinary, + PowerSwitchInfo, PowerSwitcherCommandSender, SwitchRequest, SwitchState, SwitchStateBinary, }, queue::GenericSendError, request::{GenericMessage, MessageMetadata}, @@ -26,17 +19,14 @@ pub mod pcdu; pub struct PowerSwitchHelper { switcher_tx: mpsc::SyncSender>, shared_switch_set: SharedSwitchSet, - #[new(default)] - switch_cmd_sent_instant: Option, } #[derive(Debug, Error, Copy, Clone, PartialEq, Eq)] pub enum SwitchCommandingError { - #[error("invalid switch id")] - InvalidSwitchId(SwitchId), #[error("send error: {0}")] Send(#[from] GenericSendError), } + #[derive(Debug, Error, Copy, Clone, PartialEq, Eq)] pub enum SwitchInfoError { /// This is a configuration error which should not occur. @@ -196,6 +186,7 @@ impl PowerSwitcherCommandSender for TestSwitchHelper { } } +#[allow(dead_code)] impl TestSwitchHelper { // Helper function which can be used to force a switch to another state for test purposes. pub fn set_switch_state(&mut self, switch: PcduSwitch, state: SwitchState) { diff --git a/satrs-example/src/eps/pcdu.rs b/satrs-example/src/eps/pcdu.rs index 639b2cd..6684648 100644 --- a/satrs-example/src/eps/pcdu.rs +++ b/satrs-example/src/eps/pcdu.rs @@ -22,7 +22,8 @@ use satrs_minisim::{ use crate::{acs::mgm::MpscModeLeafInterface, pus::hk::HkReply, requests::CompositeRequest}; pub trait SerialInterface { - type Error; + type Error: core::fmt::Debug; + /// Send some data via the serial interface. fn send(&self, data: &[u8]) -> Result<(), Self::Error>; /// Receive all replies received on the serial interface so far. This function takes a closure @@ -289,13 +290,18 @@ impl PcduHandler todo!("failed to convert switch ID {:?} to typed PCDU switch", e), }, Err(e) => match e { - mpsc::TryRecvError::Empty => todo!(), - mpsc::TryRecvError::Disconnected => todo!(), + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::warn!("switch request receiver has disconnected"); + break; + } }, }; } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 00e704c..1782bb6 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -76,13 +76,14 @@ fn static_tmtc_pool_main() { let mut opt_sim_client = create_sim_client(sim_request_rx); let (mgm_handler_composite_tx, mgm_handler_composite_rx) = - mpsc::channel::>(); + mpsc::sync_channel::>(10); let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = - mpsc::channel::>(); + mpsc::sync_channel::>(30); - let (mgm_handler_mode_tx, mgm_handler_mode_rx) = mpsc::channel::>(); + let (mgm_handler_mode_tx, mgm_handler_mode_rx) = + mpsc::sync_channel::>(5); let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = - mpsc::channel::>(); + mpsc::sync_channel::>(5); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = GenericRequestRouter::default(); @@ -221,7 +222,7 @@ fn static_tmtc_pool_main() { ); let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = - mpsc::channel(); + mpsc::sync_channel(5); let shared_switch_set = Arc::new(Mutex::default()); let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); @@ -256,7 +257,7 @@ fn static_tmtc_pool_main() { ); let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) = - mpsc::channel(); + mpsc::sync_channel(10); let pcdu_mode_leaf_interface = MpscModeLeafInterface { request_rx: pcdu_handler_mode_rx, reply_to_pus_tx: pus_mode_reply_tx, @@ -388,27 +389,38 @@ fn static_tmtc_pool_main() { #[allow(dead_code)] fn dyn_tmtc_pool_main() { let (tc_source_tx, tc_source_rx) = mpsc::channel(); - let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); + let (tm_sink_tx, tm_sink_rx) = mpsc::channel(); let (tm_server_tx, tm_server_rx) = mpsc::channel(); let (sim_request_tx, sim_request_rx) = mpsc::channel(); let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel(); - // let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); + let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel(); let mut opt_sim_client = create_sim_client(sim_request_rx); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let (mgm_handler_composite_tx, mgm_handler_composite_rx) = - mpsc::channel::>(); - let (mgm_handler_mode_tx, mgm_handler_mode_rx) = mpsc::channel::>(); + mpsc::sync_channel::>(5); + let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = + mpsc::sync_channel::>(10); + let (mgm_handler_mode_tx, mgm_handler_mode_rx) = + mpsc::sync_channel::>(5); + let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = + mpsc::sync_channel::>(10); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = GenericRequestRouter::default(); request_map .composite_router_map - .insert(MGM_HANDLER_0.raw(), mgm_handler_composite_tx); + .insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx); request_map .mode_router_map - .insert(MGM_HANDLER_0.raw(), mgm_handler_mode_tx); + .insert(MGM_HANDLER_0.id(), mgm_handler_mode_tx); + request_map + .composite_router_map + .insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx); + request_map + .mode_router_map + .insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx); // Create event handling components // These sender handles are used to send event requests, for example to enable or disable @@ -417,7 +429,7 @@ fn dyn_tmtc_pool_main() { let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. - let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); + let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -440,30 +452,30 @@ fn dyn_tmtc_pool_main() { }; let pus_test_service = - create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx); + create_test_service_dynamic(tm_sink_tx.clone(), event_tx.clone(), pus_test_rx); let pus_scheduler_service = create_scheduler_service_dynamic( - tm_funnel_tx.clone(), + tm_sink_tx.clone(), tc_source_tx.clone(), pus_sched_rx, create_sched_tc_pool(), ); let pus_event_service = - create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); + create_event_service_dynamic(tm_sink_tx.clone(), pus_event_rx, event_request_tx); let pus_action_service = create_action_service_dynamic( - tm_funnel_tx.clone(), + tm_sink_tx.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); let pus_hk_service = create_hk_service_dynamic( - tm_funnel_tx.clone(), + tm_sink_tx.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); let pus_mode_service = create_mode_service_dynamic( - tm_funnel_tx.clone(), + tm_sink_tx.clone(), pus_mode_rx, request_map, pus_mode_reply_rx, @@ -479,7 +491,7 @@ fn dyn_tmtc_pool_main() { let mut tmtc_task = TcSourceTaskDynamic::new( tc_source_rx, - PusTcDistributor::new(tm_funnel_tx.clone(), pus_router), + PusTcDistributor::new(tm_sink_tx.clone(), pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); @@ -508,18 +520,18 @@ fn dyn_tmtc_pool_main() { ) .expect("tcp server creation failed"); - let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx); let shared_switch_set = Arc::new(Mutex::default()); let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20); let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone()); let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) = - mpsc::channel(); + mpsc::sync_channel(5); let shared_mgm_set = Arc::default(); let mode_leaf_interface = MpscModeLeafInterface { request_rx: mgm_handler_mode_rx, - reply_to_pus_tx: pus_mode_reply_tx, + reply_to_pus_tx: pus_mode_reply_tx.clone(), reply_to_parent_tx: mgm_handler_mode_reply_to_parent_tx, }; @@ -537,13 +549,41 @@ fn dyn_tmtc_pool_main() { "MGM_0", mode_leaf_interface, mgm_handler_composite_rx, - pus_hk_reply_tx, + pus_hk_reply_tx.clone(), switch_helper.clone(), - tm_funnel_tx, + tm_sink_tx.clone(), mgm_spi_interface, shared_mgm_set, ); + let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) = + mpsc::sync_channel(10); + let pcdu_mode_leaf_interface = MpscModeLeafInterface { + request_rx: pcdu_handler_mode_rx, + reply_to_pus_tx: pus_mode_reply_tx, + reply_to_parent_tx: pcdu_handler_mode_reply_to_parent_tx, + }; + let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() { + sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx); + SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new( + sim_request_tx.clone(), + pcdu_sim_reply_rx, + )) + } else { + SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default()) + }; + let mut pcdu_handler = PcduHandler::new( + PCDU_HANDLER, + "PCDU", + pcdu_mode_leaf_interface, + pcdu_handler_composite_rx, + pus_hk_reply_tx, + switch_request_rx, + tm_sink_tx, + pcdu_serial_interface, + shared_switch_set, + ); + info!("Starting TMTC and UDP task"); let jh_udp_tmtc = thread::Builder::new() .name("sat-rs tmtc-udp".to_string()) @@ -600,6 +640,21 @@ fn dyn_tmtc_pool_main() { }) .unwrap(); + info!("Starting EPS thread"); + let jh_eps = thread::Builder::new() + .name("sat-rs eps".to_string()) + .spawn(move || loop { + // TODO: We should introduce something like a fixed timeslot helper to allow a more + // declarative API. It would also be very useful for the AOCS task. + pcdu_handler.periodic_operation(eps::pcdu::OpCode::RegularOp); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(50)); + pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies); + thread::sleep(Duration::from_millis(300)); + }) + .unwrap(); + info!("Starting PUS handler thread"); let jh_pus_handler = thread::Builder::new() .name("sat-rs pus".to_string()) @@ -625,6 +680,7 @@ fn dyn_tmtc_pool_main() { .expect("Joining SIM client thread failed"); } jh_aocs.join().expect("Joining AOCS thread failed"); + jh_eps.join().expect("Joining EPS thread failed"); jh_pus_handler .join() .expect("Joining PUS handler thread failed"); diff --git a/satrs-example/src/requests.rs b/satrs-example/src/requests.rs index 445e05e..316a486 100644 --- a/satrs-example/src/requests.rs +++ b/satrs-example/src/requests.rs @@ -28,8 +28,9 @@ pub enum CompositeRequest { pub struct GenericRequestRouter { pub id: ComponentId, // All messages which do not have a dedicated queue. - pub composite_router_map: HashMap>>, - pub mode_router_map: HashMap>>, + pub composite_router_map: + HashMap>>, + pub mode_router_map: HashMap>>, } impl Default for GenericRequestRouter {