mod can; mod can_ids; mod ccsds; mod device_handler; mod example_main; mod hk; mod logger; mod pus; mod requests; mod tmtc; mod aocs; mod cam; mod action; use crate::hk::{AcsHkIds, HkRequest}; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{ core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmStore, PUS_APID, }; use eurosim_obsw::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; use satrs_core::events::EventU32; use satrs_core::pool::{LocalPool, PoolCfg, StoreAddr}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, }; use satrs_core::pus::hk::Subservice; use satrs_core::pus::verification::{ MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, }; use satrs_core::pus::{EcssTmError, EcssTmErrorWithSend, EcssTmSenderCore}; use satrs_core::seq_count::{ SeqCountProviderSyncClonable, SequenceCountProvider, SequenceCountProviderCore, }; use satrs_core::{ spacepackets::time::cds::TimeProvider, spacepackets::time::TimeWriter, spacepackets::tm::{PusTm, PusTmSecondaryHeader}, spacepackets::{SequenceFlags, SpHeader}, }; use crate::can_ids::{can_id_to_package_id, load_package_ids, PackageId, PackageModel, ThreadId}; use embedded_can::{Id, StandardId}; use log::{info, warn}; use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::{channel, RecvError, TryRecvError}; use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::Duration; //use libc::time64_t; use crate::action::ActionRequest; use crate::cam::CameraRequest; #[derive(Clone)] struct EventTmSender { store_helper: TmStore, sender: mpsc::Sender, } impl EventTmSender { fn new(store_helper: TmStore, sender: mpsc::Sender) -> Self { Self { store_helper, sender, } } } impl EcssTmSenderCore for EventTmSender { type Error = mpsc::SendError; fn send_tm(&mut self, tm: PusTm) -> Result<(), EcssTmErrorWithSend> { let addr = self.store_helper.add_pus_tm(&tm); self.sender .send(addr) .map_err(EcssTmErrorWithSend::SendError) } } fn main() { println!("Running SESPSat OBSW"); logger::setup_logger().unwrap(); let tm_pool = LocalPool::new(PoolCfg::new(vec![ (30, 32), (15, 64), (15, 128), (15, 256), (15, 1024), (15, 2048), ])); let tm_store = TmStore { pool: Arc::new(RwLock::new(Box::new(tm_pool))), }; let tc_pool = LocalPool::new(PoolCfg::new(vec![ (30, 32), (15, 64), (15, 128), (15, 256), (15, 1024), (15, 2048), ])); let tc_store = TcStore { pool: Arc::new(RwLock::new(Box::new(tc_pool))), }; let seq_count_provider = SeqCountProviderSyncClonable::default(); let aocs_seq_count_provider = seq_count_provider.clone(); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); let verif_sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); let verif_cfg = VerificationReporterCfg::new( PUS_APID, #[allow(clippy::box_default)] Box::new(seq_count_provider.clone()), 1, 2, 8, ) .unwrap(); let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); // Create event handling components let (event_request_tx, event_request_rx) = channel::(); let (event_sender, event_man_rx) = channel(); let event_recv = MpscEventReceiver::::new(event_man_rx); let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv)); let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); let pus_tm_backend = DefaultPusMgmtBackendProvider::::default(); let mut pus_event_dispatcher = PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); let (pus_event_man_tx, pus_event_man_rx) = channel(); let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); let mut reporter_event_handler = verif_reporter.clone(); let mut reporter_aocs = verif_reporter.clone(); let mut reporter_pld = verif_reporter.clone(); event_man.subscribe_all(pus_event_man_send_provider.id()); let mut request_map = HashMap::new(); let (acs_thread_tx, acs_thread_rx) = channel::(); let (pld_thread_tx, pld_thread_rx) = channel::(); request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx); request_map.insert(RequestTargetId::PldSubsystem as u32, pld_thread_tx); //add here receivers for tmtc task to send requests to //request_map.insert(RequestTargetId::CanTask as u32, can_thread_tx); let tc_source = PusTcSource { tc_store, tc_source: tc_source_tx, }; // Create clones here to allow moving the values let core_args = OtherArgs { sock_addr, verif_reporter, event_sender, event_request_tx, request_map, }; let tc_args = TcArgs { tc_source, tc_receiver: tc_source_rx, }; let tm_args = TmArgs { tm_store: tm_store.clone(), tm_sink_sender: tm_funnel_tx.clone(), tm_server_rx, }; let (aocs_can_tx, aocs_can_rx) = mpsc::channel::(); let (power_can_tx, power_can_rx) = mpsc::channel::(); let (pld_can_tx, pld_can_rx) = mpsc::channel::(); // make tx thread id hashmap let mut can_senders = HashMap::new(); can_senders.insert(ThreadId::AOCSThread, aocs_can_tx); can_senders.insert(ThreadId::PowerThread, power_can_tx); can_senders.insert(ThreadId::PLDThread, pld_can_tx); // get package id hashmap let package_ids_rx = load_package_ids(); // checks for packet ids println!("{:?}", package_ids_rx[&PackageId::PCDUStatusRequest]); println!( "{:?}", package_ids_rx[&PackageId::CameraImageRequestConfirmation] ); let test = can_id_to_package_id(Id::Standard(StandardId::new(65).expect("Invalid Id"))); if let Some(id) = test { println!("{:?}", package_ids_rx[&id]); } let socket0 = can::CanRxHandler::new_socket("can0", can_senders, package_ids_rx).unwrap(); info!("Starting TMTC task"); let builder0 = thread::Builder::new().name("TMTCThread".into()); let jh0 = builder0.spawn(move || { core_tmtc_task(core_args, tc_args, tm_args); }); info!("Starting CAN Socket listening task"); let builder1 = thread::Builder::new().name("CanRxHandler".into()); let jh1 = builder1.spawn(move || loop { let frame = socket0.rx_socket(); if let Some(frame) = frame { let forward = socket0.forward_frame(frame); } }); /* let allowed_ids_range = 101..110; let mut allowed_ids_aocs = Vec::new(); for id in allowed_ids_range { allowed_ids_aocs.push(Id::Standard(StandardId::new(id).unwrap())); } */ let package_map_aocs_tx = load_package_ids(); let aocs_tm_funnel_tx = tm_funnel_tx.clone(); let mut aocs_tm_store = tm_store.clone(); /* // AOCS Thread let socket1 = can::CanTxHandler::new_socket("can0", ThreadId::AOCSThread, package_map_aocs_tx).unwrap(); info!("Starting AOCS receiving thread"); let builder2 = thread::Builder::new().name("AOCSThread".into()); let jh2 = builder2.spawn(move || { let mut time_stamp_buf: [u8; 7] = [0; 7]; let mut huge_buf: [u8; 8192] = [0; 8192]; let data: [u8; 3] = [1, 2, 3]; let current_mgm_data = MgmData::default(); //let current_mgt_data = MgtData::default(); //let current_ loop { // device handling //info!("Sending {:?}", PackageId::AOCSControlMGT1); //socket1.tx_socket(PackageId::AOCSControlMGT1, &data); //info!("Waiting for {:?}", PackageId::AOCSDataMGM1); let msg = aocs_can_rx.try_recv(); //current_mgm_data.x = new_data match aocs_can_rx.try_recv() { Ok(package) => match package.package_id() { _ => warn!("Incorrect Id"), }, Err(_) => {} } // telecommand handling match acs_thread_rx.try_recv() { Ok(request_with_token) => { match request_with_token.0 { Request::HkRequest(hk_req) => { match hk_req { HkRequest::OneShot(id) => { assert_eq!(id.target_id, RequestTargetId::AcsSubsystem as u32); if id.unique_id == 0 { let mut sp_header = SpHeader::tm_unseg( 0x02, aocs_seq_count_provider.get_and_increment(), 0, ) .unwrap(); let cds_stamp = TimeProvider::from_now_with_u16_days().unwrap(); cds_stamp.write_to_bytes(&mut time_stamp_buf); let mut len = id.write_to_be_bytes(&mut huge_buf).unwrap(); let json_string = "asdf"; huge_buf[8..json_string.len() + 8] .copy_from_slice(json_string.as_bytes()); len += json_string.len(); let tm_sec_header = PusTmSecondaryHeader::new_simple( 3, Subservice::TmHkPacket as u8, &time_stamp_buf, ); let hk_tm = PusTm::new( &mut sp_header, tm_sec_header, Some(&huge_buf[0..len]), true, ); let addr = aocs_tm_store.add_pus_tm(&hk_tm); aocs_tm_funnel_tx.send(addr).expect("sending failed"); /* let start_token = self //implement this for verification .verif_reporter .start_success(token, &self.time_stamp) .expect("Error sending start success"); self.tm_tx .send(addr) .expect("Sending TM to TM funnel failed"); self.verif_reporter .completion_success(start_token, &self.time_stamp) .expect("Error sending completion success"); */ } } HkRequest::Enable(_) => {} HkRequest::Disable(_) => {} HkRequest::ModifyCollectionInterval(_, _) => {} } } _ => {} } } Err(_) => {} } } }); */ let package_map_pld_tx = load_package_ids(); let pld_tm_funnel_tx = tm_funnel_tx.clone(); let mut pld_tm_store = tm_store.clone(); let PLDCanSocket = can::CanTxHandler::new_socket("can0", ThreadId::PLDThread, package_map_pld_tx).unwrap(); println!("Starting Payload Handling task"); let builder3 = thread::Builder::new().name("PLDThread".into()); let jh3 = builder3.spawn(move || { let mut time_stamp_buf: [u8; 7] = [0; 7]; loop { match pld_thread_rx.try_recv() { Ok(request_with_token) => { match request_with_token.0 { Request::ActionRequest(action_id) => { match action_id { ActionRequest::ImageRequest(target_id) => { assert_eq!(target_id, RequestTargetId::PldSubsystem); // get current time stamp let cds_stamp = TimeProvider::from_now_with_u16_days().unwrap(); cds_stamp.write_to_bytes(&mut time_stamp_buf); // send start verification and get token let start_token = reporter_pld .start_success(request_with_token.1, &time_stamp_buf) .expect("Error sending start success."); // make can bus package to camera let data = [1]; PLDCanSocket.tx_socket(PackageId::CameraImageRequest, &data); //let timeout = Duration::from_millis(400); // loop to allow early exit incase verif never arrives // wait for request verification loop { match pld_can_rx.recv() { Ok(msg) => { if msg.package_id() == PackageId::CameraImageRequestConfirmation && msg.data()[0] == 1 { break; } } Err(_) => { warn!("Error receiving Can Bus Message."); } } } // wait for start of execution loop { match pld_can_rx.recv() { Ok(msg) => { if msg.package_id() == PackageId::CameraImageExecutionStart && msg.data()[0] == 1 { break; } } Err(_) => { warn!("Error receiving Can Bus Message."); } } } // wait for end of execution loop { match pld_can_rx.recv() { Ok(msg) => { if msg.package_id() == PackageId::CameraImageExectutionEnd && msg.data()[0] == 1 { let cds_stamp = TimeProvider::from_now_with_u16_days().unwrap(); cds_stamp.write_to_bytes(&mut time_stamp_buf); // send end verification with token reporter_pld .completion_success(start_token, &time_stamp_buf) .expect("Error sending start success."); break; } } Err(_) => { warn!("Error receiving Can Bus Message."); } } } } ActionRequest::OrientationRequest(_) => {} } } _ => {} } } Err(_) => {} } //let addr = pld_tm_store.add_pus_tm(&hk_tm); //pld_tm_funnel_tx.send(a).expect("sending failed"); } }); println!("Starting TM funnel task"); let builder4 = thread::Builder::new().name("TMFunnelThread".into()); let jh4 = builder4.spawn(move || { let tm_funnel = TmFunnel { tm_server_tx, tm_funnel_rx, }; loop { if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { tm_funnel .tm_server_tx .send(addr) .expect("Sending TM to server failed"); } } }); jh0.unwrap().join().expect("Joining UDP TMTC server thread failed"); jh1.unwrap() .join() .expect("Joining CAN Bus Listening thread failed"); //jh2.unwrap().join().expect("Joing AOCS thread failed"); jh3.unwrap().join().expect("Joing AOCS thread failed"); jh4.unwrap().join().expect("Joing AOCS thread failed"); /* jh1.join().expect("Joining TM Funnel thread failed"); jh2.join().expect("Joining Event Manager thread failed"); jh3.join().expect("Joining AOCS thread failed"); */ } #[derive(Default)] struct MgmData { x: i16, y: i16, z: i16, }