From bb21345f0198aa1148ba8330c7b83ea98999bbd9 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 5 Feb 2024 16:42:10 +0100 Subject: [PATCH] start cleaning up example app --- satrs-core/src/pool.rs | 2 +- satrs-example/src/acs.rs | 112 ++++++++++++++++++++ satrs-example/src/main.rs | 182 +++++++-------------------------- satrs-example/src/requests.rs | 2 +- satrs-example/src/tm_funnel.rs | 74 ++++++++++++++ satrs-example/src/tmtc.rs | 5 - 6 files changed, 227 insertions(+), 150 deletions(-) create mode 100644 satrs-example/src/acs.rs create mode 100644 satrs-example/src/tm_funnel.rs diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index 0111d18..3ab67a1 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -337,7 +337,7 @@ mod alloc_mod { } impl StaticPoolConfig { - pub fn new(cfg: Vec<(NumBlocks, usize)>) -> Self { + pub const fn new(cfg: Vec<(NumBlocks, usize)>) -> Self { StaticPoolConfig { cfg } } diff --git a/satrs-example/src/acs.rs b/satrs-example/src/acs.rs new file mode 100644 index 0000000..fa3a75b --- /dev/null +++ b/satrs-example/src/acs.rs @@ -0,0 +1,112 @@ +use std::sync::mpsc::{self, TryRecvError}; + +use log::{info, warn}; +use satrs_core::pool::StoreAddr; +use satrs_core::pus::verification::VerificationReporterWithSender; +use satrs_core::spacepackets::ecss::hk::Subservice as HkSubservice; +use satrs_core::tmtc::tm_helper::SharedTmStore; +use satrs_core::{ + hk::HkRequest, + spacepackets::{ + ecss::tm::{PusTmCreator, PusTmSecondaryHeader}, + time::cds::{DaysLen16Bits, TimeProvider}, + SequenceFlags, SpHeader, + }, +}; +use satrs_example::{RequestTargetId, PUS_APID}; + +use crate::{ + hk::{AcsHkIds, HkUniqueId}, + requests::{Request, RequestWithToken}, + update_time, +}; + +pub struct AcsTask { + timestamp: [u8; 7], + time_provider: TimeProvider, + tm_store: SharedTmStore, + tm_funnel: mpsc::Sender, + request_rx: mpsc::Receiver, + verif_reporter: VerificationReporterWithSender, +} + +impl AcsTask { + pub fn new( + tm_store: SharedTmStore, + tm_funnel: mpsc::Sender, + request_rx: mpsc::Receiver, + verif_reporter: VerificationReporterWithSender, + ) -> Self { + Self { + timestamp: [0; 7], + time_provider: TimeProvider::new_with_u16_days(0, 0), + tm_store, + tm_funnel, + request_rx, + verif_reporter, + } + } + + fn handle_hk_request(&mut self, target_id: u32, unique_id: u32) { + assert_eq!(target_id, RequestTargetId::AcsSubsystem as u32); + if unique_id == AcsHkIds::TestMgmSet as u32 { + let mut sp_header = SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTmSecondaryHeader::new_simple( + 3, + HkSubservice::TmHkPacket as u8, + &self.timestamp, + ); + let mut buf: [u8; 8] = [0; 8]; + let hk_id = HkUniqueId::new(target_id, unique_id); + hk_id.write_to_be_bytes(&mut buf).unwrap(); + let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &buf, true); + let addr = self + .tm_store + .add_pus_tm(&pus_tm) + .expect("Adding PUS TM failed"); + self.tm_funnel.send(addr).expect("Sending HK TM failed"); + } + // TODO: Verification failure for invalid unique IDs. + } + pub fn periodic_operation(&mut self) { + match self.request_rx.try_recv() { + Ok(request) => { + info!( + "ACS thread: Received HK request {:?}", + request.targeted_request + ); + update_time(&mut self.time_provider, &mut self.timestamp); + match request.targeted_request.request { + Request::Hk(hk_req) => match hk_req { + HkRequest::OneShot(unique_id) => self.handle_hk_request( + request.targeted_request.target_id_with_apid.target_id(), + unique_id, + ), + HkRequest::Enable(_) => {} + HkRequest::Disable(_) => {} + HkRequest::ModifyCollectionInterval(_, _) => {} + }, + Request::Mode(_mode_req) => { + warn!("mode request handling not implemented yet") + } + Request::Action(_action_req) => { + warn!("action request handling not implemented yet") + } + } + let started_token = self + .verif_reporter + .start_success(request.token, Some(&self.timestamp)) + .expect("Sending start success failed"); + self.verif_reporter + .completion_success(started_token, Some(&self.timestamp)) + .expect("Sending completion success failed"); + } + Err(e) => match e { + TryRecvError::Empty => {} + TryRecvError::Disconnected => { + warn!("ACS thread: Message Queue TX disconnected!") + } + }, + } + } +} diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index ed1ef32..527565e 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,18 +1,21 @@ +mod acs; mod ccsds; mod hk; mod logger; mod pus; mod requests; mod tcp; +mod tm_funnel; mod tmtc; mod udp; -use log::{info, warn}; +use crate::tm_funnel::TmFunnel; +use log::info; use satrs_core::hal::std::tcp_server::ServerConfig; use satrs_core::hal::std::udp_server::UdpTcServer; +use crate::acs::AcsTask; use crate::ccsds::CcsdsReceiver; -use crate::hk::{AcsHkIds, HkUniqueId}; use crate::logger::setup_logger; use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler}; use crate::pus::event::Pus5Wrapper; @@ -20,22 +23,20 @@ use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler}; use crate::pus::scheduler::Pus11Wrapper; use crate::pus::test::Service17CustomWrapper; use crate::pus::{PusReceiver, PusTcMpscRouter}; -use crate::requests::{Request, RequestWithToken}; +use crate::requests::RequestWithToken; use crate::tcp::{SyncTcpTmSource, TcpTask}; -use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask}; +use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmtcTask}; use crate::udp::UdpTmtcServer; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; use satrs_core::events::EventU32; -use satrs_core::hk::HkRequest; -use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig}; +use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, }; use satrs_core::pus::event_srv::PusService5EventHandler; -use satrs_core::pus::hk::Subservice as HkSubservice; use satrs_core::pus::scheduler::PusScheduler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::test::PusService17TestHandler; @@ -45,12 +46,7 @@ use satrs_core::pus::verification::{ use satrs_core::pus::{ EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper, }; -use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; -use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter}; -use satrs_core::spacepackets::{ - ecss::tm::PusTmSecondaryHeader, time::cds::TimeProvider, time::TimeWriter, SequenceFlags, - SpHeader, -}; +use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter}; use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{CcsdsDistributor, TargetId}; use satrs_core::ChannelId; @@ -60,7 +56,7 @@ use satrs_example::{ }; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; -use std::sync::mpsc::{channel, TryRecvError}; +use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -98,8 +94,6 @@ fn main() { (15, 2048), ])); - let seq_count_provider = CcsdsSimpleSeqCountProvider::new(); - let mut msg_counter_map: HashMap = HashMap::new(); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); @@ -161,9 +155,6 @@ fn main() { tm_udp_server_rx: tm_server_rx, }; - let aocs_tm_funnel = tm_funnel_tx.clone(); - let aocs_tm_store = shared_tm_store.clone(); - let (pus_test_tx, pus_test_rx) = channel(); let (pus_event_tx, pus_event_rx) = channel(); let (pus_sched_tx, pus_sched_rx) = channel(); @@ -304,6 +295,30 @@ fn main() { tm_store: tm_args.tm_store.clone_backing_pool(), }; + let mut acs_task = AcsTask::new( + shared_tm_store.clone(), + tm_funnel_tx.clone(), + acs_thread_rx, + reporter_aocs, + ); + + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + + let mut tm_funnel = TmFunnel::new( + shared_tm_store, + sync_tm_tcp_source, + tm_funnel_rx, + tm_server_tx, + ); + info!("Starting TMTC and UDP task"); let jh_udp_tmtc = thread::Builder::new() .name("TMTC and UDP".to_string()) @@ -317,15 +332,6 @@ fn main() { }) .unwrap(); - let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); - let mut sync_tm_tcp_source = SyncTcpTmSource::new(200); - let mut tcp_server = TcpTask::new( - tcp_server_cfg, - sync_tm_tcp_source.clone(), - tcp_ccsds_distributor, - ) - .expect("tcp server creation failed"); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() .name("TCP".to_string()) @@ -340,43 +346,8 @@ fn main() { info!("Starting TM funnel task"); let jh1 = thread::Builder::new() .name("TM Funnel".to_string()) - .spawn(move || { - let tm_funnel = TmFunnel { - tm_server_tx, - tm_funnel_rx, - }; - loop { - if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let shared_pool = shared_tm_store.clone_backing_pool(); - let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); - let tm_raw = pool_guard - .modify(&addr) - .expect("Reading TM from pool failed"); - let mut zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw) - .expect("Creating TM zero copy writer failed"); - zero_copy_writer.set_apid(PUS_APID); - zero_copy_writer.set_seq_count(seq_count_provider.get_and_increment()); - let entry = msg_counter_map - .entry(zero_copy_writer.service()) - .or_insert(0); - zero_copy_writer.set_msg_count(*entry); - if *entry == u16::MAX { - *entry = 0; - } else { - *entry += 1; - } - - // This operation has to come last! - zero_copy_writer.finish(); - tm_funnel - .tm_server_tx - .send(addr) - .expect("Sending TM to server failed"); - sync_tm_tcp_source.add_tm(tm_raw); - } - } + .spawn(move || loop { + tm_funnel.operation(); }) .unwrap(); @@ -442,84 +413,9 @@ fn main() { info!("Starting AOCS thread"); let jh3 = thread::Builder::new() .name("AOCS".to_string()) - .spawn(move || { - let mut timestamp: [u8; 7] = [0; 7]; - let mut time_provider = TimeProvider::new_with_u16_days(0, 0); - loop { - // TODO: Move this into a separate function/task/module.. - match acs_thread_rx.try_recv() { - Ok(request) => { - info!( - "ACS thread: Received HK request {:?}", - request.targeted_request - ); - update_time(&mut time_provider, &mut timestamp); - match request.targeted_request.request { - Request::Hk(hk_req) => match hk_req { - HkRequest::OneShot(unique_id) => { - // TODO: We should check whether the unique ID is even valid. - let target = request.targeted_request.target_id; - assert_eq!( - target.target_id(), - RequestTargetId::AcsSubsystem as u32 - ); - if request.targeted_request.target_id.target - == AcsHkIds::TestMgmSet as u32 - { - let mut sp_header = SpHeader::tm( - PUS_APID, - SequenceFlags::Unsegmented, - 0, - 0, - ) - .unwrap(); - let sec_header = PusTmSecondaryHeader::new_simple( - 3, - HkSubservice::TmHkPacket as u8, - ×tamp, - ); - let mut buf: [u8; 8] = [0; 8]; - let hk_id = HkUniqueId::new(target.target_id(), unique_id); - hk_id.write_to_be_bytes(&mut buf).unwrap(); - let pus_tm = PusTmCreator::new( - &mut sp_header, - sec_header, - &buf, - true, - ); - let addr = aocs_tm_store - .add_pus_tm(&pus_tm) - .expect("Adding PUS TM failed"); - aocs_tm_funnel.send(addr).expect("Sending HK TM failed"); - } - } - HkRequest::Enable(_) => {} - HkRequest::Disable(_) => {} - HkRequest::ModifyCollectionInterval(_, _) => {} - }, - Request::Mode(_mode_req) => { - warn!("mode request handling not implemented yet") - } - Request::Action(_action_req) => { - warn!("action request handling not implemented yet") - } - } - let started_token = reporter_aocs - .start_success(request.token, Some(×tamp)) - .expect("Sending start success failed"); - reporter_aocs - .completion_success(started_token, Some(×tamp)) - .expect("Sending completion success failed"); - } - Err(e) => match e { - TryRecvError::Empty => {} - TryRecvError::Disconnected => { - warn!("ACS thread: Message Queue TX disconnected!") - } - }, - } - thread::sleep(Duration::from_millis(500)); - } + .spawn(move || loop { + acs_task.periodic_operation(); + thread::sleep(Duration::from_millis(500)); }) .unwrap(); diff --git a/satrs-example/src/requests.rs b/satrs-example/src/requests.rs index bcae8d7..031f0f4 100644 --- a/satrs-example/src/requests.rs +++ b/satrs-example/src/requests.rs @@ -22,7 +22,7 @@ pub enum Request { #[derive(Clone, Eq, PartialEq, Debug, new)] pub struct TargetedRequest { - pub(crate) target_id: TargetIdWithApid, + pub(crate) target_id_with_apid: TargetIdWithApid, pub(crate) request: Request, } diff --git a/satrs-example/src/tm_funnel.rs b/satrs-example/src/tm_funnel.rs new file mode 100644 index 0000000..cca3bb1 --- /dev/null +++ b/satrs-example/src/tm_funnel.rs @@ -0,0 +1,74 @@ +use std::{ + collections::HashMap, + sync::mpsc::{Receiver, Sender}, +}; + +use satrs_core::{ + pool::{PoolProviderMemInPlace, StoreAddr}, + seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, + spacepackets::ecss::tm::PusTmZeroCopyWriter, + tmtc::tm_helper::SharedTmStore, +}; +use satrs_example::PUS_APID; + +use crate::tcp::SyncTcpTmSource; + +pub struct TmFunnel { + shared_tm_store: SharedTmStore, + seq_count_provider: CcsdsSimpleSeqCountProvider, + sync_tm_tcp_source: SyncTcpTmSource, + msg_counter_map: HashMap, + tm_funnel_rx: Receiver, + tm_server_tx: Sender, +} + +impl TmFunnel { + pub fn new( + shared_tm_store: SharedTmStore, + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: Receiver, + tm_server_tx: Sender, + ) -> Self { + Self { + shared_tm_store, + seq_count_provider: CcsdsSimpleSeqCountProvider::new(), + msg_counter_map: HashMap::new(), + sync_tm_tcp_source, + tm_funnel_rx, + tm_server_tx, + } + } + + pub fn operation(&mut self) { + if let Ok(addr) = self.tm_funnel_rx.recv() { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let shared_pool = self.shared_tm_store.clone_backing_pool(); + let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); + let tm_raw = pool_guard + .modify(&addr) + .expect("Reading TM from pool failed"); + let mut zero_copy_writer = + PusTmZeroCopyWriter::new(tm_raw).expect("Creating TM zero copy writer failed"); + zero_copy_writer.set_apid(PUS_APID); + zero_copy_writer.set_seq_count(self.seq_count_provider.get_and_increment()); + let entry = self + .msg_counter_map + .entry(zero_copy_writer.service()) + .or_insert(0); + zero_copy_writer.set_msg_count(*entry); + if *entry == u16::MAX { + *entry = 0; + } else { + *entry += 1; + } + + // This operation has to come last! + zero_copy_writer.finish(); + self.tm_server_tx + .send(addr) + .expect("Sending TM to server failed"); + self.sync_tm_tcp_source.add_tm(tm_raw); + } + } +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index b4a584d..9bbe1ba 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -53,11 +53,6 @@ impl TcStore { } } -pub struct TmFunnel { - pub tm_funnel_rx: Receiver, - pub tm_server_tx: Sender, -} - #[derive(Clone)] pub struct PusTcSource { pub tc_source: Sender,