From 8df56ca63aa8d16d2091d1493330d0bfdc3a2aa2 Mon Sep 17 00:00:00 2001 From: lkoester Date: Thu, 26 Jan 2023 10:58:44 +0100 Subject: [PATCH] merged main --- satrs-core/src/pus/scheduling.rs | 270 ++++++++++++++++++++++++------- satrs-example/src/main.rs | 2 + satrs-example/src/pus.rs | 119 ++++++++------ satrs-example/src/tmtc.rs | 52 +++++- 4 files changed, 329 insertions(+), 114 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 87393db..71dee84 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -1,7 +1,10 @@ -use crate::pool::StoreAddr; +use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use core::time::Duration; -use spacepackets::time::UnixTimestamp; +use spacepackets::ecss::PusPacket; +use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; +use spacepackets::time::cds::DaysLen24Bits; +use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; use std::time::SystemTimeError; use std::vec; @@ -37,6 +40,12 @@ impl PusScheduler { } } + #[cfg(feature = "std")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] + pub fn new_with_current_init_time(time_margin: Duration) -> Result { + Ok(Self::new(UnixTimestamp::from_now()?, time_margin)) + } + pub fn num_scheduled_telecommands(&self) -> u64 { let mut num_entries = 0; for entries in &self.tc_map { @@ -57,9 +66,16 @@ impl PusScheduler { self.enabled = false; } - pub fn reset(&mut self) { + /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. + /// Be careful with this command as it will delete all the commands in the schedule. + /// + /// The holding store for the telecommands needs to be passed so all the stored telecommands + /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error + /// will be returned but the method will still try to delete all the commands in the schedule. + pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; self.tc_map.clear(); + return Ok(()) } pub fn update_time(&mut self, current_time: UnixTimestamp) { @@ -70,7 +86,11 @@ impl PusScheduler { &self.current_time } - pub fn insert_tc(&mut self, time_stamp: UnixTimestamp, addr: StoreAddr) -> bool { + pub fn insert_unwrapped_and_stored_tc( + &mut self, + time_stamp: UnixTimestamp, + addr: StoreAddr, + ) -> bool { if time_stamp < self.current_time + self.time_margin { return false; } @@ -85,6 +105,73 @@ impl PusScheduler { true } + pub fn insert_unwrapped_tc( + &mut self, + time_stamp: UnixTimestamp, + tc: &[u8], + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + let check_tc = PusTc::from_bytes(tc).unwrap(); + if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { + // TODO: should not be able to schedule a scheduled tc + return Err(()); + } + + match pool.add(tc) { + Ok(addr) => { + let worked = self.insert_unwrapped_and_stored_tc(time_stamp, addr); + if worked { + return Ok(addr); + } else { + return Err(()); + } + } + Err(err) => { + return Err(()); + } + } + } + + // insert_wrapped_tc() + // (&dyn CcsdsTimeProvider)> + pub fn insert_wrapped_tc( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + if PusPacket::service(pus_tc) != 11 || PusPacket::subservice(pus_tc) != 4 { + return Err(()); + } + + return if let Some(user_data) = pus_tc.user_data() { + let mut stamp: TimeStamp = match TimeReader::from_bytes(user_data) { + Ok(stamp) => stamp, + Err(error) => return Err(()), + }; + let unix_stamp = stamp.unix_stamp(); + let stamp_len = stamp.len_as_bytes(); + self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) + } else { + Err(()) + } + } + + pub fn insert_wrapped_tc_cds_short( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::(pus_tc, pool) + } + + pub fn insert_wrapped_tc_cds_long( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::>(pus_tc, pool) + } + pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { self.tc_map.range(..=self.current_time) } @@ -96,20 +183,47 @@ impl PusScheduler { Ok(()) } - pub fn release_telecommands(&mut self, mut releaser: R) { + + /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser + /// closure for each telecommand which should be released. This function will also delete + /// the telecommands from the holding store after calling the release closure. + /// + /// # Arguments + /// + /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and + /// the second argument is the store address. This closure should return whether the + /// command should be deleted. + /// * `store` - The holding store of the telecommands. + pub fn release_telecommands bool>( + &mut self, + mut releaser: R, + tc_store: &mut (impl PoolProvider + ?Sized), + ) -> Result { let tcs_to_release = self.telecommands_to_release(); + let mut released_tcs = 0; + let mut store_error = Ok(()); for tc in tcs_to_release { for addr in tc.1 { - releaser(self.enabled, addr); + let should_delete = releaser(self.enabled, addr); + released_tcs += 1; + if should_delete { + let res = tc_store.delete(*addr); + if res.is_err() { + store_error = res; + } + } } } self.tc_map.retain(|k, _| k > &self.current_time); + store_error + .map(|_| released_tcs) + .map_err(|e| (released_tcs, e)) } } #[cfg(test)] mod tests { - use crate::pool::StoreAddr; + use crate::pool::{LocalPool, PoolCfg, StoreAddr}; use crate::pus::scheduling::PusScheduler; use spacepackets::ecss::PacketTypeCodes::UnsignedInt; use spacepackets::time::UnixTimestamp; @@ -118,6 +232,9 @@ mod tests { use std::time::Duration; use std::vec::Vec; use std::{println, vec}; + use heapless::pool::Pool; + use spacepackets::SpHeader; + use spacepackets::tc::PusTc; #[test] fn basic() { @@ -132,40 +249,24 @@ mod tests { fn reset() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let first_addr = pool.add(&[0, 1, 2]).unwrap(); + let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone()); assert!(worked); - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(200), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); + let second_addr = pool.add(&[2, 3, 4]).unwrap(); + let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone()); assert!(worked); - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(300), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); + let third_addr = pool.add(&[5, 6, 7]).unwrap(); + let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone()); assert!(worked); assert_eq!(scheduler.num_scheduled_telecommands(), 3); assert!(scheduler.is_enabled()); - scheduler.reset(); + scheduler.reset().unwrap(); assert!(!scheduler.is_enabled()); assert_eq!(scheduler.num_scheduled_telecommands(), 0); } @@ -175,7 +276,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let worked = scheduler.insert_tc( + let worked = scheduler.insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), StoreAddr { pool_idx: 0, @@ -185,7 +286,7 @@ mod tests { assert!(worked); - let worked = scheduler.insert_tc( + let worked = scheduler.insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), StoreAddr { pool_idx: 0, @@ -195,7 +296,7 @@ mod tests { assert!(worked); - let worked = scheduler.insert_tc( + let worked = scheduler.insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(300), StoreAddr { pool_idx: 0, @@ -222,21 +323,11 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(200), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); + let second_addr = pool.add(&[5, 6, 7]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); let mut i = 0; let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { @@ -290,21 +381,11 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let second_addr = pool.add(&[2, 2, 2]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr); let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { @@ -335,4 +416,71 @@ mod tests { // check that 2 total tcs have been released assert_eq!(i, 2); } + + #[test] + fn insert_unwrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3,4], &mut pool).unwrap(); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + true + }; + + scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); + } + + fn scheduled_tc() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 11, 4, None, true) + } + + #[test] + fn insert_wrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let tc = base_ping_tc_simple_ctor(); + + let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + true + }; + + scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); + } + + fn base_ping_tc_simple_ctor() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 17, 1, None, true) + } + + fn insert_wrong_subservice() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let tc = base_ping_tc_simple_ctor(); + + let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 32cc6e6..549ffce 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,3 +1,5 @@ +extern crate core; + mod ccsds; mod hk; mod pus; diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index a6c40f4..0084ca5 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -2,7 +2,7 @@ use crate::hk::{CollectionIntervalFactor, HkRequest}; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{PusTcSource, TmStore}; use satrs_core::events::EventU32; -use satrs_core::pool::StoreAddr; +use satrs_core::pool::{StoreAddr, StoreError}; use satrs_core::pus::event::Subservices; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; @@ -13,15 +13,18 @@ use satrs_core::res_code::ResultU16; use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use satrs_core::tmtc::{AddressableId, PusServiceProvider}; use satrs_core::{ - spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, spacepackets::time::cds::TimeProvider, - spacepackets::time::TimeWriter, spacepackets::SpHeader, + spacepackets, spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, + spacepackets::time::cds::TimeProvider, spacepackets::time::TimeWriter, spacepackets::SpHeader, }; use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; +use std::rc::Rc; use std::sync::mpsc::Sender; use std::time::Duration; use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; +use std::sync::{Arc, LockResult, Mutex}; + pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, @@ -34,7 +37,7 @@ pub struct PusReceiver { request_map: HashMap>, stamper: TimeProvider, time_stamp: [u8; 7], - scheduler: PusScheduler, + scheduler: Arc>, } impl PusReceiver { @@ -46,8 +49,8 @@ impl PusReceiver { tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, + scheduler: Arc>, ) -> Self { - let scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); Self { tm_helper: PusTmWithCdsShortHelper::new(apid), tm_tx, @@ -58,7 +61,7 @@ impl PusReceiver { request_map, stamper: TimeProvider::new_with_u16_days(0, 0), time_stamp: [0; 7], - scheduler + scheduler, } } } @@ -299,69 +302,95 @@ impl PusReceiver { match pus_tc.subservice() { 1 => { + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); + let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - self.scheduler.enable(); - if self.scheduler.is_enabled() { + + scheduler.enable(); + if scheduler.is_enabled() { self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - // TODO: ??? - //self.verif_reporter - // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + + panic!("Failed to enable scheduler"); } - }, + } 2 => { - let start_token = self - .verif_reporter - .start_success(token, Some(&self.time_stamp)) - .expect("Error sending start success"); - self.scheduler.disable(); - if ! self.scheduler.is_enabled() { - self.verif_reporter - .completion_success(start_token, Some(&self.time_stamp)) - .expect("Error sending completion success"); - } else { - // TODO: ??? - //self.verif_reporter - // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) - } - }, - 3 => { + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - self.scheduler.reset(); - if !self.scheduler.is_enabled() && self.scheduler.num_scheduled_telecommands() == 0 { + scheduler.disable(); + if !scheduler.is_enabled() { self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - // TODO: ??? - //self.verif_reporter - // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + + panic!("Failed to disable scheduler"); } - }, + } + 3 => { + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); + + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + match self.tc_source.tc_store.pool.write() { + Ok(mut pool) => { + match scheduler.reset(pool.as_mut()) { + Ok(_) => { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } + Err(_) => { + // TODO + } + } + } + Err(_) => {} + } + } 4 => { - self.update_time_stamp(); - let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); - let worked = self.scheduler.insert_tc(unix_time, ); - }, + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + match self.tc_source.tc_store.pool.write() { + Ok(mut pool) => { + scheduler.insert_wrapped_tc::( + pus_tc, + pool.as_mut(), + ); + } + Err(_) => {} + } + + //let addr = self.tc_source.tc_store.add_pus_tc().unwrap(); + //let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); + //let worked = self.scheduler.insert_tc(unix_time, ); + } _ => { self.verif_reporter - .start_failure( - token, - FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), - ) - .expect("Sending start failure TM failed"); + .start_failure( + token, + FailParams::new( + Some(&self.time_stamp), + &tmtc_err::NOT_ENOUGH_APP_DATA, + None, + ), + ) + .expect("Sending start failure TM failed"); return; } } - - } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index c8b8e61..728c19a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -5,7 +5,9 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; +use std::sync::{Arc, LockResult, Mutex}; use std::thread; use std::time::Duration; @@ -42,6 +44,12 @@ pub struct TcArgs { pub tc_receiver: Receiver, } +impl TcArgs { + fn split(self) -> (PusTcSource, Receiver) { + (self.tc_source, self.tc_receiver) + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum MpscStoreAndSendError { StoreError(StoreError), @@ -153,6 +161,10 @@ impl ReceivesCcsdsTc for PusTcSource { } } pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { + let mut scheduler = Arc::new(Mutex::new( + PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), + )); + let mut sched_clone = scheduler.clone(); let mut pus_receiver = PusReceiver::new( PUS_APID, tm_args.tm_sink_sender, @@ -161,6 +173,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tc_args.tc_source.clone(), args.event_request_tx, args.request_map, + sched_clone, ); let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), @@ -174,22 +187,48 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tm_rx: tm_args.tm_server_rx, tm_store: tm_args.tm_store.pool.clone(), }; + + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| true; + let (mut tc_source, mut tc_receiver) = tc_args.split(); + loop { - core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver); + let mut tmtc_sched = scheduler.clone(); + core_tmtc_loop( + &mut udp_tmtc_server, + &mut tc_source, + &mut tc_receiver, + &mut pus_receiver, + tmtc_sched, + ); thread::sleep(Duration::from_millis(400)); } } fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, - tc_args: &mut TcArgs, + tc_source: &mut PusTcSource, + tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, + scheduler: Arc>, ) { + let releaser = |enabled: bool, addr: &StoreAddr| { + tc_source.tc_source.send(*addr); + true + }; + + let mut scheduler = scheduler.lock().expect("Lock of scheduler failed"); + match tc_source.tc_store.pool.write() { + Ok(mut pool) => match scheduler.release_telecommands(releaser, pool.as_mut()) { + Ok(_) => {} + Err(_) => {} + }, + Err(_) => {} + } + while poll_tc_server(udp_tmtc_server) {} - match tc_args.tc_receiver.try_recv() { + match tc_receiver.try_recv() { Ok(addr) => { - let pool = tc_args - .tc_source + let pool = tc_source .tc_store .pool .read() @@ -241,9 +280,6 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { } } -fn poll_tc_scheduler(scheduler: &mut PusScheduler) { - match scheduler.release_telecommands() -} fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() {