From 82b7717b496521a4e23e8a4337b323347bdc2480 Mon Sep 17 00:00:00 2001
From: Robin Mueller
Date: Tue, 4 Jul 2023 18:51:54 +0200
Subject: [PATCH] next helper class

---
 satrs-example/src/main.rs          |  46 ++++++--
 satrs-example/src/pus/mod.rs       |  60 +++++++++--
 satrs-example/src/pus/scheduler.rs | 166 +++++++++++++++++++++++++++++
 satrs-example/src/pus/test.rs      | 105 +++++++++++-------
 satrs-example/src/tmtc.rs          |  19 ++--
 5 files changed, 327 insertions(+), 69 deletions(-)
 create mode 100644 satrs-example/src/pus/scheduler.rs

diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs
index 916db1b..bc569d2 100644
--- a/satrs-example/src/main.rs
+++ b/satrs-example/src/main.rs
@@ -9,6 +9,8 @@ use log::{info, warn};
 
 use crate::hk::AcsHkIds;
 use crate::logging::setup_logger;
+use crate::pus::test::PusService17TestHandler;
+use crate::pus::PusTcMpscRouter;
 use crate::requests::{Request, RequestWithToken};
 use crate::tmtc::{
     core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, PUS_APID,
@@ -35,16 +37,15 @@ use satrs_core::spacepackets::{
     tm::{PusTm, PusTmSecondaryHeader},
     SequenceFlags, SpHeader,
 };
-use satrs_core::tmtc::tm_helper::SharedTmStore;
+use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore};
 use satrs_core::tmtc::AddressableId;
 use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT};
 use std::collections::HashMap;
 use std::net::{IpAddr, SocketAddr};
 use std::sync::mpsc::{channel, TryRecvError};
-use std::sync::{Arc, RwLock};
+use std::sync::{mpsc, Arc, RwLock};
 use std::thread;
 use std::time::Duration;
-use crate::pus::test::PusService17Handler;
 
 fn main() {
     setup_logger().expect("setting up logging with fern failed");
@@ -126,14 +127,14 @@ fn main() {
     request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx);
 
     let tc_source = PusTcSource {
-        tc_store,
+        tc_store: tc_store.clone(),
         tc_source: tc_source_tx,
     };
 
     // Create clones here to allow moving the values
     let core_args = OtherArgs {
         sock_addr,
-        verif_reporter,
+        verif_reporter: verif_reporter.clone(),
         event_sender,
         event_request_tx,
         request_map,
@@ -152,13 +153,32 @@ fn main() {
     let aocs_to_funnel = tm_funnel_tx.clone();
     let mut aocs_tm_store = tm_store.clone();
 
-    let pus17_handler = PusService17Handler::new()
+    let (pus_test_tx, pus_test_rx) = channel();
+    let (pus_event_tx, pus_event_rx) = channel();
+    let (pus_sched_tx, pus_sched_rx) = channel();
+    let (pus_hk_tx, pus_hk_rx) = channel();
+    let (pus_action_tx, pus_action_rx) = channel();
+    let pus_router = PusTcMpscRouter {
+        test_service_receiver: pus_test_tx,
+        event_service_receiver: pus_event_tx,
+        sched_service_receiver: pus_sched_tx,
+        hk_service_receiver: pus_hk_tx,
+        action_service_receiver: pus_action_tx,
+    };
+    let mut pus17_handler = PusService17TestHandler::new(
+        pus_test_rx,
+        tc_store.pool.clone(),
+        PusTmWithCdsShortHelper::new(PUS_APID),
+        tm_funnel_tx.clone(),
+        tm_store.clone(),
+        verif_reporter.clone(),
+    );
 
     info!("Starting TMTC task");
     let jh0 = thread::Builder::new()
         .name("TMTC".to_string())
         .spawn(move || {
-            core_tmtc_task(core_args, tc_args, tm_args);
+            core_tmtc_task(core_args, tc_args, tm_args, pus_router);
         })
         .unwrap();
 
@@ -315,10 +335,16 @@ fn main() {
     info!("Starting PUS handler thread");
     let jh4 = thread::Builder::new()
-        .name("AOCS".to_string())
+        .name("PUS".to_string())
         .spawn(move || {
-
-        });
+            loop {
+                // TODO: Better error handling
+                let res = pus17_handler.periodic_operation();
+                res.expect("some PUS17 error");
+                thread::sleep(Duration::from_millis(400));
+            }
+        })
+        .unwrap();
 
     jh0.join().expect("Joining UDP TMTC server thread failed");
     jh1.join().expect("Joining TM Funnel thread failed");
     jh2.join().expect("Joining Event Manager thread failed");
diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs
index 8146c1e..01300c2 100644
--- a/satrs-example/src/pus/mod.rs
+++ b/satrs-example/src/pus/mod.rs
@@ -1,10 +1,11 @@
+use crate::pus::test::PusService17TestHandler;
 use crate::tmtc::MpscStoreAndSendError;
 use satrs_core::events::EventU32;
 use satrs_core::hk::{CollectionIntervalFactor, HkRequest};
 use satrs_core::mode::{ModeAndSubmode, ModeRequest};
 use satrs_core::objects::ObjectId;
 use satrs_core::params::Params;
-use satrs_core::pool::{PoolProvider, StoreAddr};
+use satrs_core::pool::{PoolProvider, SharedPool, StoreAddr};
 use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken};
 use satrs_core::pus::hk;
 use satrs_core::pus::mode::Subservice;
@@ -30,10 +31,46 @@ use std::cell::RefCell;
 use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::rc::Rc;
-use std::sync::mpsc::{Receiver, Sender};
+use std::sync::mpsc::{Receiver, SendError, Sender};
 
+pub mod scheduler;
 pub mod test;
 
+pub struct PusServiceBase {
+    tc_rx: Receiver<AcceptedTc>,
+    tc_store: SharedPool,
+    tm_helper: PusTmWithCdsShortHelper,
+    tm_tx: Sender<StoreAddr>,
+    tm_store: SharedTmStore,
+    verification_handler: StdVerifReporterWithSender,
+    stamp_buf: [u8; 7],
+    pus_buf: [u8; 2048],
+    handled_tcs: u32,
+}
+
+impl PusServiceBase {
+    pub fn new(
+        receiver: Receiver<AcceptedTc>,
+        tc_pool: SharedPool,
+        tm_helper: PusTmWithCdsShortHelper,
+        tm_tx: Sender<StoreAddr>,
+        tm_store: SharedTmStore,
+        verification_handler: StdVerifReporterWithSender,
+    ) -> Self {
+        Self {
+            tc_rx: receiver,
+            tc_store: tc_pool,
+            tm_helper,
+            tm_tx,
+            tm_store,
+            verification_handler,
+            stamp_buf: [0; 7],
+            pus_buf: [0; 2048],
+            handled_tcs: 0,
+        }
+    }
+}
+
 // pub trait PusTcRouter {
 //     type Error;
 //     fn route_pus_tc(
@@ -218,12 +255,19 @@ impl PusReceiver {
         let service = PusServiceId::try_from(service);
         match service {
             Ok(standard_service) => match standard_service {
-                PusServiceId::Test => self
-                    .tc_args
-                    .pus_router
-                    .test_service_receiver
-                    .send((store_addr, accepted_token))
-                    .unwrap(),
+                PusServiceId::Test => {
+                    let res = self
+                        .tc_args
+                        .pus_router
+                        .test_service_receiver
+                        .send((store_addr, accepted_token));
+                    match res {
+                        Ok(_) => {}
+                        Err(e) => {
+                            println!("Error {e}")
+                        }
+                    }
+                }
                 PusServiceId::Housekeeping => self
                     .tc_args
                     .pus_router
diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs
new file mode 100644
index 0000000..cf35b5a
--- /dev/null
+++ b/satrs-example/src/pus/scheduler.rs
@@ -0,0 +1,166 @@
+use crate::pus::{AcceptedTc, PusServiceBase};
+use delegate::delegate;
+use satrs_core::pool::{SharedPool, StoreAddr};
+use satrs_core::pus::scheduling::PusScheduler;
+use satrs_core::pus::verification::{
+    pus_11_generic_tc_check, FailParams, StdVerifReporterWithSender, TcStateAccepted,
+    VerificationToken,
+};
+use satrs_core::pus::GenericTcCheckError;
+use satrs_core::spacepackets::ecss::scheduling;
+use satrs_core::spacepackets::tc::PusTc;
+use satrs_core::spacepackets::time::{cds::TimeProvider, TimeWriter};
+use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore};
+use satrs_example::tmtc_err;
+use std::sync::mpsc::{Receiver, Sender};
+
+pub struct PusService11SchedHandler {
+    psb: PusServiceBase,
+    scheduler: PusScheduler,
+}
+
+impl PusService11SchedHandler {
+    pub fn new(
+        receiver: Receiver<AcceptedTc>,
+        tc_pool: SharedPool,
+        tm_helper: PusTmWithCdsShortHelper,
+        tm_tx: Sender<StoreAddr>,
+        tm_store: SharedTmStore,
+        verification_handler: StdVerifReporterWithSender,
+        scheduler: PusScheduler,
+    ) -> Self {
+        Self {
+            psb: PusServiceBase::new(
+                receiver,
+                tc_pool,
+                tm_helper,
+                tm_tx,
+                tm_store,
+                verification_handler,
+            ),
+            scheduler,
+        }
+    }
+
+    // TODO: Return errors which occurred
+    pub fn periodic_operation(&mut self) -> Result<u32, ()> {
+        Ok(self.psb.handled_tcs)
+    }
+
+    pub fn handle_one_tc(&mut self, addr: StoreAddr, token: VerificationToken<TcStateAccepted>) {
+        let time_provider = TimeProvider::from_now_with_u16_days().unwrap();
+        time_provider.write_to_bytes(&mut self.psb.stamp_buf).unwrap();
+        {
+            // Keep locked section as short as possible.
+            let mut tc_pool = self.psb.tc_store.write().unwrap();
+            let tc_guard = tc_pool.read_with_guard(addr);
+            let tc_raw = tc_guard.read().unwrap();
+            self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
+        }
+        let (tc, tc_size) = PusTc::from_bytes(&self.psb.pus_buf).unwrap();
+        let subservice = match pus_11_generic_tc_check(&tc) {
+            Ok(subservice) => subservice,
+            Err(e) => match e {
+                GenericTcCheckError::NotEnoughAppData => {
+                    self.psb
+                        .verification_handler
+                        .start_failure(
+                            token,
+                            FailParams::new(
+                                Some(&self.psb.stamp_buf),
+                                &tmtc_err::NOT_ENOUGH_APP_DATA,
+                                None,
+                            ),
+                        )
+                        .expect("could not send verification error");
+                    return;
+                }
+                GenericTcCheckError::InvalidSubservice => {
+                    self.psb
+                        .verification_handler
+                        .start_failure(
+                            token,
+                            FailParams::new(
+                                Some(&self.psb.stamp_buf),
+                                &tmtc_err::INVALID_PUS_SUBSERVICE,
+                                None,
+                            ),
+                        )
+                        .expect("could not send verification error");
+                    return;
+                }
+            },
+        };
+        match subservice {
+            scheduling::Subservice::TcEnableScheduling => {
+                let start_token = self
+                    .psb
+                    .verification_handler
+                    .start_success(token, Some(&self.psb.stamp_buf))
+                    .expect("Error sending start success");
+
+                self.scheduler.enable();
+                if self.scheduler.is_enabled() {
+                    self.psb
+                        .verification_handler
+                        .completion_success(start_token, Some(&self.psb.stamp_buf))
+                        .expect("Error sending completion success");
+                } else {
+                    panic!("Failed to enable scheduler");
+                }
+            }
+            scheduling::Subservice::TcDisableScheduling => {
+                let start_token = self
+                    .psb
+                    .verification_handler
+                    .start_success(token, Some(&self.psb.stamp_buf))
+                    .expect("Error sending start success");
+
+                self.scheduler.disable();
+                if !self.scheduler.is_enabled() {
+                    self.psb
+                        .verification_handler
+                        .completion_success(start_token, Some(&self.psb.stamp_buf))
+                        .expect("Error sending completion success");
+                } else {
+                    panic!("Failed to disable scheduler");
+                }
+            }
+            scheduling::Subservice::TcResetScheduling => {
+                let start_token = self
+                    .psb
+                    .verification_handler
+                    .start_success(token, Some(&self.psb.stamp_buf))
+                    .expect("Error sending start success");
+
+                let mut pool = self.psb.tc_store.write().expect("Locking pool failed");
+
+                self.scheduler
+                    .reset(pool.as_mut())
+                    .expect("Error resetting TC Pool");
+
+                self.psb
+                    .verification_handler
+                    .completion_success(start_token, Some(&self.psb.stamp_buf))
+                    .expect("Error sending completion success");
+            }
+            scheduling::Subservice::TcInsertActivity => {
+                let start_token = self
+                    .psb
+                    .verification_handler
+                    .start_success(token, Some(&self.psb.stamp_buf))
+                    .expect("error sending start success");
+
+                let mut pool = self.psb.tc_store.write().expect("locking pool failed");
+                self.scheduler
+                    .insert_wrapped_tc::<TimeProvider>(&tc, pool.as_mut())
+                    .expect("insertion of activity into pool failed");
+
+                self.psb
+                    .verification_handler
+                    .completion_success(start_token, Some(&self.psb.stamp_buf))
+                    .expect("sending completion success failed");
+            }
+            _ => {}
+        }
+    }
+}
diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs
index ccc9d2a..8ed6bd6 100644
--- a/satrs-example/src/pus/test.rs
+++ b/satrs-example/src/pus/test.rs
@@ -1,5 +1,8 @@
-use crate::pus::AcceptedTc;
+use crate::pus::{AcceptedTc, PusServiceBase};
+use delegate::delegate;
 use log::info;
+use satrs_core::events::EventU32;
+use satrs_core::params::Params;
 use satrs_core::pool::{SharedPool, StoreAddr};
 use satrs_core::pus::verification::{
     StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
@@ -11,45 +14,49 @@ use satrs_core::spacepackets::time::cds::TimeProvider;
 use satrs_core::spacepackets::time::TimeWriter;
 use satrs_core::spacepackets::tm::PusTm;
 use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore};
+use satrs_example::TEST_EVENT;
 use std::sync::mpsc::{Receiver, Sender, TryRecvError};
 
-pub struct PusService17Handler {
-    tc_rx: Receiver<AcceptedTc>,
-    tc_store: SharedPool,
-    tm_helper: PusTmWithCdsShortHelper,
-    tm_tx: Sender<StoreAddr>,
-    tm_store: SharedTmStore,
-    verification_handler: StdVerifReporterWithSender,
-    stamp_buf: [u8; 7],
-    pus_buf: [u8; 2048],
-    handled_tcs: u32,
+pub struct SatrsTestServiceCustomHandler {
+    pub event_sender: Sender<(EventU32, Option<Params>)>,
 }
 
-impl PusService17Handler {
-    pub fn new(receiver: Receiver<AcceptedTc>, tc_pool: SharedPool, tm_helper: PusTmWithCdsShortHelper, tm_tx: Sender<StoreAddr>, tm_store: SharedTmStore, verification_handler: StdVerifReporterWithSender) -> Self {
+pub struct PusService17TestHandler {
+    psb: PusServiceBase,
+}
+
+impl PusService17TestHandler {
+    pub fn new(
+        receiver: Receiver<AcceptedTc>,
+        tc_pool: SharedPool,
+        tm_helper: PusTmWithCdsShortHelper,
+        tm_tx: Sender<StoreAddr>,
+        tm_store: SharedTmStore,
+        verification_handler: StdVerifReporterWithSender,
+    ) -> Self {
         Self {
-            tc_rx: receiver,
-            tc_store: tc_pool,
-            tm_helper,
-            tm_tx,
-            tm_store,
-            verification_handler,
-            stamp_buf: [0; 7],
-            pus_buf: [0; 2048],
-            handled_tcs: 0
+            psb: PusServiceBase::new(
+                receiver,
+                tc_pool,
+                tm_helper,
+                tm_tx,
+                tm_store,
+                verification_handler,
+            ),
         }
     }
 
+    // TODO: Return errors which occurred
     pub fn periodic_operation(&mut self) -> Result<u32, ()> {
-        self.handled_tcs = 0;
+        self.psb.handled_tcs = 0;
         loop {
-            match self.tc_rx.try_recv() {
+            match self.psb.tc_rx.try_recv() {
                 Ok((addr, token)) => {
                     self.handle_one_tc(addr, token);
                 }
                 Err(e) => {
                     match e {
-                        TryRecvError::Empty => return Ok(self.handled_tcs),
+                        TryRecvError::Empty => return Ok(self.psb.handled_tcs),
                         TryRecvError::Disconnected => {
                             // TODO: Replace panic by something cleaner
                             panic!("PusService17Handler: Sender disconnected");
@@ -62,34 +69,56 @@ impl PusService17Handler {
     pub fn handle_one_tc(&mut self, addr: StoreAddr, token: VerificationToken<TcStateAccepted>) {
         let time_provider = TimeProvider::from_now_with_u16_days().unwrap();
         // TODO: Better error handling
-        let (addr, token) = self.tc_rx.try_recv().unwrap();
         {
             // Keep locked section as short as possible.
-            let mut tc_pool = self.tc_store.write().unwrap();
+            let mut tc_pool = self.psb.tc_store.write().unwrap();
             let tc_guard = tc_pool.read_with_guard(addr);
             let tc_raw = tc_guard.read().unwrap();
-            self.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
+            self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw);
         }
-        let tc = PusTc::from_bytes(&self.pus_buf).unwrap();
+        let (tc, tc_size) = PusTc::from_bytes(&self.psb.pus_buf).unwrap();
         // TODO: Robustness: Check that service is 17
-        if tc.0.subservice() == 1 {
+        if tc.subservice() == 1 {
             info!("Received PUS ping command TC[17,1]");
             info!("Sending ping reply PUS TM[17,2]");
-            time_provider.write_to_bytes(&mut self.stamp_buf).unwrap();
+            time_provider
+                .write_to_bytes(&mut self.psb.stamp_buf)
+                .unwrap();
             let start_token = self
+                .psb
                 .verification_handler
-                .start_success(token, Some(&self.stamp_buf))
+                .start_success(token, Some(&self.psb.stamp_buf))
                 .expect("Error sending start success");
             // Sequence count will be handled centrally in TM funnel.
-            let ping_reply = self.tm_helper.create_pus_tm_with_stamp(17, 2, None, &time_provider, 0);
-            let addr = self.tm_store.add_pus_tm(&ping_reply);
-            self.tm_tx
+            let ping_reply =
+                self.psb
+                    .tm_helper
+                    .create_pus_tm_with_stamp(17, 2, None, &time_provider, 0);
+            let addr = self.psb.tm_store.add_pus_tm(&ping_reply);
+            self.psb
+                .tm_tx
                 .send(addr)
                 .expect("Sending TM to TM funnel failed");
-            self.verification_handler
-                .completion_success(start_token, Some(&self.stamp_buf))
+            self.psb
+                .verification_handler
+                .completion_success(start_token, Some(&self.psb.stamp_buf))
                 .expect("Error sending completion success");
-            self.handled_tcs += 1;
+            self.psb.handled_tcs += 1;
         }
+        // TODO: How to handle an invalid subservice?
+        // TODO: How do we handle custom code like this? Custom subservice handler via trait?
+        // if tc.subservice() == 128 {
+        //     info!("Generating test event");
+        //     self.event_sender
+        //         .send((TEST_EVENT.into(), None))
+        //         .expect("Sending test event failed");
+        //     let start_token =
+        //         verification_handler
+        //             .start_success(token, Some(&stamp_buf))
+        //             .expect("Error sending start success");
+        //     verification_handler
+        //         .completion_success(start_token, Some(&stamp_buf))
+        //         .expect("Error sending completion success");
+        // }
     }
 }
diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs
index ded9090..320cf5a 100644
--- a/satrs-example/src/tmtc.rs
+++ b/satrs-example/src/tmtc.rs
@@ -152,7 +152,12 @@ impl ReceivesCcsdsTc for PusTcSource {
     }
 }
 
-pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
+pub fn core_tmtc_task(
+    args: OtherArgs,
+    mut tc_args: TcArgs,
+    tm_args: TmArgs,
+    pus_router: PusTcMpscRouter,
+) {
     let scheduler = Rc::new(RefCell::new(
         PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(),
     ));
@@ -164,18 +169,6 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
         verif_reporter: args.verif_reporter,
         seq_count_provider: args.seq_count_provider.clone(),
     };
-    let (pus_test_tx, pus_tedt_rx) = mpsc::channel();
-    let (pus_event_tx, pus_event_rx) = mpsc::channel();
-    let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
-    let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
-    let (pus_action_tx, pus_action_rx) = mpsc::channel();
-    let pus_router = PusTcMpscRouter {
-        test_service_receiver: pus_test_tx,
-        event_service_receiver: pus_event_tx,
-        sched_service_receiver: pus_sched_tx,
-        hk_service_receiver: pus_hk_tx,
-        action_service_receiver: pus_action_tx,
-    };
     let pus_tc_args = PusTcArgs {
         pus_router,
         event_sender: args.event_sender,
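
The commented-out block in test.rs, together with the TODO before it, leaves open how custom subservice code (like the subservice 128 test event generation) should be hooked into the generic service handlers. One possible shape is sketched below. This is an assumption for discussion, not part of this patch or of satrs-core: the trait name CustomSubserviceHandler and the method handle_subservice are hypothetical, and SatrsTestServiceCustomHandler (which the patch already adds) is repeated so the sketch stands alone.

use satrs_core::events::EventU32;
use satrs_core::params::Params;
use satrs_core::spacepackets::tc::PusTc;
use satrs_example::TEST_EVENT;
use std::sync::mpsc::Sender;

// Hypothetical hook trait: a service handler like PusService17TestHandler
// could delegate any subservice it does not know to user-provided code.
pub trait CustomSubserviceHandler {
    /// Returns true if the subservice was consumed, false to fall through
    /// to the default invalid-subservice handling.
    fn handle_subservice(&mut self, subservice: u8, tc: &PusTc) -> bool;
}

pub struct SatrsTestServiceCustomHandler {
    pub event_sender: Sender<(EventU32, Option<Params>)>,
}

impl CustomSubserviceHandler for SatrsTestServiceCustomHandler {
    fn handle_subservice(&mut self, subservice: u8, _tc: &PusTc) -> bool {
        if subservice != 128 {
            return false;
        }
        // Mirrors the commented-out code in test.rs: raise the test event.
        self.event_sender
            .send((TEST_EVENT.into(), None))
            .expect("Sending test event failed");
        true
    }
}

Verification reporting (start/completion success) is omitted from this sketch; it would follow the same pattern as the TC[17,1] ping handling in test.rs.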