diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 2215dce..5f728ee 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -53,7 +53,7 @@ default-features = false optional = true [dependencies.spacepackets] -version = "0.5.3" +version = "0.5.4" # path = "../spacepackets" # git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" # rev = "" diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index f8b3b31..39d0bca 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -1,4 +1,4 @@ -//! # Core components of the Flight Software Rust Crate (FSRC) collection +//! # Core components of the sat-rs framework //! //! This is a collection of Rust crates which can be used to build On-Board Software for remote //! systems like satellites or rovers. It has special support for space-tailored protocols diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index 9814be6..3e276bf 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -79,6 +79,8 @@ use alloc::vec; use alloc::vec::Vec; use core::fmt::{Display, Formatter}; use delegate::delegate; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use std::boxed::Box; #[cfg(feature = "std")] @@ -130,6 +132,7 @@ pub struct LocalPool { /// Simple address type used for transactions with the local pool. #[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct StoreAddr { pub(crate) pool_idx: u16, pub(crate) packet_idx: NumBlocks, @@ -144,6 +147,7 @@ impl StoreAddr { } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum StoreIdError { InvalidSubpool(u16), InvalidPacketIdx(u16), @@ -166,6 +170,7 @@ impl Display for StoreIdError { impl Error for StoreIdError {} #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum StoreError { /// Requested data block is too large DataTooLarge(usize), @@ -252,6 +257,14 @@ pub trait PoolProvider { /// Delete data inside the pool given a [StoreAddr] fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError>; + + /// Retrieve the length of the data at the given store address. + fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError> { + if !self.has_element_at(addr)? { + return Err(StoreError::DataDoesNotExist(*addr)); + } + Ok(self.read(addr)?.len()) + } } impl LocalPool { diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 22f9160..63d827f 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -1,21 +1,83 @@ //! # PUS Service 11 Scheduling Module +//! +//! The core data structure of this module is the [PusScheduler]. This structure can be used +//! to perform the scheduling of telecommands as specified in the ECSS standard.
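+//!
+//! # Example
+//!
+//! A minimal usage sketch, assuming the `std` feature is enabled and a [crate::pool::LocalPool]
+//! is used as the backing TC store:
+//!
+//! ```ignore
+//! use core::time::Duration;
+//! use satrs_core::pool::{LocalPool, PoolCfg};
+//! use satrs_core::pus::scheduling::PusScheduler;
+//! use spacepackets::time::UnixTimestamp;
+//!
+//! // TC pool with ten blocks of 32 bytes each.
+//! let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32)]));
+//! // Scheduler starting at unix second 0 with a time margin of 5 seconds.
+//! let mut scheduler =
+//!     PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
+//! ```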
 use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use alloc::vec; use alloc::vec::Vec; use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use spacepackets::ecss::scheduling::TimeWindowType; use spacepackets::ecss::{PusError, PusPacket}; -use spacepackets::tc::PusTc; +use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use spacepackets::time::cds::DaysLen24Bits; -use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; +use spacepackets::time::{cds, CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; +use spacepackets::CcsdsPacket; use std::collections::BTreeMap; #[cfg(feature = "std")] use std::error::Error; #[cfg(feature = "std")] use std::time::SystemTimeError; +/// This is the request ID as specified in ECSS-E-ST-70-41C 5.4.11.2. +/// +/// This version of the request ID is used to identify scheduled commands and also contains +/// the source ID found in the secondary header of PUS telecommands. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct RequestId { + pub(crate) source_id: u16, + pub(crate) apid: u16, + pub(crate) seq_count: u16, +} + +impl RequestId { + pub fn source_id(&self) -> u16 { + self.source_id + } + + pub fn apid(&self) -> u16 { + self.apid + } + + pub fn seq_count(&self) -> u16 { + self.seq_count + } + + pub fn from_tc(tc: &PusTc) -> Self { + RequestId { + source_id: tc.source_id(), + apid: tc.apid(), + seq_count: tc.seq_count(), + } + } + + pub fn as_u64(&self) -> u64 { + ((self.source_id as u64) << 32) | ((self.apid as u64) << 16) | self.seq_count as u64 + } + + /* + pub fn from_bytes(buf: &[u8]) -> Result<Self, ByteConversionError> { + if buf.len() < core::mem::size_of::<u64>() { + return Err(ByteConversionError::FromSliceTooSmall(SizeMissmatch { + found: buf.len(), + expected: core::mem::size_of::<u64>(), + })); + } + Ok(Self { + source_id: u16::from_be_bytes(buf[0..2].try_into().unwrap()), + apid: u16::from_be_bytes(buf[2..4].try_into().unwrap()), + seq_count: u16::from_be_bytes(buf[4..6].try_into().unwrap()), + }) + } + */ +} + #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum ScheduleError { PusError(PusError), /// The release time is within the time-margin added on top of the current time. @@ -35,7 +97,7 @@ impl Display for ScheduleError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { match self { ScheduleError::PusError(e) => { - write!(f, "Pus Error: {}", e) + write!(f, "Pus Error: {e}") } ScheduleError::ReleaseTimeInTimeMargin(current_time, margin, timestamp) => { write!( @@ -47,13 +109,13 @@ impl Display for ScheduleError { write!(f, "Error: nested scheduling is not allowed") } ScheduleError::StoreError(e) => { - write!(f, "Store Error: {}", e) + write!(f, "Store Error: {e}") } ScheduleError::TcDataEmpty => { write!(f, "Error: empty Tc Data field") } ScheduleError::TimestampError(e) => { - write!(f, "Timestamp Error: {}", e) + write!(f, "Timestamp Error: {e}") } ScheduleError::WrongService => { write!(f, "Error: Service not 11.") } @@ -86,6 +148,29 @@ impl From<TimestampError> for ScheduleError { #[cfg(feature = "std")] impl Error for ScheduleError {} +/// This is the format stored internally by the TC scheduler for each scheduled telecommand. +/// It consists of the address of that telecommand in the TC pool and a request ID.
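+///
+/// A short sketch of how an entry is created, with `pus_tc` and `store_addr` assumed to be a
+/// parsed telecommand and its address in the TC pool:
+///
+/// ```ignore
+/// let request_id = RequestId::from_tc(&pus_tc);
+/// let tc_info = TcInfo::new(store_addr, request_id);
+/// assert_eq!(tc_info.request_id(), request_id);
+/// ```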
+#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct TcInfo { + addr: StoreAddr, + request_id: RequestId, +} + +impl TcInfo { + pub fn addr(&self) -> StoreAddr { + self.addr + } + + pub fn request_id(&self) -> RequestId { + self.request_id + } + + pub fn new(addr: StoreAddr, request_id: RequestId) -> Self { + TcInfo { addr, request_id } + } +} + /// This is the core data structure for scheduling PUS telecommands with [alloc] support. /// /// It is assumed that the actual telecommand data is stored in a separate TC pool offering @@ -99,15 +184,81 @@ impl Error for ScheduleError {} /// A disabled scheduler should still delete commands where the execution time has been reached /// but should not release them to be executed. /// +/// The implementation uses an ordered map internally with the release timestamp being the key. +/// This allows efficient time-based insertions and extractions, which should be the primary +/// use-case for a time-based command scheduler. +/// There is no way to avoid duplicate [RequestId]s during insertion, which can occur even if the +/// user always increments the sequence counter correctly, because the counter will eventually +/// overflow. To avoid this issue, it can make sense to split up telecommand groups by APID. +/// /// Currently, sub-schedules and groups are not supported. #[derive(Debug)] pub struct PusScheduler { - tc_map: BTreeMap<UnixTimestamp, Vec<StoreAddr>>, + tc_map: BTreeMap<UnixTimestamp, Vec<TcInfo>>, current_time: UnixTimestamp, time_margin: Duration, enabled: bool, } +enum DeletionResult { + WithoutStoreDeletion(Option<StoreAddr>), + WithStoreDeletion(Result<bool, StoreError>), +} + +pub struct TimeWindow<TimeProvider> { + time_window_type: TimeWindowType, + start_time: Option<TimeProvider>, + end_time: Option<TimeProvider>, +} + +impl<TimeProvider> TimeWindow<TimeProvider> { + pub fn new_select_all() -> Self { + Self { + time_window_type: TimeWindowType::SelectAll, + start_time: None, + end_time: None, + } + } + + pub fn time_window_type(&self) -> TimeWindowType { + self.time_window_type + } + + pub fn start_time(&self) -> Option<&TimeProvider> { + self.start_time.as_ref() + } + + pub fn end_time(&self) -> Option<&TimeProvider> { + self.end_time.as_ref() + } +} + +impl<TimeProvider: CcsdsTimeProvider + Clone> TimeWindow<TimeProvider> { + pub fn new_from_time_to_time(start_time: &TimeProvider, end_time: &TimeProvider) -> Self { + Self { + time_window_type: TimeWindowType::TimeTagToTimeTag, + start_time: Some(start_time.clone()), + end_time: Some(end_time.clone()), + } + } + + pub fn new_from_time(start_time: &TimeProvider) -> Self { + Self { + time_window_type: TimeWindowType::FromTimeTag, + start_time: Some(start_time.clone()), + end_time: None, + } + } + + pub fn new_to_time(end_time: &TimeProvider) -> Self { + Self { + time_window_type: TimeWindowType::ToTimeTag, + start_time: None, + end_time: Some(end_time.clone()), + } + } +} + impl PusScheduler { /// Create a new PUS scheduler.
/// @@ -166,7 +317,7 @@ impl PusScheduler { let mut deletion_ok = Ok(()); for tc_lists in &mut self.tc_map { for tc in tc_lists.1 { - let res = store.delete(*tc); + let res = store.delete(tc.addr); if res.is_err() { deletion_ok = res; } @@ -189,7 +340,7 @@ pub fn insert_unwrapped_and_stored_tc( &mut self, time_stamp: UnixTimestamp, - addr: StoreAddr, + info: TcInfo, ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { return Err(ScheduleError::ReleaseTimeInTimeMargin( @@ -200,10 +351,10 @@ } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { - e.insert(vec![addr]); + e.insert(vec![info]); } Entry::Occupied(mut v) => { - v.get_mut().push(addr); + v.get_mut().push(info); } } Ok(()) @@ -216,16 +367,18 @@ time_stamp: UnixTimestamp, tc: &[u8], pool: &mut (impl PoolProvider + ?Sized), - ) -> Result<StoreAddr, ScheduleError> { + ) -> Result<TcInfo, ScheduleError> { let check_tc = PusTc::from_bytes(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { return Err(ScheduleError::NestedScheduledTc); } + let req_id = RequestId::from_tc(&check_tc.0); match pool.add(tc) { Ok(addr) => { - self.insert_unwrapped_and_stored_tc(time_stamp, addr)?; - Ok(addr) + let info = TcInfo::new(addr, req_id); + self.insert_unwrapped_and_stored_tc(time_stamp, info)?; + Ok(info) } Err(err) => Err(err.into()), } } @@ -237,7 +390,7 @@ &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result<StoreAddr, ScheduleError> { + ) -> Result<TcInfo, ScheduleError> { if PusPacket::service(pus_tc) != 11 { return Err(ScheduleError::WrongService); } @@ -260,8 +413,8 @@ &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result<StoreAddr, ScheduleError> { - self.insert_wrapped_tc::<cds::TimeProvider>(pus_tc, pool) + ) -> Result<TcInfo, ScheduleError> { + self.insert_wrapped_tc::<cds::TimeProvider>(pus_tc, pool) } /// Insert a telecommand based on the fully wrapped time-tagged telecommand using a CDS /// short timestamp with 16-bit length of days field. @@ -270,12 +423,150 @@ &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result<StoreAddr, ScheduleError> { - self.insert_wrapped_tc::<cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool) + ) -> Result<TcInfo, ScheduleError> { + self.insert_wrapped_tc::<cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool) } + /// This function uses [Self::retrieve_by_time_filter] to extract all scheduled commands inside + /// the time range and then deletes them from the provided store. + /// + /// As specified in the documentation of [Self::retrieve_by_time_filter], the range extraction + /// for deletion is always inclusive. + /// + /// This function returns the number of deleted commands on success. In case any deletion fails, + /// the last deletion error will be returned in addition to the number of deleted commands. + pub fn delete_by_time_filter<TimeProvider: CcsdsTimeProvider + Clone>( + &mut self, + time_window: TimeWindow<TimeProvider>, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result<u64, (u64, StoreError)> { + let range = self.retrieve_by_time_filter(time_window); + let mut del_packets = 0; + let mut res_if_fails = None; + let mut keys_to_delete = Vec::new(); + for time_bucket in range { + for tc in time_bucket.1 { + match pool.delete(tc.addr) { + Ok(_) => del_packets += 1, + Err(e) => res_if_fails = Some(e), + } + } + keys_to_delete.push(*time_bucket.0); + } + for key in keys_to_delete { + self.tc_map.remove(&key); + } + if let Some(err) = res_if_fails { + return Err((del_packets, err)); + } + Ok(del_packets) + } + + /// Deletes all the scheduled commands. This also deletes the packets from the passed TC pool. + /// + /// This function returns the number of deleted commands on success.
In case any deletion fails, + /// the last deletion error will be returned in addition to the number of deleted commands. + pub fn delete_all( + &mut self, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result<u64, (u64, StoreError)> { + self.delete_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all(), pool) + } + + /// Retrieve a range over all scheduled commands. + pub fn retrieve_all(&mut self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> { + self.tc_map.range(..) + } + + /// This retrieves scheduled telecommands which are inside the provided time window. + /// + /// It should be noted that the ranged extraction is always inclusive. For example, a range + /// from 50 to 100 unix seconds would also include commands scheduled at 100 unix seconds. + pub fn retrieve_by_time_filter<TimeProvider: CcsdsTimeProvider>( + &mut self, + time_window: TimeWindow<TimeProvider>, + ) -> Range<'_, UnixTimestamp, Vec<TcInfo>> { + match time_window.time_window_type() { + TimeWindowType::SelectAll => self.tc_map.range(..), + TimeWindowType::TimeTagToTimeTag => { + // This should be guaranteed to be valid by library API, so unwrap is okay + let start_time = time_window.start_time().unwrap().unix_stamp(); + let end_time = time_window.end_time().unwrap().unix_stamp(); + self.tc_map.range(start_time..=end_time) + } + TimeWindowType::FromTimeTag => { + // This should be guaranteed to be valid by library API, so unwrap is okay + let start_time = time_window.start_time().unwrap().unix_stamp(); + self.tc_map.range(start_time..) + } + TimeWindowType::ToTimeTag => { + // This should be guaranteed to be valid by library API, so unwrap is okay + let end_time = time_window.end_time().unwrap().unix_stamp(); + self.tc_map.range(..=end_time) + } + } + } + + /// Deletes a scheduled command with the given request ID. Returns the store address if a + /// scheduled command was found in the map and deleted, and None otherwise. + /// + /// Please note that this function will stop on the first telecommand with a request ID match. + /// In case of duplicate IDs (which should generally not happen), this function needs to be + /// called repeatedly. + pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<StoreAddr> { + if let DeletionResult::WithoutStoreDeletion(v) = + self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>) + { + return v; + } + panic!("unexpected deletion result"); + } + + /// This behaves like [Self::delete_by_request_id] but deletes the packet from the pool as well.
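+    ///
+    /// A short usage sketch, with `tc_info` assumed to stem from a previous insertion:
+    ///
+    /// ```ignore
+    /// let deleted = scheduler
+    ///     .delete_by_request_id_and_from_pool(&tc_info.request_id(), &mut pool)
+    ///     .expect("deletion failed");
+    /// assert!(deleted);
+    /// ```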
+ pub fn delete_by_request_id_and_from_pool( + &mut self, + req_id: &RequestId, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result<bool, StoreError> { + if let DeletionResult::WithStoreDeletion(v) = + self.delete_by_request_id_internal(req_id, Some(pool)) + { + return v; + } + panic!("unexpected deletion result"); + } + + fn delete_by_request_id_internal( + &mut self, + req_id: &RequestId, + pool: Option<&mut (impl PoolProvider + ?Sized)>, + ) -> DeletionResult { + let mut idx_found = None; + for time_bucket in &mut self.tc_map { + for (idx, tc_info) in time_bucket.1.iter().enumerate() { + if &tc_info.request_id == req_id { + idx_found = Some(idx); + } + } + if let Some(idx) = idx_found { + let addr = time_bucket.1.remove(idx).addr; + if let Some(pool) = pool { + return match pool.delete(addr) { + Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)), + Err(e) => DeletionResult::WithStoreDeletion(Err(e)), + }; + } + return DeletionResult::WithoutStoreDeletion(Some(addr)); + } + } + if pool.is_none() { + DeletionResult::WithoutStoreDeletion(None) + } else { + DeletionResult::WithStoreDeletion(Ok(false)) + } + } /// Retrieve all telecommands which should be released based on the current time. - pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<StoreAddr>> { + pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> { self.tc_map.range(..=self.current_time) } @@ -294,10 +585,11 @@ impl PusScheduler { /// # Arguments /// /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and - /// the second argument is the store address. This closure should return whether the - /// command should be deleted if the scheduler is disabled to prevent memory leaks. + /// the second argument is the telecommand information also containing the store address. + /// This closure should return whether the command should be deleted if the scheduler is + /// disabled to prevent memory leaks. /// * `store` - The holding store of the telecommands.
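+    ///
+    /// A sketch of a possible releaser closure, where `forward_tc` stands in for whatever
+    /// user-provided mechanism passes released TCs on for execution:
+    ///
+    /// ```ignore
+    /// let mut releaser = |enabled: bool, info: &TcInfo| -> bool {
+    ///     if enabled {
+    ///         // Placeholder for the mechanism consuming released TCs.
+    ///         forward_tc(info.addr());
+    ///     }
+    ///     // Request deletion from the TC pool if the scheduler is disabled.
+    ///     true
+    /// };
+    /// let released = scheduler
+    ///     .release_telecommands(&mut releaser, &mut pool)
+    ///     .expect("releasing TCs failed");
+    /// ```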
- pub fn release_telecommands<R: FnMut(bool, &StoreAddr) -> bool>( + pub fn release_telecommands<R: FnMut(bool, &TcInfo) -> bool>( &mut self, mut releaser: R, tc_store: &mut (impl PoolProvider + ?Sized), @@ -306,11 +598,11 @@ let mut released_tcs = 0; let mut store_error = Ok(()); for tc in tcs_to_release { - for addr in tc.1 { - let should_delete = releaser(self.enabled, addr); + for info in tc.1 { + let should_delete = releaser(self.enabled, info); released_tcs += 1; if should_delete && !self.is_enabled() { - let res = tc_store.delete(*addr); + let res = tc_store.delete(info.addr); if res.is_err() { store_error = res; } @@ -326,9 +618,9 @@ #[cfg(test)] mod tests { + use super::*; use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError}; - use crate::pus::scheduling::{PusScheduler, ScheduleError}; - use spacepackets::tc::PusTc; + use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; use spacepackets::SpHeader; use std::time::Duration; @@ -339,7 +631,7 @@ fn pus_tc_base(timestamp: UnixTimestamp, buf: &mut [u8]) -> (SpHeader, usize) { let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(&timestamp).unwrap(); let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); - let len_packet = base_ping_tc_simple_ctor() + let len_packet = base_ping_tc_simple_ctor(0, None) .write_to_bytes(&mut buf[len_time_stamp..]) .unwrap(); ( @@ -387,9 +679,21 @@ PusTc::new_simple(&mut sph, 11, 4, None, true) } - fn base_ping_tc_simple_ctor() -> PusTc<'static> { - let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); - PusTc::new_simple(&mut sph, 17, 1, None, true) + fn base_ping_tc_simple_ctor(seq_count: u16, app_data: Option<&'static [u8]>) -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, seq_count, 0).unwrap(); + PusTc::new_simple(&mut sph, 17, 1, app_data, true) + } + + fn ping_tc_to_store( + pool: &mut LocalPool, + buf: &mut [u8], + seq_count: u16, + app_data: Option<&'static [u8]>, + ) -> TcInfo { + let ping_tc = base_ping_tc_simple_ctor(seq_count, app_data); + let ping_size = ping_tc.write_to_bytes(buf).expect("writing ping TC failed"); + let first_addr = pool.add(&buf[0..ping_size]).unwrap(); + TcInfo::new(first_addr, RequestId::from_tc(&ping_tc)) } #[test] @@ -409,28 +713,31 @@ let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[0, 1, 2]).unwrap(); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), - first_addr.clone(), + TcInfo::new(tc_info_0.addr.clone(), tc_info_0.request_id), ) .unwrap(); - let second_addr = pool.add(&[2, 3, 4]).unwrap(); + let app_data = &[0, 1, 2]; + let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, Some(app_data)); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(200), - second_addr.clone(), + TcInfo::new(tc_info_1.addr.clone(), tc_info_1.request_id), ) .unwrap(); - let third_addr = pool.add(&[5, 6, 7]).unwrap(); + let app_data = &[0, 1, 2]; + let tc_info_2 = ping_tc_to_store(&mut pool, &mut buf, 2, Some(app_data)); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(300), - third_addr.clone(), + TcInfo::new(tc_info_2.addr().clone(), tc_info_2.request_id()), ) .unwrap(); @@ -439,9 +746,9 @@ scheduler.reset(&mut pool).expect("deletion of TCs failed"); assert!(!scheduler.is_enabled());
assert_eq!(scheduler.num_scheduled_telecommands(), 0); - assert!(!pool.has_element_at(&first_addr).unwrap()); - assert!(!pool.has_element_at(&second_addr).unwrap()); - assert!(!pool.has_element_at(&third_addr).unwrap()); + assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); + assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap()); + assert!(!pool.has_element_at(&tc_info_2.addr()).unwrap()); } #[test] @@ -452,30 +759,51 @@ mod tests { scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, + TcInfo::new( + StoreAddr { + pool_idx: 0, + packet_idx: 1, + }, + RequestId { + seq_count: 1, + apid: 0, + source_id: 0, + }, + ), ) .unwrap(); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, + TcInfo::new( + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + RequestId { + seq_count: 2, + apid: 1, + source_id: 5, + }, + ), ) .unwrap(); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(300), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, + TcInfo::new( + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + RequestId { + source_id: 10, + seq_count: 20, + apid: 23, + }, + ), ) .unwrap(); @@ -512,26 +840,45 @@ mod tests { *counter += 1; } + #[test] + fn request_id() { + let src_id_to_set = 12; + let apid_to_set = 0x22; + let seq_count = 105; + let mut sp_header = SpHeader::tc_unseg(apid_to_set, 105, 0).unwrap(); + let mut sec_header = PusTcSecondaryHeader::new_simple(17, 1); + sec_header.source_id = src_id_to_set; + let ping_tc = PusTc::new(&mut sp_header, sec_header, None, true); + let req_id = RequestId::from_tc(&ping_tc); + assert_eq!(req_id.source_id(), src_id_to_set); + assert_eq!(req_id.apid(), apid_to_set); + assert_eq!(req_id.seq_count(), seq_count); + assert_eq!( + req_id.as_u64(), + ((src_id_to_set as u64) << 32) | (apid_to_set as u64) << 16 | seq_count as u64 + ); + } #[test] fn release_basic() { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[2, 2, 2]).unwrap(); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) .expect("insertion failed"); - let second_addr = pool.add(&[5, 6, 7]).unwrap(); + let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), tc_info_1) .expect("insertion failed"); let mut i = 0; - let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, store_addr, vec![first_addr], &mut i); + let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + common_check(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -549,11 +896,11 @@ mod tests { .release_telecommands(&mut test_closure_1, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); // test 3, late timestamp, release 1 overdue tc - let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, 
store_addr, vec![second_addr], &mut i); + let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| { + common_check(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i); true }; @@ -563,7 +910,7 @@ mod tests { .release_telecommands(&mut test_closure_2, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&tc_info_1.addr()).unwrap()); //test 4: no tcs left scheduler @@ -580,20 +927,26 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[2, 2, 2]).unwrap(); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) .expect("insertion failed"); - let second_addr = pool.add(&[2, 2, 2]).unwrap(); + let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_1) .expect("insertion failed"); let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, store_addr, vec![first_addr, second_addr], &mut i); + let mut test_closure = |boolvar: bool, store_addr: &TcInfo| { + common_check( + boolvar, + &store_addr.addr, + vec![tc_info_0.addr(), tc_info_1.addr()], + &mut i, + ); true }; @@ -612,8 +965,8 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .expect("deletion failed"); assert_eq!(released, 2); - assert!(pool.has_element_at(&first_addr).unwrap()); - assert!(pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); + assert!(pool.has_element_at(&tc_info_1.addr()).unwrap()); //test 3: no tcs left released = scheduler @@ -633,20 +986,21 @@ mod tests { scheduler.disable(); - let first_addr = pool.add(&[2, 2, 2]).unwrap(); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) .expect("insertion failed"); - let second_addr = pool.add(&[5, 6, 7]).unwrap(); + let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), tc_info_1) .expect("insertion failed"); let mut i = 0; - let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { - common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i); + let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; @@ -664,11 +1018,11 @@ mod tests { .release_telecommands(&mut test_closure_1, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); // test 3, late timestamp, release 1 overdue tc - let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { - common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i); + let mut test_closure_2 = 
|boolvar: bool, tc_info: &TcInfo| { + common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i); true }; @@ -678,7 +1032,7 @@ mod tests { .release_telecommands(&mut test_closure_2, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap()); //test 4: no tcs left scheduler @@ -696,17 +1050,21 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; - let len = base_ping_tc_simple_ctor().write_to_bytes(&mut buf).unwrap(); + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); - let addr = scheduler - .insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool) + let info = scheduler + .insert_unwrapped_tc( + UnixTimestamp::new_only_seconds(100), + &buf[..pool.len_of_data(&tc_info_0.addr()).unwrap()], + &mut pool, + ) .unwrap(); - assert!(pool.has_element_at(&addr).unwrap()); + assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); - let data = pool.read(&addr).unwrap(); + let data = pool.read(&tc_info_0.addr()).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -715,10 +1073,10 @@ mod tests { let mut addr_vec = Vec::new(); let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, store_addr, vec![addr], &mut i); + let mut test_closure = |boolvar: bool, tc_info: &TcInfo| { + common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged - addr_vec.push(*store_addr); + addr_vec.push(tc_info.addr); false }; @@ -728,7 +1086,7 @@ mod tests { let data = pool.read(&addr_vec[0]).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] @@ -741,18 +1099,18 @@ mod tests { let mut buf: [u8; 32] = [0; 32]; let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); - let addr = match scheduler.insert_wrapped_tc::(&tc, &mut pool) { + let info = match scheduler.insert_wrapped_tc::(&tc, &mut pool) { Ok(addr) => addr, Err(e) => { panic!("unexpected error {e}"); } }; - assert!(pool.has_element_at(&addr).unwrap()); + assert!(pool.has_element_at(&info.addr).unwrap()); - let data = pool.read(&addr).unwrap(); + let data = pool.read(&info.addr).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -761,10 +1119,10 @@ mod tests { let mut addr_vec = Vec::new(); let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, store_addr, vec![addr], &mut i); + let mut test_closure = |boolvar: bool, tc_info: &TcInfo| { + common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged - addr_vec.push(*store_addr); + addr_vec.push(tc_info.addr); false }; @@ -774,7 +1132,7 @@ mod tests { let data = pool.read(&addr_vec[0]).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + 
 assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] @@ -741,18 +1099,18 @@ let mut buf: [u8; 32] = [0; 32]; let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); - let addr = match scheduler.insert_wrapped_tc::<cds::TimeProvider>(&tc, &mut pool) { + let info = match scheduler.insert_wrapped_tc::<cds::TimeProvider>(&tc, &mut pool) { Ok(addr) => addr, Err(e) => { panic!("unexpected error {e}"); } }; - assert!(pool.has_element_at(&addr).unwrap()); + assert!(pool.has_element_at(&info.addr).unwrap()); - let data = pool.read(&addr).unwrap(); + let data = pool.read(&info.addr).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -761,10 +1119,10 @@ let mut addr_vec = Vec::new(); let mut i = 0; - let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - common_check(boolvar, store_addr, vec![addr], &mut i); + let mut test_closure = |boolvar: bool, tc_info: &TcInfo| { + common_check(boolvar, &tc_info.addr, vec![info.addr], &mut i); // check that tc remains unchanged - addr_vec.push(*store_addr); + addr_vec.push(tc_info.addr); false }; @@ -774,7 +1132,7 @@ let data = pool.read(&addr_vec[0]).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] @@ -897,19 +1255,20 @@ let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[2, 2, 2]).unwrap(); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) .expect("insertion failed"); let mut i = 0; - let test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { - common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i); + let test_closure_1 = |boolvar: bool, tc_info: &TcInfo| { + common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i); true }; // premature deletion - pool.delete(first_addr).expect("deletion failed"); + pool.delete(tc_info_0.addr()).expect("deletion failed"); // scheduler will only auto-delete if it is disabled. scheduler.disable(); scheduler.update_time(UnixTimestamp::new_only_seconds(100)); @@ -919,9 +1278,9 @@ assert_eq!(err.0, 1); match err.1 { StoreError::DataDoesNotExist(addr) => { - assert_eq!(first_addr, addr); + assert_eq!(tc_info_0.addr(), addr); } - _ => panic!("unexpected error {}", err.1) + _ => panic!("unexpected error {}", err.1), } } @@ -930,24 +1289,107 @@ let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[2, 2, 2]).unwrap(); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler - .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr) + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) .expect("insertion failed"); // premature deletion - pool.delete(first_addr).expect("deletion failed"); + pool.delete(tc_info_0.addr()).expect("deletion failed"); let reset_res = scheduler.reset(&mut pool); assert!(reset_res.is_err()); let err = reset_res.unwrap_err(); match err { StoreError::DataDoesNotExist(addr) => { - assert_eq!(addr, first_addr); - }, - _ => panic!("unexpected error {err}") + assert_eq!(addr, tc_info_0.addr()); + } + _ => panic!("unexpected error {err}"), } } + #[test] + fn test_delete_by_req_id_simple_retrieve_addr() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); + scheduler + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) + .expect("inserting tc failed"); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + let addr = scheduler + .delete_by_request_id(&tc_info_0.request_id()) + .unwrap(); + assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); + assert_eq!(tc_info_0.addr(), addr); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } + + #[test] + fn test_delete_by_req_id_simple_delete_all() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 =
ping_tc_to_store(&mut pool, &mut buf, 0, None); + scheduler + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) + .expect("inserting tc failed"); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + let del_res = + scheduler.delete_by_request_id_and_from_pool(&tc_info_0.request_id(), &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), true); + assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } + + #[test] + fn test_delete_by_req_id_complex() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut buf: [u8; 32] = [0; 32]; + let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None); + scheduler + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0) + .expect("inserting tc failed"); + let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None); + scheduler + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_1) + .expect("inserting tc failed"); + let tc_info_2 = ping_tc_to_store(&mut pool, &mut buf, 2, None); + scheduler + .insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_2) + .expect("inserting tc failed"); + assert_eq!(scheduler.num_scheduled_telecommands(), 3); + + // Delete first packet + let addr_0 = scheduler.delete_by_request_id(&tc_info_0.request_id()); + assert!(addr_0.is_some()); + assert_eq!(addr_0.unwrap(), tc_info_0.addr()); + assert!(pool.has_element_at(&tc_info_0.addr()).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 2); + + // Delete next packet + let del_res = + scheduler.delete_by_request_id_and_from_pool(&tc_info_2.request_id(), &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), true); + assert!(!pool.has_element_at(&tc_info_2.addr()).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + // Delete last packet + let addr_1 = + scheduler.delete_by_request_id_and_from_pool(&tc_info_1.request_id(), &mut pool); + assert!(addr_1.is_ok()); + assert_eq!(addr_1.unwrap(), true); + assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } + #[test] fn insert_full_store_test() { let mut scheduler = @@ -964,13 +1406,256 @@ mod tests { assert!(insert_res.is_err()); let err = insert_res.unwrap_err(); match err { - ScheduleError::StoreError(e) => { - match e { - StoreError::StoreFull(_) => {} - _ => panic!("unexpected store error {e}") - } - } - _ => panic!("unexpected error {err}") + ScheduleError::StoreError(e) => match e { + StoreError::StoreFull(_) => {} + _ => panic!("unexpected store error {e}"), + }, + _ => panic!("unexpected error {err}"), } } + + fn insert_command_with_release_time( + pool: &mut LocalPool, + scheduler: &mut PusScheduler, + seq_count: u16, + release_secs: u64, + ) -> TcInfo { + let mut buf: [u8; 32] = [0; 32]; + let tc_info = ping_tc_to_store(pool, &mut buf, seq_count, None); + + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(release_secs as i64), + tc_info, + ) + .expect("inserting tc failed"); + tc_info + } + + #[test] + fn test_time_window_retrieval_select_all() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let tc_info_0 = 
 insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + assert_eq!(scheduler.num_scheduled_telecommands(), 2); + let check_range = |range: Range<'_, UnixTimestamp, Vec<TcInfo>>| { + let mut tcs_in_range = 0; + for (idx, time_bucket) in range.enumerate() { + tcs_in_range += 1; + if idx == 0 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(50)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_0.request_id); + } else if idx == 1 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id); + } + } + assert_eq!(tcs_in_range, 2); + }; + let range = scheduler.retrieve_all(); + check_range(range); + let range = + scheduler.retrieve_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all()); + check_range(range); + } + + #[test] + fn test_time_window_retrieval_select_from_stamp() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + let tc_info_2 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150); + let start_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100)) + .expect("creating start stamp failed"); + let time_window = TimeWindow::new_from_time(&start_stamp); + assert_eq!(scheduler.num_scheduled_telecommands(), 3); + + let range = scheduler.retrieve_by_time_filter(time_window); + let mut tcs_in_range = 0; + for (idx, time_bucket) in range.enumerate() { + tcs_in_range += 1; + if idx == 0 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id()); + } else if idx == 1 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(150)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_2.request_id()); + } + } + assert_eq!(tcs_in_range, 2); + } + + #[test] + fn test_time_window_retrieval_select_to_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150); + assert_eq!(scheduler.num_scheduled_telecommands(), 3); + + let end_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100)) + .expect("creating start stamp failed"); + let time_window = TimeWindow::new_to_time(&end_stamp); + let range = scheduler.retrieve_by_time_filter(time_window); + let mut tcs_in_range = 0; + for (idx, time_bucket) in range.enumerate() { + tcs_in_range += 1; + if idx == 0 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(50)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_0.request_id()); + } else if idx == 1 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100)); + assert_eq!(time_bucket.1.len(), 1); +
 assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id()); + } + } + assert_eq!(tcs_in_range, 2); + } + + #[test] + fn test_time_window_retrieval_select_from_time_to_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + let tc_info_2 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150); + let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 200); + assert_eq!(scheduler.num_scheduled_telecommands(), 4); + + let start_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100)) + .expect("creating start stamp failed"); + let end_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(150)) + .expect("creating end stamp failed"); + let time_window = TimeWindow::new_from_time_to_time(&start_stamp, &end_stamp); + let range = scheduler.retrieve_by_time_filter(time_window); + let mut tcs_in_range = 0; + for (idx, time_bucket) in range.enumerate() { + tcs_in_range += 1; + if idx == 0 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id()); + } else if idx == 1 { + assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(150)); + assert_eq!(time_bucket.1.len(), 1); + assert_eq!(time_bucket.1[0].request_id, tc_info_2.request_id()); + } + } + assert_eq!(tcs_in_range, 2); + } + + #[test] + fn test_deletion_all() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + assert_eq!(scheduler.num_scheduled_telecommands(), 2); + let del_res = scheduler.delete_all(&mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), 2); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + // Contrary to reset, this does not disable the scheduler. + assert!(scheduler.is_enabled()); + + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + assert_eq!(scheduler.num_scheduled_telecommands(), 2); + let del_res = scheduler + .delete_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all(), &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), 2); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + // Contrary to reset, this does not disable the scheduler.
+ assert!(scheduler.is_enabled()); + } + + #[test] + fn test_deletion_from_start_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + let cmd_1_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150); + assert_eq!(scheduler.num_scheduled_telecommands(), 3); + let start_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100)) + .expect("creating start stamp failed"); + let time_window = TimeWindow::new_from_time(&start_stamp); + let del_res = scheduler.delete_by_time_filter(time_window, &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), 2); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + assert!(!pool.has_element_at(&cmd_0_to_delete.addr()).unwrap()); + assert!(!pool.has_element_at(&cmd_1_to_delete.addr()).unwrap()); + } + + #[test] + fn test_deletion_to_end_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let cmd_1_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150); + assert_eq!(scheduler.num_scheduled_telecommands(), 3); + + let end_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100)) + .expect("creating start stamp failed"); + let time_window = TimeWindow::new_to_time(&end_stamp); + let del_res = scheduler.delete_by_time_filter(time_window, &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), 2); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + assert!(!pool.has_element_at(&cmd_0_to_delete.addr()).unwrap()); + assert!(!pool.has_element_at(&cmd_1_to_delete.addr()).unwrap()); + } + + #[test] + fn test_deletion_from_start_time_to_end_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let cmd_out_of_range_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50); + let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100); + let cmd_1_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150); + let cmd_out_of_range_1 = + insert_command_with_release_time(&mut pool, &mut scheduler, 0, 200); + assert_eq!(scheduler.num_scheduled_telecommands(), 4); + + let start_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100)) + .expect("creating start stamp failed"); + let end_stamp = + cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(150)) + .expect("creating end stamp failed"); + let time_window = TimeWindow::new_from_time_to_time(&start_stamp, &end_stamp); + let del_res = scheduler.delete_by_time_filter(time_window, &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), 2); + assert_eq!(scheduler.num_scheduled_telecommands(), 2); + assert!(pool.has_element_at(&cmd_out_of_range_0.addr()).unwrap()); + 
 assert!(!pool.has_element_at(&cmd_0_to_delete.addr()).unwrap()); + assert!(!pool.has_element_at(&cmd_1_to_delete.addr()).unwrap()); + assert!(pool.has_element_at(&cmd_out_of_range_1.addr()).unwrap()); + } } diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 4da2a09..834303b 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -104,8 +104,11 @@ pub use stdmod::{ StdVerifSenderError, }; -/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard +/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard. +/// /// This field is equivalent to the first two bytes of the CCSDS space packet header. +/// This version of the request ID is supplied in the verification reports and does not contain +/// the source ID. #[derive(Debug, Eq, Copy, Clone)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct RequestId { diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index af3e750..753d1a4 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -16,7 +16,7 @@ use crate::pus::{PusReceiver, PusTcArgs, PusTmArgs}; use crate::requests::RequestWithToken; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; -use satrs_core::pus::scheduling::PusScheduler; +use satrs_core::pus::scheduling::{PusScheduler, TcInfo}; use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader}; use satrs_core::tmtc::{ @@ -217,12 +217,12 @@ fn core_tmtc_loop( pus_receiver: &mut PusReceiver, scheduler: Rc<RefCell<PusScheduler>>, ) { - let releaser = |enabled: bool, addr: &StoreAddr| -> bool { + let releaser = |enabled: bool, info: &TcInfo| -> bool { if enabled { tc_args .tc_source .tc_source - .send(*addr) + .send(info.addr()) .expect("sending TC to TC source failed"); } true @@ -239,7 +239,7 @@ scheduler.update_time_from_now().unwrap(); if let Ok(released_tcs) = scheduler.release_telecommands(releaser, pool.as_mut()) { if released_tcs > 0 { - println!("{} Tc(s) released from scheduler", released_tcs); + println!("{released_tcs} TC(s) released from scheduler"); } } drop(pool);
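A minimal end-to-end sketch of the new `TcInfo`/`RequestId` based API introduced by this diff, mirroring the unit tests above (pool configuration, ping TC constructor and all calls are taken from the test code; this is illustrative and not part of the changeset):

```rust
use core::time::Duration;
use satrs_core::pool::{LocalPool, PoolCfg};
use satrs_core::pus::scheduling::{PusScheduler, RequestId};
use spacepackets::tc::PusTc;
use spacepackets::time::UnixTimestamp;
use spacepackets::SpHeader;

fn main() {
    // TC pool and scheduler configured like in the unit tests.
    let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
    let mut scheduler =
        PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));

    // Build a simple ping TC (service 17, subservice 1) and schedule it for release
    // at unix second 100.
    let mut sph = SpHeader::tc_unseg(0x02, 0, 0).unwrap();
    let ping_tc = PusTc::new_simple(&mut sph, 17, 1, None, true);
    let mut buf: [u8; 32] = [0; 32];
    let len = ping_tc.write_to_bytes(&mut buf).unwrap();
    let info = scheduler
        .insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool)
        .expect("insertion failed");

    // The returned TC info contains the request ID, which identifies the scheduled
    // command without requiring knowledge of its store address.
    assert_eq!(info.request_id(), RequestId::from_tc(&ping_tc));
    let addr = scheduler
        .delete_by_request_id(&info.request_id())
        .expect("scheduled TC not found");
    assert_eq!(addr, info.addr());
}
```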