use std::{ env::temp_dir, net::{IpAddr, SocketAddr}, sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, }; use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, pool::create_sched_tc_pool, set_up_ground_dir, set_up_home_path, set_up_low_prio_ground_dir, tasks::{FREQ_MS_CAMERA_HANDLING, FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, HOME_PATH, STOP_FILE_NAME, VALID_PACKET_ID_LIST, VERSION, }; use ops_sat_rs::config::{components::CAMERA_HANDLER, tasks::FREQ_MS_EVENT_HANDLING}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::TimeStampHelper; use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, pus::event_man::EventRequestWithToken, }; use crate::{controller::ControllerPathCollection, tmtc::tm_sink::TmFunnelDynamic}; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ events::EventHandler, pus::{ hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service, PusTcDistributor, PusTcMpscRouter, }, }; use crate::{handlers::camera::Ims100BatchHandler, pus::event::create_event_service}; use crate::{ interface::tcp_server::{SyncTcpTmSource, TcpTask}, interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, }; use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic}; use crate::{ pus::{action::create_action_service, stack::PusStack}, requests::GenericRequestRouter, }; mod controller; mod events; mod handlers; mod interface; mod logger; mod pus; mod requests; mod tmtc; fn main() { let version_str = VERSION.unwrap_or("?"); println!("OPS-SAT Rust Experiment OBSW v{}", version_str); setup_logger().expect("setting up logging with fern failed"); let home_path = set_up_home_path(); set_up_low_prio_ground_dir(home_path.clone()); set_up_ground_dir(home_path.clone()); let app_cfg = create_app_config(home_path.clone()); info!("App Configuration: {:?}", app_cfg); let stop_signal = Arc::new(AtomicBool::new(false)); let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_tcp_server_tx, tm_tcp_server_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel(); let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel(); let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); // Create event handling components // These sender handles are used to send event requests, for example to enable or disable // certain events. let (event_tx, event_rx) = mpsc::sync_channel(100); let (event_request_tx, event_request_rx) = mpsc::channel::(); // The event task is the core handler to perform the event routing and TM handling as specified // in the sat-rs documentation. let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx); let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); let (controller_composite_tx, controller_composite_rx) = mpsc::channel(); // let (controller_action_reply_tx, controller_action_reply_rx) = mpsc::channel(); let (camera_composite_tx, camera_composite_rx) = mpsc::channel(); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = GenericRequestRouter::default(); request_map .composite_router_map .insert(CONTROLLER_ID.id(), controller_composite_tx); request_map .composite_router_map .insert(CAMERA_HANDLER.id(), camera_composite_tx); let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, event_tc_sender: pus_event_tx, sched_tc_sender: pus_sched_tx, hk_tc_sender: pus_hk_tx, action_tc_sender: pus_action_tx, mode_tc_sender: pus_mode_tx, }; let pus_test_service = create_test_service(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx); let pus_scheduler_service = create_scheduler_service( tm_funnel_tx.clone(), tc_source_tx.clone(), pus_sched_rx, create_sched_tc_pool(), ); let pus_event_service = create_event_service(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); let pus_action_service = create_action_service( tm_funnel_tx.clone(), pus_action_rx, request_map.clone(), pus_action_reply_rx, ); let pus_hk_service = create_hk_service( tm_funnel_tx.clone(), pus_hk_rx, request_map.clone(), pus_hk_reply_rx, ); let pus_mode_service = create_mode_service( tm_funnel_tx.clone(), pus_mode_rx, request_map, pus_mode_reply_rx, ); let mut pus_stack = PusStack::new( pus_test_service, pus_hk_service, pus_event_service, pus_action_service, pus_scheduler_service, pus_mode_service, ); let mut tmtc_task = TcSourceTaskDynamic::new( tc_source_rx, PusTcDistributor::new(tm_funnel_tx.clone(), pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let udp_tc_server_result = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()); if udp_tc_server_result.is_err() { log::error!("UDP server creation failed"); } let mut opt_udp_tmtc_server = None; if let Ok(udp_tc_server) = udp_tc_server_result { opt_udp_tmtc_server = Some(UdpTmtcServer { udp_tc_server, tm_handler: DynamicUdpTmHandler { tm_rx: tm_tcp_server_rx, }, }); } let tcp_server_cfg = ServerConfig::new( TCP_SERVER.id(), sock_addr, Duration::from_millis(400), 4096, 8192, ); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), tc_source_tx.clone(), VALID_PACKET_ID_LIST.to_vec(), stop_signal.clone(), ) .expect("tcp server creation failed"); let mut tm_sink = TmFunnelDynamic::new( sync_tm_tcp_source, tm_funnel_rx, tm_tcp_server_tx, tm_tcp_client_tx, stop_signal.clone(), ); let mut home_path_stop_file = home_path.clone(); home_path_stop_file.push(STOP_FILE_NAME); let mut tmp_path_stop_file = temp_dir(); tmp_path_stop_file.push(STOP_FILE_NAME); let mut controller = ExperimentController::new( controller_composite_rx, pus_action_reply_tx.clone(), stop_signal.clone(), ControllerPathCollection::new(&home_path), ); let mut tcp_spp_client = TcpSppClientStd::new( TCP_SPP_CLIENT.id(), tc_source_tx, tm_tcp_client_rx, VALID_PACKET_ID_LIST, STOP_CHECK_FREQUENCY, app_cfg.tcp_spp_server_port, ) .expect("creating TCP SPP client failed"); let timestamp_helper = TimeStampHelper::default(); // TODO: If the host feature is active, we should use an image executor // which only displays the execution parameters and does not try // to call the batch application which does not exist. let mut camera_handler: Ims100BatchHandler = Ims100BatchHandler::new_with_default_img_executor( CAMERA_HANDLER, HOME_PATH.get().unwrap(), camera_composite_rx, tm_funnel_tx.clone(), pus_action_reply_tx.clone(), timestamp_helper, ); // Main Task Thread Definitions // Main Experiment Control Task info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); let jh_ctrl_thread = thread::Builder::new() .name("ops-sat ctrl".to_string()) .spawn(move || loop { controller.perform_operation(); if ctrl_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); }) .unwrap(); // TMTC and UDP Task info!("Starting TMTC and UDP task"); let tmtc_stop_signal = stop_signal.clone(); let jh_udp_tmtc = thread::Builder::new() .name("ops-sat tmtc-udp".to_string()) .spawn(move || { info!("Running UDP server on port {SERVER_PORT}"); loop { if let Some(ref mut udp_tmtc_server) = opt_udp_tmtc_server { udp_tmtc_server.periodic_operation(); } tmtc_task.periodic_operation(); if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); } }) .unwrap(); // TCP Server Task let tcp_server_stop_signal = stop_signal.clone(); info!("Starting TCP server task"); let jh_tcp_server = thread::Builder::new() .name("ops-sat tcp-server".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); if tcp_server_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } }) .unwrap(); // TCP SPP Client Task // We could also move this to the existing TCP server thread, but we would have to adapt // the server code for this so we do not block anymore and we pause manually if both the client // and server are IDLE and have nothing to do.. let tcp_client_stop_signal = stop_signal.clone(); info!("Starting TCP SPP client task"); let jh_tcp_client = thread::Builder::new() .name("ops-sat tcp-client".to_string()) .spawn(move || { info!("Running TCP SPP client"); loop { match tcp_spp_client.operation() { Ok(_result) => (), Err(e) => { log::error!("TCP SPP client error: {}", e); } } if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } }) .unwrap(); // TM Funnel Task info!("Starting TM funnel task"); let tm_sink_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() .name("ops-sat tm-sink".to_string()) .spawn(move || loop { tm_sink.operation(); if tm_sink_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } }) .unwrap(); info!("Starting event handling task"); let event_stop_signal = stop_signal.clone(); let jh_event_handling = thread::Builder::new() .name("sat-rs events".to_string()) .spawn(move || loop { event_handler.periodic_operation(); if event_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING)); }) .unwrap(); // PUS Handler Task info!("Starting PUS handlers task"); let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() .name("ops-sat pus".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); // Camera Handler Task info!("Starting camera handler task"); let camera_stop_signal = stop_signal.clone(); let jh_camera_handler = thread::Builder::new() .name("ops-sat camera".to_string()) .spawn(move || loop { camera_handler.periodic_operation(); if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } thread::sleep(Duration::from_millis(FREQ_MS_CAMERA_HANDLING)); }) .unwrap(); // Join Threads jh_ctrl_thread .join() .expect("Joining Controller thread failed"); jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); jh_tcp_server .join() .expect("Joining TCP TMTC server thread failed"); jh_tcp_client .join() .expect("Joining TCP TMTC client thread failed"); jh_tm_funnel .join() .expect("Joining TM Funnel thread failed"); jh_pus_handler .join() .expect("Joining PUS handlers thread failed"); jh_event_handling .join() .expect("Joining PUS handlers thread failed"); jh_camera_handler .join() .expect("Joining camera handler thread failed"); }