From dce29035a2a6cf4d822838426a62555ef0def6b4 Mon Sep 17 00:00:00 2001 From: lkoester Date: Wed, 25 Jan 2023 10:15:21 +0100 Subject: [PATCH 01/11] merging main --- satrs-core/src/pus/scheduling.rs | 12 +++++ satrs-example/src/pus.rs | 91 ++++++++++++++++++++++++++++++++ satrs-example/src/tmtc.rs | 5 ++ 3 files changed, 108 insertions(+) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 3205a59..87393db 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -7,6 +7,18 @@ use std::time::SystemTimeError; use std::vec; use std::vec::Vec; + +//TODO: Move to spacepackets +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum ScheduleSubservice { + EnableScheduling = 1, + DisableScheduling = 2, + ResetScheduling = 3, + InsertActivity = 4, + DeleteActivity = 5, + +} + #[derive(Debug)] pub struct PusScheduler { tc_map: BTreeMap>, diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index 108b086..a6c40f4 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -19,6 +19,9 @@ use satrs_core::{ use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; use std::sync::mpsc::Sender; +use std::time::Duration; +use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; +use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, @@ -31,6 +34,7 @@ pub struct PusReceiver { request_map: HashMap>, stamper: TimeProvider, time_stamp: [u8; 7], + scheduler: PusScheduler, } impl PusReceiver { @@ -43,6 +47,7 @@ impl PusReceiver { event_request_tx: Sender, request_map: HashMap>, ) -> Self { + let scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); Self { tm_helper: PusTmWithCdsShortHelper::new(apid), tm_tx, @@ -53,6 +58,7 @@ impl PusReceiver { request_map, stamper: TimeProvider::new_with_u16_days(0, 0), time_stamp: [0; 7], + scheduler } } } @@ -78,6 +84,8 @@ impl PusServiceProvider for PusReceiver { self.handle_event_request(pus_tc, accepted_token); } else if service == 3 { self.handle_hk_request(pus_tc, accepted_token); + } else if service == 11 { + self.handle_scheduled_tc(pus_tc, accepted_token); } else { self.update_time_stamp(); self.verif_reporter @@ -201,6 +209,7 @@ impl PusReceiver { )); } } + fn handle_event_request(&mut self, pus_tc: &PusTc, token: VerificationToken) { let send_start_failure = |verif_reporter: &mut StdVerifReporterWithSender, timestamp: &[u8; 7], @@ -273,4 +282,86 @@ impl PusReceiver { } } } + + fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken) { + if pus_tc.user_data().is_none() { + self.update_time_stamp(); + self.verif_reporter + .start_failure( + token, + FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), + ) + .expect("Sending start failure TM failed"); + return; + } + + self.update_time_stamp(); + + match pus_tc.subservice() { + 1 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + self.scheduler.enable(); + if self.scheduler.is_enabled() { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } else { + // TODO: ??? + //self.verif_reporter + // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + } + }, + 2 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + self.scheduler.disable(); + if ! self.scheduler.is_enabled() { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } else { + // TODO: ??? + //self.verif_reporter + // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + } + }, + 3 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + self.scheduler.reset(); + if !self.scheduler.is_enabled() && self.scheduler.num_scheduled_telecommands() == 0 { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } else { + // TODO: ??? + //self.verif_reporter + // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + } + }, + 4 => { + self.update_time_stamp(); + let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); + let worked = self.scheduler.insert_tc(unix_time, ); + }, + _ => { + self.verif_reporter + .start_failure( + token, + FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), + ) + .expect("Sending start failure TM failed"); + return; + } + } + + + } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 4314deb..c8b8e61 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -14,6 +14,7 @@ use crate::pus::PusReceiver; use crate::requests::RequestWithToken; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; +use satrs_core::pus::scheduling::PusScheduler; use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader}; use satrs_core::tmtc::{ @@ -240,6 +241,10 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { } } +fn poll_tc_scheduler(scheduler: &mut PusScheduler) { + match scheduler.release_telecommands() +} + fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { let mut store_lock = udp_tmtc_server -- 2.43.0 From 8df56ca63aa8d16d2091d1493330d0bfdc3a2aa2 Mon Sep 17 00:00:00 2001 From: lkoester Date: Thu, 26 Jan 2023 10:58:44 +0100 Subject: [PATCH 02/11] merged main --- satrs-core/src/pus/scheduling.rs | 270 ++++++++++++++++++++++++------- satrs-example/src/main.rs | 2 + satrs-example/src/pus.rs | 119 ++++++++------ satrs-example/src/tmtc.rs | 52 +++++- 4 files changed, 329 insertions(+), 114 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 87393db..71dee84 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -1,7 +1,10 @@ -use crate::pool::StoreAddr; +use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use core::time::Duration; -use spacepackets::time::UnixTimestamp; +use spacepackets::ecss::PusPacket; +use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; +use spacepackets::time::cds::DaysLen24Bits; +use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; use std::time::SystemTimeError; use std::vec; @@ -37,6 +40,12 @@ impl PusScheduler { } } + #[cfg(feature = "std")] + #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] + pub fn new_with_current_init_time(time_margin: Duration) -> Result { + Ok(Self::new(UnixTimestamp::from_now()?, time_margin)) + } + pub fn num_scheduled_telecommands(&self) -> u64 { let mut num_entries = 0; for entries in &self.tc_map { @@ -57,9 +66,16 @@ impl PusScheduler { self.enabled = false; } - pub fn reset(&mut self) { + /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. + /// Be careful with this command as it will delete all the commands in the schedule. + /// + /// The holding store for the telecommands needs to be passed so all the stored telecommands + /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error + /// will be returned but the method will still try to delete all the commands in the schedule. + pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; self.tc_map.clear(); + return Ok(()) } pub fn update_time(&mut self, current_time: UnixTimestamp) { @@ -70,7 +86,11 @@ impl PusScheduler { &self.current_time } - pub fn insert_tc(&mut self, time_stamp: UnixTimestamp, addr: StoreAddr) -> bool { + pub fn insert_unwrapped_and_stored_tc( + &mut self, + time_stamp: UnixTimestamp, + addr: StoreAddr, + ) -> bool { if time_stamp < self.current_time + self.time_margin { return false; } @@ -85,6 +105,73 @@ impl PusScheduler { true } + pub fn insert_unwrapped_tc( + &mut self, + time_stamp: UnixTimestamp, + tc: &[u8], + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + let check_tc = PusTc::from_bytes(tc).unwrap(); + if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { + // TODO: should not be able to schedule a scheduled tc + return Err(()); + } + + match pool.add(tc) { + Ok(addr) => { + let worked = self.insert_unwrapped_and_stored_tc(time_stamp, addr); + if worked { + return Ok(addr); + } else { + return Err(()); + } + } + Err(err) => { + return Err(()); + } + } + } + + // insert_wrapped_tc() + // (&dyn CcsdsTimeProvider)> + pub fn insert_wrapped_tc( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + if PusPacket::service(pus_tc) != 11 || PusPacket::subservice(pus_tc) != 4 { + return Err(()); + } + + return if let Some(user_data) = pus_tc.user_data() { + let mut stamp: TimeStamp = match TimeReader::from_bytes(user_data) { + Ok(stamp) => stamp, + Err(error) => return Err(()), + }; + let unix_stamp = stamp.unix_stamp(); + let stamp_len = stamp.len_as_bytes(); + self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) + } else { + Err(()) + } + } + + pub fn insert_wrapped_tc_cds_short( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::(pus_tc, pool) + } + + pub fn insert_wrapped_tc_cds_long( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::>(pus_tc, pool) + } + pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { self.tc_map.range(..=self.current_time) } @@ -96,20 +183,47 @@ impl PusScheduler { Ok(()) } - pub fn release_telecommands(&mut self, mut releaser: R) { + + /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser + /// closure for each telecommand which should be released. This function will also delete + /// the telecommands from the holding store after calling the release closure. + /// + /// # Arguments + /// + /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and + /// the second argument is the store address. This closure should return whether the + /// command should be deleted. + /// * `store` - The holding store of the telecommands. + pub fn release_telecommands bool>( + &mut self, + mut releaser: R, + tc_store: &mut (impl PoolProvider + ?Sized), + ) -> Result { let tcs_to_release = self.telecommands_to_release(); + let mut released_tcs = 0; + let mut store_error = Ok(()); for tc in tcs_to_release { for addr in tc.1 { - releaser(self.enabled, addr); + let should_delete = releaser(self.enabled, addr); + released_tcs += 1; + if should_delete { + let res = tc_store.delete(*addr); + if res.is_err() { + store_error = res; + } + } } } self.tc_map.retain(|k, _| k > &self.current_time); + store_error + .map(|_| released_tcs) + .map_err(|e| (released_tcs, e)) } } #[cfg(test)] mod tests { - use crate::pool::StoreAddr; + use crate::pool::{LocalPool, PoolCfg, StoreAddr}; use crate::pus::scheduling::PusScheduler; use spacepackets::ecss::PacketTypeCodes::UnsignedInt; use spacepackets::time::UnixTimestamp; @@ -118,6 +232,9 @@ mod tests { use std::time::Duration; use std::vec::Vec; use std::{println, vec}; + use heapless::pool::Pool; + use spacepackets::SpHeader; + use spacepackets::tc::PusTc; #[test] fn basic() { @@ -132,40 +249,24 @@ mod tests { fn reset() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let first_addr = pool.add(&[0, 1, 2]).unwrap(); + let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone()); assert!(worked); - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(200), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); + let second_addr = pool.add(&[2, 3, 4]).unwrap(); + let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone()); assert!(worked); - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(300), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); + let third_addr = pool.add(&[5, 6, 7]).unwrap(); + let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone()); assert!(worked); assert_eq!(scheduler.num_scheduled_telecommands(), 3); assert!(scheduler.is_enabled()); - scheduler.reset(); + scheduler.reset().unwrap(); assert!(!scheduler.is_enabled()); assert_eq!(scheduler.num_scheduled_telecommands(), 0); } @@ -175,7 +276,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let worked = scheduler.insert_tc( + let worked = scheduler.insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), StoreAddr { pool_idx: 0, @@ -185,7 +286,7 @@ mod tests { assert!(worked); - let worked = scheduler.insert_tc( + let worked = scheduler.insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), StoreAddr { pool_idx: 0, @@ -195,7 +296,7 @@ mod tests { assert!(worked); - let worked = scheduler.insert_tc( + let worked = scheduler.insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(300), StoreAddr { pool_idx: 0, @@ -222,21 +323,11 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(200), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); + let second_addr = pool.add(&[5, 6, 7]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); let mut i = 0; let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { @@ -290,21 +381,11 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); - scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + let second_addr = pool.add(&[2, 2, 2]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr); let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { @@ -335,4 +416,71 @@ mod tests { // check that 2 total tcs have been released assert_eq!(i, 2); } + + #[test] + fn insert_unwrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3,4], &mut pool).unwrap(); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + true + }; + + scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); + } + + fn scheduled_tc() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 11, 4, None, true) + } + + #[test] + fn insert_wrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let tc = base_ping_tc_simple_ctor(); + + let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + true + }; + + scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); + } + + fn base_ping_tc_simple_ctor() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 17, 1, None, true) + } + + fn insert_wrong_subservice() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let tc = base_ping_tc_simple_ctor(); + + let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 32cc6e6..549ffce 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,3 +1,5 @@ +extern crate core; + mod ccsds; mod hk; mod pus; diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index a6c40f4..0084ca5 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -2,7 +2,7 @@ use crate::hk::{CollectionIntervalFactor, HkRequest}; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{PusTcSource, TmStore}; use satrs_core::events::EventU32; -use satrs_core::pool::StoreAddr; +use satrs_core::pool::{StoreAddr, StoreError}; use satrs_core::pus::event::Subservices; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; @@ -13,15 +13,18 @@ use satrs_core::res_code::ResultU16; use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use satrs_core::tmtc::{AddressableId, PusServiceProvider}; use satrs_core::{ - spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, spacepackets::time::cds::TimeProvider, - spacepackets::time::TimeWriter, spacepackets::SpHeader, + spacepackets, spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, + spacepackets::time::cds::TimeProvider, spacepackets::time::TimeWriter, spacepackets::SpHeader, }; use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; +use std::rc::Rc; use std::sync::mpsc::Sender; use std::time::Duration; use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; +use std::sync::{Arc, LockResult, Mutex}; + pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, @@ -34,7 +37,7 @@ pub struct PusReceiver { request_map: HashMap>, stamper: TimeProvider, time_stamp: [u8; 7], - scheduler: PusScheduler, + scheduler: Arc>, } impl PusReceiver { @@ -46,8 +49,8 @@ impl PusReceiver { tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, + scheduler: Arc>, ) -> Self { - let scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); Self { tm_helper: PusTmWithCdsShortHelper::new(apid), tm_tx, @@ -58,7 +61,7 @@ impl PusReceiver { request_map, stamper: TimeProvider::new_with_u16_days(0, 0), time_stamp: [0; 7], - scheduler + scheduler, } } } @@ -299,69 +302,95 @@ impl PusReceiver { match pus_tc.subservice() { 1 => { + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); + let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - self.scheduler.enable(); - if self.scheduler.is_enabled() { + + scheduler.enable(); + if scheduler.is_enabled() { self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - // TODO: ??? - //self.verif_reporter - // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + + panic!("Failed to enable scheduler"); } - }, + } 2 => { - let start_token = self - .verif_reporter - .start_success(token, Some(&self.time_stamp)) - .expect("Error sending start success"); - self.scheduler.disable(); - if ! self.scheduler.is_enabled() { - self.verif_reporter - .completion_success(start_token, Some(&self.time_stamp)) - .expect("Error sending completion success"); - } else { - // TODO: ??? - //self.verif_reporter - // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) - } - }, - 3 => { + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - self.scheduler.reset(); - if !self.scheduler.is_enabled() && self.scheduler.num_scheduled_telecommands() == 0 { + scheduler.disable(); + if !scheduler.is_enabled() { self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - // TODO: ??? - //self.verif_reporter - // .completion_failure(start_token, &tmtc_err::NOT_ENOUGH_APP_DATA, none) + + panic!("Failed to disable scheduler"); } - }, + } + 3 => { + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); + + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + match self.tc_source.tc_store.pool.write() { + Ok(mut pool) => { + match scheduler.reset(pool.as_mut()) { + Ok(_) => { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } + Err(_) => { + // TODO + } + } + } + Err(_) => {} + } + } 4 => { - self.update_time_stamp(); - let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); - let worked = self.scheduler.insert_tc(unix_time, ); - }, + let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + match self.tc_source.tc_store.pool.write() { + Ok(mut pool) => { + scheduler.insert_wrapped_tc::( + pus_tc, + pool.as_mut(), + ); + } + Err(_) => {} + } + + //let addr = self.tc_source.tc_store.add_pus_tc().unwrap(); + //let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); + //let worked = self.scheduler.insert_tc(unix_time, ); + } _ => { self.verif_reporter - .start_failure( - token, - FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), - ) - .expect("Sending start failure TM failed"); + .start_failure( + token, + FailParams::new( + Some(&self.time_stamp), + &tmtc_err::NOT_ENOUGH_APP_DATA, + None, + ), + ) + .expect("Sending start failure TM failed"); return; } } - - } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index c8b8e61..728c19a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -5,7 +5,9 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; +use std::sync::{Arc, LockResult, Mutex}; use std::thread; use std::time::Duration; @@ -42,6 +44,12 @@ pub struct TcArgs { pub tc_receiver: Receiver, } +impl TcArgs { + fn split(self) -> (PusTcSource, Receiver) { + (self.tc_source, self.tc_receiver) + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum MpscStoreAndSendError { StoreError(StoreError), @@ -153,6 +161,10 @@ impl ReceivesCcsdsTc for PusTcSource { } } pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { + let mut scheduler = Arc::new(Mutex::new( + PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), + )); + let mut sched_clone = scheduler.clone(); let mut pus_receiver = PusReceiver::new( PUS_APID, tm_args.tm_sink_sender, @@ -161,6 +173,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tc_args.tc_source.clone(), args.event_request_tx, args.request_map, + sched_clone, ); let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), @@ -174,22 +187,48 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tm_rx: tm_args.tm_server_rx, tm_store: tm_args.tm_store.pool.clone(), }; + + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| true; + let (mut tc_source, mut tc_receiver) = tc_args.split(); + loop { - core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver); + let mut tmtc_sched = scheduler.clone(); + core_tmtc_loop( + &mut udp_tmtc_server, + &mut tc_source, + &mut tc_receiver, + &mut pus_receiver, + tmtc_sched, + ); thread::sleep(Duration::from_millis(400)); } } fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, - tc_args: &mut TcArgs, + tc_source: &mut PusTcSource, + tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, + scheduler: Arc>, ) { + let releaser = |enabled: bool, addr: &StoreAddr| { + tc_source.tc_source.send(*addr); + true + }; + + let mut scheduler = scheduler.lock().expect("Lock of scheduler failed"); + match tc_source.tc_store.pool.write() { + Ok(mut pool) => match scheduler.release_telecommands(releaser, pool.as_mut()) { + Ok(_) => {} + Err(_) => {} + }, + Err(_) => {} + } + while poll_tc_server(udp_tmtc_server) {} - match tc_args.tc_receiver.try_recv() { + match tc_receiver.try_recv() { Ok(addr) => { - let pool = tc_args - .tc_source + let pool = tc_source .tc_store .pool .read() @@ -241,9 +280,6 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { } } -fn poll_tc_scheduler(scheduler: &mut PusScheduler) { - match scheduler.release_telecommands() -} fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { -- 2.43.0 From 8734fa0499506238d9dcd1878bf1fe648f8a3278 Mon Sep 17 00:00:00 2001 From: lkoester Date: Thu, 26 Jan 2023 14:52:51 +0100 Subject: [PATCH 03/11] issues with testing wrapped/unwrapped tc --- satrs-core/src/pus/scheduling.rs | 326 +++++++++++++++++++------------ satrs-example/src/pus.rs | 19 +- satrs-example/src/tmtc.rs | 1 - 3 files changed, 214 insertions(+), 132 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 71dee84..6f009b4 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -1,15 +1,52 @@ use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; +use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; -use spacepackets::ecss::PusPacket; +use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; +use std::error::Error; use std::time::SystemTimeError; use std::vec; use std::vec::Vec; +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ScheduleError { + PusError(PusError), + TimeMarginTooShort, + NestedScheduledTC, + StoreError(StoreError), + TCDataEmpty, + TimestampError(TimestampError), +} + +impl Display for ScheduleError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + todo!() + } +} + +impl From for ScheduleError { + fn from(e: PusError) -> Self { + ScheduleError::PusError(e) + } +} + +impl From for ScheduleError { + fn from(e: StoreError) -> Self { + ScheduleError::StoreError(e) + } +} + +impl From for ScheduleError { + fn from(e: TimestampError) -> Self { + ScheduleError::TimestampError(e) + } +} + +impl Error for ScheduleError {} //TODO: Move to spacepackets #[derive(Debug, PartialEq, Copy, Clone)] @@ -19,7 +56,6 @@ pub enum ScheduleSubservice { ResetScheduling = 3, InsertActivity = 4, DeleteActivity = 5, - } #[derive(Debug)] @@ -74,8 +110,17 @@ impl PusScheduler { /// will be returned but the method will still try to delete all the commands in the schedule. pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; + let mut deletion_ok = Ok(()); + for tc_lists in &mut self.tc_map { + for tc in tc_lists.1 { + let res = store.delete(*tc); + if res.is_err() { + deletion_ok = res; + } + } + } self.tc_map.clear(); - return Ok(()) + deletion_ok } pub fn update_time(&mut self, current_time: UnixTimestamp) { @@ -90,9 +135,9 @@ impl PusScheduler { &mut self, time_stamp: UnixTimestamp, addr: StoreAddr, - ) -> bool { + ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { - return false; + return Err(ScheduleError::TimeMarginTooShort); } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { @@ -102,7 +147,7 @@ impl PusScheduler { v.get_mut().push(addr); } } - true + Ok(()) } pub fn insert_unwrapped_tc( @@ -110,57 +155,45 @@ impl PusScheduler { time_stamp: UnixTimestamp, tc: &[u8], pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - let check_tc = PusTc::from_bytes(tc).unwrap(); + ) -> Result { + let check_tc = PusTc::from_bytes(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { // TODO: should not be able to schedule a scheduled tc - return Err(()); + return Err(ScheduleError::NestedScheduledTC); } match pool.add(tc) { Ok(addr) => { - let worked = self.insert_unwrapped_and_stored_tc(time_stamp, addr); - if worked { - return Ok(addr); - } else { - return Err(()); - } + self.insert_unwrapped_and_stored_tc(time_stamp, addr)?; + Ok(addr) } Err(err) => { - return Err(()); + return Err(err.into()); } } } - // insert_wrapped_tc() // (&dyn CcsdsTimeProvider)> pub fn insert_wrapped_tc( &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - if PusPacket::service(pus_tc) != 11 || PusPacket::subservice(pus_tc) != 4 { - return Err(()); - } - + ) -> Result { return if let Some(user_data) = pus_tc.user_data() { - let mut stamp: TimeStamp = match TimeReader::from_bytes(user_data) { - Ok(stamp) => stamp, - Err(error) => return Err(()), - }; + let mut stamp: TimeStamp = TimeReader::from_bytes(user_data)?; let unix_stamp = stamp.unix_stamp(); let stamp_len = stamp.len_as_bytes(); self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) } else { - Err(()) - } + Err(ScheduleError::TCDataEmpty) + }; } pub fn insert_wrapped_tc_cds_short( &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { + ) -> Result { self.insert_wrapped_tc::(pus_tc, pool) } @@ -168,7 +201,7 @@ impl PusScheduler { &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { + ) -> Result { self.insert_wrapped_tc::>(pus_tc, pool) } @@ -183,7 +216,6 @@ impl PusScheduler { Ok(()) } - /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete /// the telecommands from the holding store after calling the release closure. @@ -223,18 +255,18 @@ impl PusScheduler { #[cfg(test)] mod tests { - use crate::pool::{LocalPool, PoolCfg, StoreAddr}; - use crate::pus::scheduling::PusScheduler; + use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr}; + use crate::pus::scheduling::{PusScheduler, ScheduleError}; use spacepackets::ecss::PacketTypeCodes::UnsignedInt; + use spacepackets::tc::PusTc; use spacepackets::time::UnixTimestamp; + use spacepackets::{CcsdsPacket, SpHeader}; use std::sync::mpsc; use std::sync::mpsc::{channel, Receiver, TryRecvError}; use std::time::Duration; use std::vec::Vec; use std::{println, vec}; - use heapless::pool::Pool; - use spacepackets::SpHeader; - use spacepackets::tc::PusTc; + use spacepackets::ecss::PusPacket; #[test] fn basic() { @@ -247,28 +279,42 @@ mod tests { #[test] fn reset() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[0, 1, 2]).unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone()); - assert!(worked); + let first_addr = pool.add(&[0, 1, 2]).unwrap(); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + first_addr.clone(), + ) + .unwrap(); let second_addr = pool.add(&[2, 3, 4]).unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone()); - - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(200), + second_addr.clone(), + ) + .unwrap(); let third_addr = pool.add(&[5, 6, 7]).unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone()); - - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(300), + third_addr.clone(), + ) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 3); assert!(scheduler.is_enabled()); - scheduler.reset().unwrap(); + scheduler.reset(&mut pool).expect("deletion of TCs failed"); assert!(!scheduler.is_enabled()); assert_eq!(scheduler.num_scheduled_telecommands(), 0); + assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(!pool.has_element_at(&third_addr).unwrap()); } #[test] @@ -276,35 +322,35 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let worked = scheduler.insert_unwrapped_and_stored_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + StoreAddr { + pool_idx: 0, + packet_idx: 1, + }, + ) + .unwrap(); - assert!(worked); + let worked = scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + ) + .unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); - - assert!(worked); - - let worked = scheduler.insert_unwrapped_and_stored_tc( - UnixTimestamp::new_only_seconds(300), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); - - assert!(worked); + let worked = scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(300), + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + ) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 3); } @@ -318,8 +364,20 @@ mod tests { assert_eq!(scheduler.current_time(), &time); } + fn common_check( + enabled: bool, + store_addr: &StoreAddr, + expected_store_addrs: Vec, + counter: &mut usize, + ) { + assert_eq!(enabled, true); + assert!(expected_store_addrs.contains(store_addr)); + *counter += 1; + } + #[test] fn release_basic() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -331,46 +389,44 @@ mod tests { let mut i = 0; let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { - assert_eq!(boolvar, true); - assert_eq!( - store_addr, - &StoreAddr { - pool_idx: 0, - packet_idx: 1, - } - ); - i += 1; + common_check(boolvar, store_addr, vec![first_addr], &mut i); + true }; // test 1: too early, no tcs scheduler.update_time(UnixTimestamp::new_only_seconds(99)); - scheduler.release_telecommands(&mut test_closure_1); + scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); // test 2: exact time stamp of tc, releases 1 tc scheduler.update_time(UnixTimestamp::new_only_seconds(100)); - scheduler.release_telecommands(&mut test_closure_1); + let mut released = scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&first_addr).unwrap()); // test 3, late timestamp, release 1 overdue tc let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { - assert_eq!(boolvar, true); - assert_eq!( - store_addr, - &StoreAddr { - pool_idx: 0, - packet_idx: 2, - } - ); - i += 1; + common_check(boolvar, store_addr, vec![second_addr], &mut i); + true }; scheduler.update_time(UnixTimestamp::new_only_seconds(206)); - scheduler.release_telecommands(&mut test_closure_2); + released = scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&second_addr).unwrap()); //test 4: no tcs left - scheduler.release_telecommands(&mut test_closure_2); + scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); // check that 2 total tcs have been released assert_eq!(i, 2); @@ -378,6 +434,7 @@ mod tests { #[test] fn release_multi_with_same_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -389,34 +446,52 @@ mod tests { let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - assert_eq!(boolvar, true); - assert_eq!( - store_addr, - &StoreAddr { - pool_idx: 0, - packet_idx: 1, - } - ); - i += 1; + common_check(boolvar, store_addr, vec![first_addr, second_addr], &mut i); + true }; // test 1: too early, no tcs scheduler.update_time(UnixTimestamp::new_only_seconds(99)); - scheduler.release_telecommands(&mut test_closure); + let mut released = scheduler + .release_telecommands(&mut test_closure, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 0); // test 2: exact time stamp of tc, releases 2 tc scheduler.update_time(UnixTimestamp::new_only_seconds(100)); - scheduler.release_telecommands(&mut test_closure); + released = scheduler + .release_telecommands(&mut test_closure, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 2); + assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(!pool.has_element_at(&second_addr).unwrap()); //test 3: no tcs left - scheduler.release_telecommands(&mut test_closure); + released = scheduler + .release_telecommands(&mut test_closure, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 0); // check that 2 total tcs have been released assert_eq!(i, 2); } + fn scheduled_tc() -> PusTc<'static> { + let contained_tc = base_ping_tc_simple_ctor(); + let len = contained_tc.total_len() as u16; + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len).unwrap(); + + PusTc::new_simple(&mut sph, 11, 4, contained_tc.raw(), true) + } + + fn base_ping_tc_simple_ctor() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 17, 1, None, true) + } + + /* #[test] fn insert_unwrapped_tc() { let mut scheduler = @@ -424,12 +499,14 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3,4], &mut pool).unwrap(); + + let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3], &mut pool).unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 1); scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { common_check(boolvar, store_addr, vec![addr], &mut i); true @@ -438,10 +515,7 @@ mod tests { scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); } - fn scheduled_tc() -> PusTc<'static> { - let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); - PusTc::new_simple(&mut sph, 11, 4, None, true) - } + */ #[test] fn insert_wrapped_tc() { @@ -450,27 +524,35 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let tc = base_ping_tc_simple_ctor(); + let tc = scheduled_tc(); - let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + if let Some(data) = tc.user_data() { + + } else { + panic!(); + } + + let addr = scheduler + .insert_wrapped_tc::(&tc, &mut pool) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 1); scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { common_check(boolvar, store_addr, vec![addr], &mut i); true }; - scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); - } - - fn base_ping_tc_simple_ctor() -> PusTc<'static> { - let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); - PusTc::new_simple(&mut sph, 17, 1, None, true) + scheduler + .release_telecommands(&mut test_closure, &mut pool) + .unwrap(); } + /* + #[test] fn insert_wrong_subservice() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -479,8 +561,10 @@ mod tests { let tc = base_ping_tc_simple_ctor(); - let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + let addr = scheduler.insert_wrapped_tc::( &tc, &mut pool).unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 0); } + + */ } diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index 0084ca5..9c4dbe3 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -6,10 +6,12 @@ use satrs_core::pool::{StoreAddr, StoreError}; use satrs_core::pus::event::Subservices; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; +use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::res_code::ResultU16; +use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use satrs_core::tmtc::{AddressableId, PusServiceProvider}; use satrs_core::{ @@ -20,11 +22,8 @@ use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; use std::rc::Rc; use std::sync::mpsc::Sender; -use std::time::Duration; -use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; -use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; use std::sync::{Arc, LockResult, Mutex}; - +use std::time::Duration; pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, @@ -315,7 +314,6 @@ impl PusReceiver { .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - panic!("Failed to enable scheduler"); } } @@ -331,7 +329,6 @@ impl PusReceiver { .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - panic!("Failed to disable scheduler"); } } @@ -366,10 +363,12 @@ impl PusReceiver { .expect("Error sending start success"); match self.tc_source.tc_store.pool.write() { Ok(mut pool) => { - scheduler.insert_wrapped_tc::( - pus_tc, - pool.as_mut(), - ); + scheduler + .insert_wrapped_tc::( + pus_tc, + pool.as_mut(), + ) + .expect("TODO: panic message"); } Err(_) => {} } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 728c19a..1693f7c 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -280,7 +280,6 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { } } - fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { let mut store_lock = udp_tmtc_server -- 2.43.0 From 806ef66eaccd5094db48e560ab9960c7a9381c35 Mon Sep 17 00:00:00 2001 From: lkoester Date: Thu, 26 Jan 2023 15:19:53 +0100 Subject: [PATCH 04/11] quick change to tmtc --- satrs-example/src/tmtc.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 1693f7c..770d88d 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -164,6 +164,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { let mut scheduler = Arc::new(Mutex::new( PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), )); + let mut sched_clone = scheduler.clone(); let mut pus_receiver = PusReceiver::new( PUS_APID, @@ -175,10 +176,13 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { args.request_map, sched_clone, ); + let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), }; + let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor)) .expect("Creating UDP TMTC server failed"); -- 2.43.0 From e876433396f48f633f4b97a1e74f79749b4a10aa Mon Sep 17 00:00:00 2001 From: lkoester Date: Thu, 26 Jan 2023 17:34:55 +0100 Subject: [PATCH 05/11] fixed tc creation --- satrs-core/src/pus/scheduling.rs | 61 ++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 6f009b4..f5d6f27 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -15,7 +15,7 @@ use std::vec::Vec; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ScheduleError { PusError(PusError), - TimeMarginTooShort, + TimeMarginTooShort(UnixTimestamp, UnixTimestamp), NestedScheduledTC, StoreError(StoreError), TCDataEmpty, @@ -24,7 +24,26 @@ pub enum ScheduleError { impl Display for ScheduleError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - todo!() + match self { + ScheduleError::PusError(e) => { + write!(f, "Pus Error: {}", e) + } + ScheduleError::TimeMarginTooShort(current_time, timestamp) => { + write!(f, "Error: time margin too short, current time: {:?}, time stamp: {:?}", current_time, timestamp) + } + ScheduleError::NestedScheduledTC => { + write!(f, "Error: nested scheduling is not allowed") + } + ScheduleError::StoreError(e) => { + write!(f, "Store Error: {}", e) + } + ScheduleError::TCDataEmpty => { + write!(f, "Error: empty TC Data field") + } + ScheduleError::TimestampError(e) => { + write!(f, "Timestamp Error: {}", e) + } + } } } @@ -137,7 +156,7 @@ impl PusScheduler { addr: StoreAddr, ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { - return Err(ScheduleError::TimeMarginTooShort); + return Err(ScheduleError::TimeMarginTooShort(self.current_time, time_stamp)); } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { @@ -259,7 +278,7 @@ mod tests { use crate::pus::scheduling::{PusScheduler, ScheduleError}; use spacepackets::ecss::PacketTypeCodes::UnsignedInt; use spacepackets::tc::PusTc; - use spacepackets::time::UnixTimestamp; + use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; use spacepackets::{CcsdsPacket, SpHeader}; use std::sync::mpsc; use std::sync::mpsc::{channel, Receiver, TryRecvError}; @@ -478,12 +497,15 @@ mod tests { assert_eq!(i, 2); } - fn scheduled_tc() -> PusTc<'static> { - let contained_tc = base_ping_tc_simple_ctor(); - let len = contained_tc.total_len() as u16; - let mut sph = SpHeader::tc_unseg(0x02, 0x34, len).unwrap(); + fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { + let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); - PusTc::new_simple(&mut sph, 11, 4, contained_tc.raw(), true) + let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); + + let len_packet = base_ping_tc_simple_ctor().write_to_bytes(&mut buf[len_time_stamp..]).unwrap(); + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); + + PusTc::new_simple(&mut sph, 11, 4, Some(&buf[..len_packet +len_time_stamp]), true) } fn base_ping_tc_simple_ctor() -> PusTc<'static> { @@ -524,17 +546,20 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let tc = scheduled_tc(); + let mut buf: [u8; 32] = [0; 32]; + let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); - if let Some(data) = tc.user_data() { + let addr = match scheduler + .insert_wrapped_tc::(&tc, &mut pool) { + Ok(addr) => { + addr + } + Err(e) => { + println!("{}", e); + panic!(); + } + }; - } else { - panic!(); - } - - let addr = scheduler - .insert_wrapped_tc::(&tc, &mut pool) - .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 1); -- 2.43.0 From f215d121cc080e8ef7b23eb15c045769b25d4069 Mon Sep 17 00:00:00 2001 From: lkoester Date: Fri, 27 Jan 2023 11:01:43 +0100 Subject: [PATCH 06/11] got scheduler unit tests to work --- satrs-core/src/pus/scheduling.rs | 235 +++++++++++++++++++++---------- 1 file changed, 162 insertions(+), 73 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index f07c290..171a2ec 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -1,9 +1,9 @@ //! # PUS Service 11 Scheduling Module use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; -use core::fmt::{Debug, Display, Formatter}; use alloc::vec; use alloc::vec::Vec; +use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; @@ -11,10 +11,9 @@ use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; #[cfg(feature = "std")] -use std::time::SystemTimeError; -#[cfg(feature = "std")] use std::error::Error; - +#[cfg(feature = "std")] +use std::time::SystemTimeError; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ScheduleError { @@ -24,6 +23,8 @@ pub enum ScheduleError { StoreError(StoreError), TCDataEmpty, TimestampError(TimestampError), + WrongSubservice, + WrongService, } impl Display for ScheduleError { @@ -33,7 +34,11 @@ impl Display for ScheduleError { write!(f, "Pus Error: {}", e) } ScheduleError::TimeMarginTooShort(current_time, timestamp) => { - write!(f, "Error: time margin too short, current time: {:?}, time stamp: {:?}", current_time, timestamp) + write!( + f, + "Error: time margin too short, current time: {:?}, time stamp: {:?}", + current_time, timestamp + ) } ScheduleError::NestedScheduledTC => { write!(f, "Error: nested scheduling is not allowed") @@ -47,6 +52,12 @@ impl Display for ScheduleError { ScheduleError::TimestampError(e) => { write!(f, "Timestamp Error: {}", e) } + ScheduleError::WrongService => { + write!(f, "Error: Service not 11.") + } + ScheduleError::WrongSubservice => { + write!(f, "Error: Subservice not 4.") + } } } } @@ -70,7 +81,7 @@ impl From for ScheduleError { } #[cfg(feature = "std")] -impl Error for ScheduleError{} +impl Error for ScheduleError {} //TODO: Move to spacepackets #[derive(Debug, PartialEq, Copy, Clone)] @@ -186,7 +197,10 @@ impl PusScheduler { addr: StoreAddr, ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { - return Err(ScheduleError::TimeMarginTooShort(self.current_time, time_stamp)); + return Err(ScheduleError::TimeMarginTooShort( + self.current_time, + time_stamp, + )); } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { @@ -228,6 +242,12 @@ impl PusScheduler { pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), ) -> Result { + if PusPacket::service(pus_tc) != 11 { + return Err(ScheduleError::WrongService); + } + if PusPacket::subservice(pus_tc) != 4 { + return Err(ScheduleError::WrongSubservice); + } return if let Some(user_data) = pus_tc.user_data() { let mut stamp: TimeStamp = TimeReader::from_bytes(user_data)?; let unix_stamp = stamp.unix_stamp(); @@ -279,7 +299,6 @@ impl PusScheduler { &mut self, mut releaser: R, tc_store: &mut (impl PoolProvider + ?Sized), - ) -> Result { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = 0; @@ -305,22 +324,23 @@ impl PusScheduler { #[cfg(test)] mod tests { - use alloc::rc::Rc; - use core::borrow::BorrowMut; use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr}; use crate::pus::scheduling::{PusScheduler, ScheduleError}; + use crate::tmtc::ccsds_distrib::tests::generate_ping_tc; + use alloc::rc::Rc; + use core::borrow::BorrowMut; use spacepackets::ecss::PacketTypeCodes::UnsignedInt; + use spacepackets::ecss::PusPacket; use spacepackets::tc::PusTc; use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; use spacepackets::{CcsdsPacket, SpHeader}; + use std::cell::RefCell; use std::sync::mpsc; use std::sync::mpsc::{channel, Receiver, TryRecvError}; use std::time::Duration; + use std::vec::Vec; #[allow(unused_imports)] use std::{println, vec}; - use std::cell::RefCell; - use spacepackets::ecss::PusPacket; - use std::vec::Vec; #[test] fn basic() { @@ -443,7 +463,6 @@ mod tests { let second_addr = pool.add(&[5, 6, 7]).unwrap(); scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); - let mut i = 0; let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { common_check(boolvar, store_addr, vec![first_addr], &mut i); @@ -502,7 +521,6 @@ mod tests { let second_addr = pool.add(&[2, 2, 2]).unwrap(); scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr); - let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { common_check(boolvar, store_addr, vec![first_addr, second_addr], &mut i); @@ -542,10 +560,56 @@ mod tests { let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); - let len_packet = base_ping_tc_simple_ctor().write_to_bytes(&mut buf[len_time_stamp..]).unwrap(); + let len_packet = base_ping_tc_simple_ctor() + .write_to_bytes(&mut buf[len_time_stamp..]) + .unwrap(); let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); - PusTc::new_simple(&mut sph, 11, 4, Some(&buf[..len_packet +len_time_stamp]), true) + PusTc::new_simple( + &mut sph, + 11, + 4, + Some(&buf[..len_packet + len_time_stamp]), + true, + ) + } + + fn wrong_tc_service(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { + let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); + + let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); + + let len_packet = base_ping_tc_simple_ctor() + .write_to_bytes(&mut buf[len_time_stamp..]) + .unwrap(); + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); + + PusTc::new_simple( + &mut sph, + 12, + 4, + Some(&buf[..len_packet + len_time_stamp]), + true, + ) + } + + fn wrong_tc_subservice(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { + let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); + + let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); + + let len_packet = base_ping_tc_simple_ctor() + .write_to_bytes(&mut buf[len_time_stamp..]) + .unwrap(); + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); + + PusTc::new_simple( + &mut sph, + 11, + 5, + Some(&buf[..len_packet + len_time_stamp]), + true, + ) } fn base_ping_tc_simple_ctor() -> PusTc<'static> { @@ -553,59 +617,24 @@ mod tests { PusTc::new_simple(&mut sph, 17, 1, None, true) } - /* #[test] fn insert_unwrapped_tc() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - - - let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3], &mut pool).unwrap(); - - assert_eq!(scheduler.num_scheduled_telecommands(), 1); - - scheduler.update_time(UnixTimestamp::new_only_seconds(101)); - - let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, store_addr, vec![addr], &mut i); - true - }; - - scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); - } - - */ - - #[test] - fn insert_wrapped_tc() { - let mut scheduler = - PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - - let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let mut buf: [u8; 32] = [0; 32]; - let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); + let len = base_ping_tc_simple_ctor().write_to_bytes(&mut buf).unwrap(); - let addr = match scheduler - .insert_wrapped_tc::(&tc, &mut pool) { - Ok(addr) => { - addr - } - Err(e) => { - println!("{}", e); - panic!(); - } - }; + let addr = scheduler + .insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool) + .unwrap(); assert!(pool.has_element_at(&addr).unwrap()); - println!("test1"); let data = pool.read(&addr).unwrap(); - let check_tc = PusTc::from_bytes(&buf).expect("incorrect Pus tc raw data"); - assert_eq!(&check_tc.0.raw().unwrap()[cds::MIN_CDS_FIELD_LEN..], data); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -618,36 +647,96 @@ mod tests { common_check(boolvar, store_addr, vec![addr], &mut i); // check that tc remains unchanged addr_vec.push(*store_addr); - true + false }; - scheduler - .release_telecommands(&mut test_closure, &mut pool) + .release_telecommands(&mut test_closure, &mut pool) .unwrap(); - println!("test2"); - let data = pool.read(&addr_vec[0]).unwrap(); - let check_tc = PusTc::from_bytes(&buf).expect("incorrect Pus tc raw data"); - assert_eq!(&check_tc.0.raw().unwrap()[cds::MIN_CDS_FIELD_LEN..], data); - + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); } - /* #[test] + fn insert_wrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let mut buf: [u8; 32] = [0; 32]; + let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); + + let addr = match scheduler + .insert_wrapped_tc::(&tc, &mut pool) + { + Ok(addr) => addr, + Err(e) => { + println!("{}", e); + panic!(); + } + }; + + assert!(pool.has_element_at(&addr).unwrap()); + + let data = pool.read(&addr).unwrap(); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut addr_vec = vec::Vec::new(); + + let mut i = 0; + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + // check that tc remains unchanged + addr_vec.push(*store_addr); + false + }; + + scheduler + .release_telecommands(&mut test_closure, &mut pool) + .unwrap(); + + let data = pool.read(&addr_vec[0]).unwrap(); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + } + + #[test] + #[should_panic] + fn insert_wrong_service() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let mut buf: [u8; 32] = [0; 32]; + let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf); + + let addr = scheduler + .insert_wrapped_tc::(&tc, &mut pool) + .unwrap(); + } + + #[test] + #[should_panic] fn insert_wrong_subservice() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let tc = base_ping_tc_simple_ctor(); + let mut buf: [u8; 32] = [0; 32]; + let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf); - let addr = scheduler.insert_wrapped_tc::( &tc, &mut pool).unwrap(); - - assert_eq!(scheduler.num_scheduled_telecommands(), 0); + let addr = scheduler + .insert_wrapped_tc::(&tc, &mut pool) + .unwrap(); } - - */ } -- 2.43.0 From a7aad003ab5967b3f90b248a72018356d823d1da Mon Sep 17 00:00:00 2001 From: lkoester Date: Fri, 27 Jan 2023 14:02:46 +0100 Subject: [PATCH 07/11] scheduler implemented in tmtc task --- satrs-core/src/pus/scheduling.rs | 28 ++++++++--- satrs-example/src/pus.rs | 85 +++++++++++++++----------------- satrs-example/src/tmtc.rs | 27 +++++----- 3 files changed, 73 insertions(+), 67 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 171a2ec..a5a2083 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -709,7 +709,6 @@ mod tests { } #[test] - #[should_panic] fn insert_wrong_service() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -719,13 +718,19 @@ mod tests { let mut buf: [u8; 32] = [0; 32]; let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf); - let addr = scheduler - .insert_wrapped_tc::(&tc, &mut pool) - .unwrap(); + match scheduler.insert_wrapped_tc::(&tc, &mut pool) { + Ok(_) => { + panic!(); + } + Err(e) => { + if e != ScheduleError::WrongService { + panic!(); + } + } + } } #[test] - #[should_panic] fn insert_wrong_subservice() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -735,8 +740,15 @@ mod tests { let mut buf: [u8; 32] = [0; 32]; let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf); - let addr = scheduler - .insert_wrapped_tc::(&tc, &mut pool) - .unwrap(); + match scheduler.insert_wrapped_tc::(&tc, &mut pool) { + Ok(_) => { + panic!(); + } + Err(e) => { + if e != ScheduleError::WrongSubservice { + panic!(); + } + } + } } } diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index c7a8181..d33be86 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -19,6 +19,7 @@ use satrs_core::{ spacepackets::time::cds::TimeProvider, spacepackets::time::TimeWriter, spacepackets::SpHeader, }; use satrs_example::{hk_err, tmtc_err}; +use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::sync::mpsc::Sender; @@ -36,7 +37,7 @@ pub struct PusReceiver { request_map: HashMap>, stamper: TimeProvider, time_stamp: [u8; 7], - scheduler: Arc>, + scheduler: Rc>, } impl PusReceiver { @@ -48,7 +49,7 @@ impl PusReceiver { tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, - scheduler: Arc>, + scheduler: Rc>, ) -> Self { Self { tm_helper: PusTmWithCdsShortHelper::new(apid), @@ -286,28 +287,16 @@ impl PusReceiver { } fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken) { - if pus_tc.user_data().is_none() { - self.update_time_stamp(); - self.verif_reporter - .start_failure( - token, - FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), - ) - .expect("Sending start failure TM failed"); - return; - } - self.update_time_stamp(); match pus_tc.subservice() { 1 => { - let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); - let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); + let mut scheduler = self.scheduler.borrow_mut(); scheduler.enable(); if scheduler.is_enabled() { self.verif_reporter @@ -318,11 +307,12 @@ impl PusReceiver { } } 2 => { - let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); + + let mut scheduler = self.scheduler.borrow_mut(); scheduler.disable(); if !scheduler.is_enabled() { self.verif_reporter @@ -333,45 +323,50 @@ impl PusReceiver { } } 3 => { - let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); - let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - match self.tc_source.tc_store.pool.write() { - Ok(mut pool) => { - match scheduler.reset(pool.as_mut()) { - Ok(_) => { - self.verif_reporter - .completion_success(start_token, Some(&self.time_stamp)) - .expect("Error sending completion success"); - } - Err(_) => { - // TODO - } - } - } - Err(_) => {} - } + + let mut scheduler = self.scheduler.borrow_mut(); + let mut pool = self + .tc_source + .tc_store + .pool + .write() + .expect("Locking pool failed"); + + scheduler + .reset(pool.as_mut()) + .expect("Error resetting TC Pool"); + + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); } 4 => { - let mut scheduler = self.scheduler.lock().expect("Lock of scheduler failed"); let start_token = self .verif_reporter .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - match self.tc_source.tc_store.pool.write() { - Ok(mut pool) => { - scheduler - .insert_wrapped_tc::( - pus_tc, - pool.as_mut(), - ) - .expect("TODO: panic message"); - } - Err(_) => {} - } + + let mut scheduler = self.scheduler.borrow_mut(); + let mut pool = self + .tc_source + .tc_store + .pool + .write() + .expect("Locking pool failed"); + scheduler + .insert_wrapped_tc::( + pus_tc, + pool.as_mut(), + ) + .expect("TODO: panic message"); + + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); //let addr = self.tc_source.tc_store.add_pus_tc().unwrap(); //let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 0cf8a43..00cd4f2 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,6 +1,7 @@ use satrs_core::events::EventU32; use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::params::Params; +use std::cell::RefCell; use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; @@ -161,7 +162,7 @@ impl ReceivesCcsdsTc for PusTcSource { } } pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { - let mut scheduler = Arc::new(Mutex::new( + let mut scheduler = Rc::new(RefCell::new( PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), )); @@ -192,7 +193,6 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tm_store: tm_args.tm_store.pool.clone(), }; - let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| true; let (mut tc_source, mut tc_receiver) = tc_args.split(); loop { @@ -213,21 +213,20 @@ fn core_tmtc_loop( tc_source: &mut PusTcSource, tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, - scheduler: Arc>, + scheduler: Rc>, ) { - let releaser = |enabled: bool, addr: &StoreAddr| { - tc_source.tc_source.send(*addr); - true + let releaser = |enabled: bool, addr: &StoreAddr| -> bool { + match tc_source.tc_source.send(*addr) { + Ok(_) => true, + Err(_) => false, + } }; - let mut scheduler = scheduler.lock().expect("Lock of scheduler failed"); - match tc_source.tc_store.pool.write() { - Ok(mut pool) => match scheduler.release_telecommands(releaser, pool.as_mut()) { - Ok(_) => {} - Err(_) => {} - }, - Err(_) => {} - } + let mut scheduler = scheduler.borrow_mut(); + let mut pool = tc_source.tc_store.pool.write().expect("error locking pool"); + scheduler + .release_telecommands(releaser, pool.as_mut()) + .expect("error releasing tc"); while poll_tc_server(udp_tmtc_server) {} match tc_receiver.try_recv() { -- 2.43.0 From d25f4aad87bcdb84b9d0af268dd2c410cb9be26e Mon Sep 17 00:00:00 2001 From: lkoester Date: Fri, 27 Jan 2023 15:37:28 +0100 Subject: [PATCH 08/11] trying to fix tmtc function --- satrs-example/src/tmtc.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 00cd4f2..67e01e6 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -193,14 +193,15 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tm_store: tm_args.tm_store.pool.clone(), }; - let (mut tc_source, mut tc_receiver) = tc_args.split(); + //let (mut tc_source, mut tc_receiver) = tc_args.split(); loop { let mut tmtc_sched = scheduler.clone(); core_tmtc_loop( &mut udp_tmtc_server, - &mut tc_source, - &mut tc_receiver, + &mut tc_args, + //&mut tc_source, + //&mut tc_receiver, &mut pus_receiver, tmtc_sched, ); @@ -210,28 +211,29 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, - tc_source: &mut PusTcSource, - tc_receiver: &mut Receiver, + tc_args: &mut TcArgs, + //tc_source: &mut PusTcSource, + //tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, scheduler: Rc>, ) { let releaser = |enabled: bool, addr: &StoreAddr| -> bool { - match tc_source.tc_source.send(*addr) { + match tc_args.tc_source.tc_source.send(*addr) { Ok(_) => true, Err(_) => false, } }; let mut scheduler = scheduler.borrow_mut(); - let mut pool = tc_source.tc_store.pool.write().expect("error locking pool"); + let mut pool = tc_args.tc_source.tc_store.pool.write().expect("error locking pool"); scheduler .release_telecommands(releaser, pool.as_mut()) .expect("error releasing tc"); while poll_tc_server(udp_tmtc_server) {} - match tc_receiver.try_recv() { + match tc_args.tc_receiver.try_recv() { Ok(addr) => { - let pool = tc_source + let pool = tc_args.tc_source .tc_store .pool .read() -- 2.43.0 From 3fb028a239037bc4b80a4f78c1f29dccbc27d7ba Mon Sep 17 00:00:00 2001 From: lkoester Date: Mon, 30 Jan 2023 09:52:11 +0100 Subject: [PATCH 09/11] small changes to scheduler and main --- satrs-core/src/pus/scheduling.rs | 16 +++++++--------- satrs-example/src/main.rs | 2 -- satrs-example/src/tmtc.rs | 2 ++ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index a5a2083..9c77e2d 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -19,9 +19,9 @@ use std::time::SystemTimeError; pub enum ScheduleError { PusError(PusError), TimeMarginTooShort(UnixTimestamp, UnixTimestamp), - NestedScheduledTC, + NestedScheduledTc, StoreError(StoreError), - TCDataEmpty, + TcDataEmpty, TimestampError(TimestampError), WrongSubservice, WrongService, @@ -40,14 +40,14 @@ impl Display for ScheduleError { current_time, timestamp ) } - ScheduleError::NestedScheduledTC => { + ScheduleError::NestedScheduledTc => { write!(f, "Error: nested scheduling is not allowed") } ScheduleError::StoreError(e) => { write!(f, "Store Error: {}", e) } - ScheduleError::TCDataEmpty => { - write!(f, "Error: empty TC Data field") + ScheduleError::TcDataEmpty => { + write!(f, "Error: empty Tc Data field") } ScheduleError::TimestampError(e) => { write!(f, "Timestamp Error: {}", e) @@ -134,7 +134,6 @@ impl PusScheduler { } /// Like [Self::new], but sets the `init_current_time` parameter to the current system time. - #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] pub fn new_with_current_init_time(time_margin: Duration) -> Result { @@ -221,8 +220,7 @@ impl PusScheduler { ) -> Result { let check_tc = PusTc::from_bytes(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { - // TODO: should not be able to schedule a scheduled tc - return Err(ScheduleError::NestedScheduledTC); + return Err(ScheduleError::NestedScheduledTc); } match pool.add(tc) { @@ -254,7 +252,7 @@ impl PusScheduler { let stamp_len = stamp.len_as_bytes(); self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) } else { - Err(ScheduleError::TCDataEmpty) + Err(ScheduleError::TcDataEmpty) }; } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 549ffce..32cc6e6 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,5 +1,3 @@ -extern crate core; - mod ccsds; mod hk; mod pus; diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 67e01e6..f653736 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -229,6 +229,7 @@ fn core_tmtc_loop( scheduler .release_telecommands(releaser, pool.as_mut()) .expect("error releasing tc"); + drop(pool); while poll_tc_server(udp_tmtc_server) {} match tc_args.tc_receiver.try_recv() { @@ -263,6 +264,7 @@ fn core_tmtc_loop( } fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { + match udp_tmtc_server.udp_tc_server.try_recv_tc() { Ok(_) => true, Err(e) => match e { -- 2.43.0 From 7569244a9090943f8c636ae79b326ac12a27a498 Mon Sep 17 00:00:00 2001 From: lkoester Date: Mon, 30 Jan 2023 13:49:37 +0100 Subject: [PATCH 10/11] fixed scheduler logic in tmtc loop and pus handler, fixed pyclient implementation of service 11 --- satrs-core/src/pus/scheduling.rs | 86 +++++++++++++++++++++++-- satrs-example/log/tmtc_error.log | 0 satrs-example/pyclient/main.py | 41 ++++++++---- satrs-example/pyclient/requirements.txt | 3 +- satrs-example/src/pus.rs | 27 ++++++-- satrs-example/src/tmtc.rs | 35 +++++++--- 6 files changed, 157 insertions(+), 35 deletions(-) create mode 100644 satrs-example/log/tmtc_error.log diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 9c77e2d..7c8ce92 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -10,6 +10,7 @@ use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; +use std::dbg; #[cfg(feature = "std")] use std::error::Error; #[cfg(feature = "std")] @@ -285,13 +286,14 @@ impl PusScheduler { /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete - /// the telecommands from the holding store after calling the release closure. + /// the telecommands from the holding store after calling the release closure, if the scheduler + /// is disabled. /// /// # Arguments /// /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and /// the second argument is the store address. This closure should return whether the - /// command should be deleted. + /// command should be deleted if the scheduler is disabled to prevent memory leaks. /// * `store` - The holding store of the telecommands. pub fn release_telecommands bool>( &mut self, @@ -305,7 +307,7 @@ impl PusScheduler { for addr in tc.1 { let should_delete = releaser(self.enabled, addr); released_tcs += 1; - if should_delete { + if should_delete && !self.is_enabled() { let res = tc_store.delete(*addr); if res.is_err() { store_error = res; @@ -447,6 +449,16 @@ mod tests { assert!(expected_store_addrs.contains(store_addr)); *counter += 1; } + fn common_check_disabled( + enabled: bool, + store_addr: &StoreAddr, + expected_store_addrs: Vec, + counter: &mut usize, + ) { + assert_eq!(enabled, false); + assert!(expected_store_addrs.contains(store_addr)); + *counter += 1; + } #[test] fn release_basic() { @@ -481,7 +493,7 @@ mod tests { .release_telecommands(&mut test_closure_1, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&first_addr).unwrap()); // test 3, late timestamp, release 1 overdue tc let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { @@ -495,7 +507,7 @@ mod tests { .release_telecommands(&mut test_closure_2, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&second_addr).unwrap()); //test 4: no tcs left scheduler @@ -540,8 +552,8 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .expect("deletion failed"); assert_eq!(released, 2); - assert!(!pool.has_element_at(&first_addr).unwrap()); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&second_addr).unwrap()); //test 3: no tcs left released = scheduler @@ -553,6 +565,66 @@ mod tests { assert_eq!(i, 2); } + #[test] + fn release_with_scheduler_disabled() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + scheduler.disable(); + + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); + + let second_addr = pool.add(&[5, 6, 7]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); + + let mut i = 0; + let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { + common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i); + true + }; + + // test 1: too early, no tcs + scheduler.update_time(UnixTimestamp::new_only_seconds(99)); + + scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + + // test 2: exact time stamp of tc, releases 1 tc + scheduler.update_time(UnixTimestamp::new_only_seconds(100)); + + let mut released = scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&first_addr).unwrap()); + + // test 3, late timestamp, release 1 overdue tc + let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { + common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i); + true + }; + + scheduler.update_time(UnixTimestamp::new_only_seconds(206)); + + released = scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&second_addr).unwrap()); + + //test 4: no tcs left + scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + + // check that 2 total tcs have been released + assert_eq!(i, 2); + } + fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); diff --git a/satrs-example/log/tmtc_error.log b/satrs-example/log/tmtc_error.log new file mode 100644 index 0000000..e69de29 diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index c06f9ed..3d449b2 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -5,18 +5,21 @@ import struct import sys import time from typing import Optional +import datetime import tmtccmd from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm +from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid +from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase -from tmtccmd.com_if import ComInterface +from tmtccmd.com import ComInterface from tmtccmd.config import ( default_json_path, SetupParams, @@ -41,7 +44,7 @@ from tmtccmd.tc import ( SendCbParams, DefaultPusQueueHelper, ) -from tmtccmd.tm.pus_5_event import Service5Tm +from tmtccmd.tm.pus_5_fsfw_event import Service5Tm from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT @@ -57,7 +60,7 @@ class SatRsConfigHook(TmTcCfgHookBase): super().__init__(json_cfg_path=json_cfg_path) def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: - from tmtccmd.config.com_if import ( + from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) @@ -94,6 +97,13 @@ class SatRsConfigHook(TmTcCfgHookBase): info="PUS Service 3 Housekeeping", op_code_entry=srv_3 ) + srv_11 = OpCodeEntry() + srv_11.add("0", "Scheduled TC Test") + defs.add_service( + name=CoreServiceList.SERVICE_11, + info="PUS Service 11 TC Scheduling", + op_code_entry=srv_11, + ) return defs def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): @@ -120,7 +130,7 @@ class PusHandler(SpecificApidHandlerBase): def handle_tm(self, packet: bytes, _user_args: any): try: - tm_packet = PusTelemetry.unpack(packet) + tm_packet = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) except ValueError as e: LOGGER.warning("Could not generate PUS TM object from raw data") LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") @@ -128,7 +138,7 @@ class PusHandler(SpecificApidHandlerBase): service = tm_packet.service dedicated_handler = False if service == 1: - tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(1, 2)) + tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)) res = self.verif_wrapper.add_tm(tm_packet) if res is None: LOGGER.info( @@ -145,16 +155,16 @@ class PusHandler(SpecificApidHandlerBase): if service == 3: LOGGER.info("No handling for HK packets implemented") LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") - pus_tm = PusTelemetry.unpack(packet) + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) if pus_tm.subservice == 25: if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") json_str = pus_tm.source_data[8:] dedicated_handler = True if service == 5: - tm_packet = Service5Tm.unpack(packet) + tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) if service == 17: - tm_packet = Service17Tm.unpack(packet) + tm_packet = Service17Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) dedicated_handler = True if tm_packet.subservice == 2: self.printer.file_logger.info("Received Ping Reply TM[17,2]") @@ -205,7 +215,10 @@ class TcHandler(TcHandlerBase): self.verif_wrapper = verif_wrapper self.queue_helper = DefaultPusQueueHelper( queue_wrapper=None, + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, + pus_verificator=self.verif_wrapper.pus_verificator, + default_pus_apid=EXAMPLE_PUS_APID ) def send_cb(self, send_params: SendCbParams): @@ -213,10 +226,6 @@ class TcHandler(TcHandlerBase): if entry_helper.is_tc: if entry_helper.entry_type == TcQueueEntryType.PUS_TC: pus_tc_wrapper = entry_helper.to_pus_tc_entry() - pus_tc_wrapper.pus_tc.seq_count = ( - self.seq_count_provider.get_and_increment() - ) - self.verif_wrapper.add_tc(pus_tc_wrapper.pus_tc) raw_tc = pus_tc_wrapper.pus_tc.pack() LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") send_params.com_if.send(raw_tc) @@ -247,6 +256,14 @@ class TcHandler(TcHandlerBase): return q.add_pus_tc( PusTelecommand(service=17, subservice=1) ) + if service == CoreServiceList.SERVICE_11: + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd(time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID) + ) if service == CoreServiceList.SERVICE_3: if op_code in HkOpCodes.GENERATE_ONE_SHOT: q.add_log_cmd("Sending HK one shot request") diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index 8e9634c..06fc287 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1 +1,2 @@ -tmtccmd == 3.0.0 +# tmtccmd == 4.0.0a2 +-e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index d33be86..c4d0e9a 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -287,8 +287,18 @@ impl PusReceiver { } fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken) { - self.update_time_stamp(); + if pus_tc.user_data().is_none() { + self.update_time_stamp(); + self.verif_reporter + .start_failure( + token, + FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), + ) + .expect("Sending start failure TM failed"); + return; + } + self.update_time_stamp(); match pus_tc.subservice() { 1 => { let start_token = self @@ -305,6 +315,7 @@ impl PusReceiver { } else { panic!("Failed to enable scheduler"); } + drop(scheduler); } 2 => { let start_token = self @@ -321,6 +332,7 @@ impl PusReceiver { } else { panic!("Failed to disable scheduler"); } + drop(scheduler); } 3 => { let start_token = self @@ -328,7 +340,6 @@ impl PusReceiver { .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - let mut scheduler = self.scheduler.borrow_mut(); let mut pool = self .tc_source .tc_store @@ -336,9 +347,11 @@ impl PusReceiver { .write() .expect("Locking pool failed"); + let mut scheduler = self.scheduler.borrow_mut(); scheduler .reset(pool.as_mut()) .expect("Error resetting TC Pool"); + drop(scheduler); self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) @@ -350,19 +363,19 @@ impl PusReceiver { .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - let mut scheduler = self.scheduler.borrow_mut(); let mut pool = self .tc_source .tc_store .pool .write() .expect("Locking pool failed"); + let mut scheduler = self.scheduler.borrow_mut(); scheduler - .insert_wrapped_tc::( - pus_tc, - pool.as_mut(), - ) + .insert_wrapped_tc::(pus_tc, pool.as_mut()) .expect("TODO: panic message"); + let time = + TimeProvider::from_bytes_with_u16_days(&pus_tc.user_data().unwrap()).unwrap(); + drop(scheduler); self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index f653736..bd0ab0b 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -195,11 +195,13 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { //let (mut tc_source, mut tc_receiver) = tc_args.split(); + let mut tc_buf: [u8; 4096] = [0; 4096]; loop { let mut tmtc_sched = scheduler.clone(); core_tmtc_loop( &mut udp_tmtc_server, &mut tc_args, + &mut tc_buf, //&mut tc_source, //&mut tc_receiver, &mut pus_receiver, @@ -212,6 +214,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, tc_args: &mut TcArgs, + tc_buf: &mut [u8], //tc_source: &mut PusTcSource, //tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, @@ -224,23 +227,40 @@ fn core_tmtc_loop( } }; + let mut pool = tc_args + .tc_source + .tc_store + .pool + .write() + .expect("error locking pool"); + let mut scheduler = scheduler.borrow_mut(); - let mut pool = tc_args.tc_source.tc_store.pool.write().expect("error locking pool"); - scheduler - .release_telecommands(releaser, pool.as_mut()) - .expect("error releasing tc"); + scheduler.update_time_from_now().unwrap(); + match scheduler.release_telecommands(releaser, pool.as_mut()) { + Ok(released_tcs) => { + if released_tcs > 0 { + println!("{} Tc(s) released from scheduler", released_tcs); + } + } + Err(_) => {} + } + //.expect("error releasing tc"); drop(pool); + drop(scheduler); while poll_tc_server(udp_tmtc_server) {} match tc_args.tc_receiver.try_recv() { Ok(addr) => { - let pool = tc_args.tc_source + let pool = tc_args + .tc_source .tc_store .pool .read() .expect("locking tc pool failed"); let data = pool.read(&addr).expect("reading pool failed"); - match PusTc::from_bytes(data) { + tc_buf[0..data.len()].copy_from_slice(data); + drop(pool); + match PusTc::from_bytes(tc_buf) { Ok((pus_tc, _)) => { pus_receiver .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc) @@ -248,7 +268,7 @@ fn core_tmtc_loop( } Err(e) => { println!("error creating PUS TC from raw data: {e}"); - println!("raw data: {data:x?}"); + println!("raw data: {tc_buf:x?}"); } } } @@ -264,7 +284,6 @@ fn core_tmtc_loop( } fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { - match udp_tmtc_server.udp_tc_server.try_recv_tc() { Ok(_) => true, Err(e) => match e { -- 2.43.0 From c2768a2735e259eac74bd6c44fc9d55bd0fb0265 Mon Sep 17 00:00:00 2001 From: lkoester Date: Wed, 1 Feb 2023 10:27:12 +0100 Subject: [PATCH 11/11] added requested changes and used clippy to clean up code --- satrs-core/src/pus/scheduling.rs | 11 ++++------- satrs-example/log/tmtc_error.log | 0 satrs-example/src/pus.rs | 18 ++++-------------- satrs-example/src/tmtc.rs | 19 ++++--------------- 4 files changed, 12 insertions(+), 36 deletions(-) delete mode 100644 satrs-example/log/tmtc_error.log diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 7c8ce92..4b41d47 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -6,11 +6,10 @@ use alloc::vec::Vec; use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; use spacepackets::ecss::{PusError, PusPacket}; -use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; +use spacepackets::tc::PusTc; use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; -use std::dbg; #[cfg(feature = "std")] use std::error::Error; #[cfg(feature = "std")] @@ -85,7 +84,7 @@ impl From for ScheduleError { impl Error for ScheduleError {} //TODO: Move to spacepackets -#[derive(Debug, PartialEq, Copy, Clone)] +#[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum ScheduleSubservice { EnableScheduling = 1, DisableScheduling = 2, @@ -229,9 +228,7 @@ impl PusScheduler { self.insert_unwrapped_and_stored_tc(time_stamp, addr)?; Ok(addr) } - Err(err) => { - return Err(err.into()); - } + Err(err) => Err(err.into()), } } @@ -248,7 +245,7 @@ impl PusScheduler { return Err(ScheduleError::WrongSubservice); } return if let Some(user_data) = pus_tc.user_data() { - let mut stamp: TimeStamp = TimeReader::from_bytes(user_data)?; + let stamp: TimeStamp = TimeReader::from_bytes(user_data)?; let unix_stamp = stamp.unix_stamp(); let stamp_len = stamp.len_as_bytes(); self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) diff --git a/satrs-example/log/tmtc_error.log b/satrs-example/log/tmtc_error.log deleted file mode 100644 index e69de29..0000000 diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index c4d0e9a..a889704 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -2,29 +2,26 @@ use crate::hk::{CollectionIntervalFactor, HkRequest}; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{PusTcSource, TmStore}; use satrs_core::events::EventU32; -use satrs_core::pool::{StoreAddr, StoreError}; +use satrs_core::pool::StoreAddr; use satrs_core::pus::event::Subservices; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; -use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; +use satrs_core::pus::scheduling::PusScheduler; use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::res_code::ResultU16; -use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use satrs_core::tmtc::{AddressableId, PusServiceProvider}; use satrs_core::{ - spacepackets, spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, - spacepackets::time::cds::TimeProvider, spacepackets::time::TimeWriter, spacepackets::SpHeader, + spacepackets::ecss::PusPacket, spacepackets::tc::PusTc, spacepackets::time::cds::TimeProvider, + spacepackets::time::TimeWriter, spacepackets::SpHeader, }; use satrs_example::{hk_err, tmtc_err}; use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::sync::mpsc::Sender; -use std::sync::{Arc, LockResult, Mutex}; -use std::time::Duration; pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, @@ -373,17 +370,11 @@ impl PusReceiver { scheduler .insert_wrapped_tc::(pus_tc, pool.as_mut()) .expect("TODO: panic message"); - let time = - TimeProvider::from_bytes_with_u16_days(&pus_tc.user_data().unwrap()).unwrap(); drop(scheduler); self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); - - //let addr = self.tc_source.tc_store.add_pus_tc().unwrap(); - //let unix_time = UnixTimestamp::new_only_seconds(self.stamper.unix_seconds()); - //let worked = self.scheduler.insert_tc(unix_time, ); } _ => { self.verif_reporter @@ -396,7 +387,6 @@ impl PusReceiver { ), ) .expect("Sending start failure TM failed"); - return; } } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index bd0ab0b..3f5142a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -8,7 +8,6 @@ use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::rc::Rc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; -use std::sync::{Arc, LockResult, Mutex}; use std::thread; use std::time::Duration; @@ -162,11 +161,11 @@ impl ReceivesCcsdsTc for PusTcSource { } } pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { - let mut scheduler = Rc::new(RefCell::new( + let scheduler = Rc::new(RefCell::new( PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), )); - let mut sched_clone = scheduler.clone(); + let sched_clone = scheduler.clone(); let mut pus_receiver = PusReceiver::new( PUS_APID, tm_args.tm_sink_sender, @@ -193,17 +192,13 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tm_store: tm_args.tm_store.pool.clone(), }; - //let (mut tc_source, mut tc_receiver) = tc_args.split(); - let mut tc_buf: [u8; 4096] = [0; 4096]; loop { - let mut tmtc_sched = scheduler.clone(); + let tmtc_sched = scheduler.clone(); core_tmtc_loop( &mut udp_tmtc_server, &mut tc_args, &mut tc_buf, - //&mut tc_source, - //&mut tc_receiver, &mut pus_receiver, tmtc_sched, ); @@ -215,16 +210,11 @@ fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, tc_args: &mut TcArgs, tc_buf: &mut [u8], - //tc_source: &mut PusTcSource, - //tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, scheduler: Rc>, ) { let releaser = |enabled: bool, addr: &StoreAddr| -> bool { - match tc_args.tc_source.tc_source.send(*addr) { - Ok(_) => true, - Err(_) => false, - } + tc_args.tc_source.tc_source.send(*addr).is_ok() }; let mut pool = tc_args @@ -244,7 +234,6 @@ fn core_tmtc_loop( } Err(_) => {} } - //.expect("error releasing tc"); drop(pool); drop(scheduler); -- 2.43.0