Add initial support for request ID in scheduler #31

Merged
muellerr merged 7 commits from request_id_support_sched into main 2023-02-13 09:30:53 +01:00
3 changed files with 346 additions and 115 deletions
Showing only changes of commit 60db6e022d - Show all commits

View File

@ -53,10 +53,10 @@ default-features = false
optional = true
[dependencies.spacepackets]
version = "0.5.3"
# version = "0.5.3"
# path = "../spacepackets"
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
# rev = ""
git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
rev = "61c2042e3507242452ebe53164d40893cd8da155"
default-features = false

View File

@ -1,4 +1,4 @@
//! # Core components of the Flight Software Rust Crate (FSRC) collection
//! # Core components of the sat-rs framework
//!
//! This is a collection of Rust crates which can be used to build On-Board Software for remote
//! systems like satellites of rovers. It has special support for space tailored protocols

View File

@ -7,6 +7,7 @@ use core::fmt::{Debug, Display, Formatter};
use core::time::Duration;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use spacepackets::ecss::scheduling::TimeWindowType;
use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc};
use spacepackets::time::cds::DaysLen24Bits;
@ -144,6 +145,8 @@ impl From<TimestampError> for ScheduleError {
#[cfg(feature = "std")]
impl Error for ScheduleError {}
/// This is the format stored internally by the TC scheduler for each scheduled telecommand.
/// It consists of the address of that telecommand in the TC pool and a request ID.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TcInfo {
@ -178,6 +181,13 @@ impl TcInfo {
/// A disabled scheduler should still delete commands where the execution time has been reached
/// but should not release them to be executed.
///
/// The implementation uses an ordered map internally with the release timestamp being the key.
/// This allows efficient time based insertions and extractions which should be the primary use-case
/// for a time-based command scheduler.
/// There is no way to avoid duplicate [RequestId]s during insertion, which can occur even if the
/// user always correctly increment for sequence counter due to overflows. To avoid this issue,
/// it can make sense to split up telecommand groups by the APID to avoid overflows.
///
/// Currently, sub-schedules and groups are not supported.
#[derive(Debug)]
pub struct PusScheduler {
@ -192,6 +202,60 @@ enum DeletionResult {
WithStoreDeletion(Result<bool, StoreError>),
}
pub struct TimeWindow<TimeProvder> {
time_window_type: TimeWindowType,
start_time: Option<TimeProvder>,
end_time: Option<TimeProvder>,
}
impl<TimeProvider> TimeWindow<TimeProvider> {
pub fn new_select_all() -> Self {
Self {
time_window_type: TimeWindowType::SelectAll,
start_time: None,
end_time: None,
}
}
pub fn time_window_type(&self) -> TimeWindowType {
self.time_window_type
}
pub fn start_time(&self) -> Option<&TimeProvider> {
self.start_time.as_ref()
}
pub fn end_time(&self) -> Option<&TimeProvider> {
self.end_time.as_ref()
}
}
impl<TimeProvider: CcsdsTimeProvider + Clone> TimeWindow<TimeProvider> {
pub fn new_from_time_to_time(start_time: &TimeProvider, end_time: &TimeProvider) -> Self {
Self {
time_window_type: TimeWindowType::TimeTagToTimeTag,
start_time: Some(start_time.clone()),
end_time: Some(end_time.clone()),
}
}
pub fn new_from_time(start_time: &TimeProvider) -> Self {
Self {
time_window_type: TimeWindowType::FromTimeTag,
start_time: Some(start_time.clone()),
end_time: None,
}
}
pub fn new_to_time(end_time: &TimeProvider) -> Self {
Self {
time_window_type: TimeWindowType::ToTimeTag,
start_time: None,
end_time: Some(end_time.clone()),
}
}
}
impl PusScheduler {
/// Create a new PUS scheduler.
///
@ -360,6 +424,60 @@ impl PusScheduler {
self.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
}
/// This function uses [Self::retrieve_by_time_filter] to extract all scheduled commands inside
/// the time range and then deletes them from the provided store.
pub fn delete_by_time_filter<TimeProvider: CcsdsTimeProvider + Clone>(
&mut self,
time_window: TimeWindow<TimeProvider>,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let range = self.retrieve_by_time_filter(time_window);
let mut del_packets = 0;
let mut res_if_fails = None;
for time_bucket in range {
for tc in time_bucket.1 {
match pool.delete(tc.addr) {
Ok(_) => del_packets += 1,
Err(e) => res_if_fails = Some(e),
}
}
}
if let Some(err) = res_if_fails {
return Err((del_packets, err));
}
Ok(del_packets)
}
pub fn retrieve_all(&mut self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
self.tc_map.range(..)
}
/// This retrieves scheduled telecommands which are inside the provided time window.
pub fn retrieve_by_time_filter<TimeProvider: CcsdsTimeProvider>(
&mut self,
time_window: TimeWindow<TimeProvider>,
) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
match time_window.time_window_type() {
TimeWindowType::SelectAll => self.tc_map.range(..),
TimeWindowType::TimeTagToTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let start_time = time_window.start_time().unwrap().unix_stamp();
let end_time = time_window.end_time().unwrap().unix_stamp();
self.tc_map.range(start_time..=end_time)
}
TimeWindowType::FromTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let start_time = time_window.start_time().unwrap().unix_stamp();
self.tc_map.range(start_time..)
}
TimeWindowType::ToTimeTag => {
// This should be guaranteed to be valid by library API, so unwrap is okay
let end_time = time_window.end_time().unwrap().unix_stamp();
self.tc_map.range(..=end_time)
}
}
}
/// Deletes a scheduled command with the given request ID. Returns the store address if a
/// scheduled command was found in the map and deleted, and None otherwise.
///
@ -375,7 +493,7 @@ impl PusScheduler {
panic!("unexpected deletion result");
}
/// This behaves like [delete_by_request_id] but deletes the packet from the pool as well.
/// This behaves like [Self::delete_by_request_id] but deletes the packet from the pool as well.
pub fn delete_by_request_id_and_from_pool(
&mut self,
req_id: &RequestId,
@ -438,8 +556,9 @@ impl PusScheduler {
/// # Arguments
///
/// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
/// the second argument is the store address. This closure should return whether the
/// command should be deleted if the scheduler is disabled to prevent memory leaks.
/// the second argument is the telecommand information also containing the store address.
/// This closure should return whether the command should be deleted if the scheduler is
/// disabled to prevent memory leaks.
/// * `store` - The holding store of the telecommands.
pub fn release_telecommands<R: FnMut(bool, &TcInfo) -> bool>(
&mut self,
@ -541,11 +660,11 @@ mod tests {
buf: &mut [u8],
seq_count: u16,
app_data: Option<&'static [u8]>,
) -> (StoreAddr, RequestId) {
) -> TcInfo {
let ping_tc = base_ping_tc_simple_ctor(seq_count, app_data);
let ping_size = ping_tc.write_to_bytes(buf).expect("writing ping TC failed");
let first_addr = pool.add(&buf[0..ping_size]).unwrap();
(first_addr, RequestId::from_tc(&ping_tc))
TcInfo::new(first_addr, RequestId::from_tc(&ping_tc))
}
#[test]
@ -566,30 +685,30 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr.clone(), req_id),
TcInfo::new(tc_info_0.addr.clone(), tc_info_0.request_id),
)
.unwrap();
let app_data = &[0, 1, 2];
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, Some(app_data));
let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, Some(app_data));
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
TcInfo::new(second_addr.clone(), req_id),
TcInfo::new(tc_info_1.addr.clone(), tc_info_1.request_id),
)
.unwrap();
let app_data = &[0, 1, 2];
let (third_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 2, Some(app_data));
let tc_info_2 = ping_tc_to_store(&mut pool, &mut buf, 2, Some(app_data));
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(300),
TcInfo::new(third_addr.clone(), req_id),
TcInfo::new(tc_info_2.addr().clone(), tc_info_2.request_id()),
)
.unwrap();
@ -598,9 +717,9 @@ mod tests {
scheduler.reset(&mut pool).expect("deletion of TCs failed");
assert!(!scheduler.is_enabled());
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
assert!(!pool.has_element_at(&first_addr).unwrap());
assert!(!pool.has_element_at(&second_addr).unwrap());
assert!(!pool.has_element_at(&third_addr).unwrap());
assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap());
assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap());
assert!(!pool.has_element_at(&tc_info_2.addr()).unwrap());
}
#[test]
@ -717,26 +836,20 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("insertion failed");
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
TcInfo::new(second_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), tc_info_1)
.expect("insertion failed");
let mut i = 0;
let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| {
common_check(boolvar, &tc_info.addr, vec![first_addr], &mut i);
common_check(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i);
true
};
@ -754,11 +867,11 @@ mod tests {
.release_telecommands(&mut test_closure_1, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(pool.has_element_at(&first_addr).unwrap());
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
// test 3, late timestamp, release 1 overdue tc
let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| {
common_check(boolvar, &tc_info.addr, vec![second_addr], &mut i);
common_check(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i);
true
};
@ -768,7 +881,7 @@ mod tests {
.release_telecommands(&mut test_closure_2, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(pool.has_element_at(&second_addr).unwrap());
assert!(pool.has_element_at(&tc_info_1.addr()).unwrap());
//test 4: no tcs left
scheduler
@ -786,21 +899,15 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("insertion failed");
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(second_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_1)
.expect("insertion failed");
let mut i = 0;
@ -808,7 +915,7 @@ mod tests {
common_check(
boolvar,
&store_addr.addr,
vec![first_addr, second_addr],
vec![tc_info_0.addr(), tc_info_1.addr()],
&mut i,
);
true
@ -829,8 +936,8 @@ mod tests {
.release_telecommands(&mut test_closure, &mut pool)
.expect("deletion failed");
assert_eq!(released, 2);
assert!(pool.has_element_at(&first_addr).unwrap());
assert!(pool.has_element_at(&second_addr).unwrap());
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
assert!(pool.has_element_at(&tc_info_1.addr()).unwrap());
//test 3: no tcs left
released = scheduler
@ -851,26 +958,20 @@ mod tests {
scheduler.disable();
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("insertion failed");
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
TcInfo::new(second_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), tc_info_1)
.expect("insertion failed");
let mut i = 0;
let mut test_closure_1 = |boolvar: bool, tc_info: &TcInfo| {
common_check_disabled(boolvar, &tc_info.addr, vec![first_addr], &mut i);
common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i);
true
};
@ -888,11 +989,11 @@ mod tests {
.release_telecommands(&mut test_closure_1, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(!pool.has_element_at(&first_addr).unwrap());
assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap());
// test 3, late timestamp, release 1 overdue tc
let mut test_closure_2 = |boolvar: bool, tc_info: &TcInfo| {
common_check_disabled(boolvar, &tc_info.addr, vec![second_addr], &mut i);
common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_1.addr()], &mut i);
true
};
@ -902,7 +1003,7 @@ mod tests {
.release_telecommands(&mut test_closure_2, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(!pool.has_element_at(&second_addr).unwrap());
assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap());
//test 4: no tcs left
scheduler
@ -920,19 +1021,19 @@ mod tests {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
let (addr, _) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let info = scheduler
.insert_unwrapped_tc(
UnixTimestamp::new_only_seconds(100),
&buf[..pool.len_of_data(&addr).unwrap()],
&buf[..pool.len_of_data(&tc_info_0.addr()).unwrap()],
&mut pool,
)
.unwrap();
assert!(pool.has_element_at(&info.addr).unwrap());
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
let data = pool.read(&info.addr).unwrap();
let data = pool.read(&tc_info_0.addr()).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
@ -1126,22 +1227,19 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("insertion failed");
let mut i = 0;
let test_closure_1 = |boolvar: bool, tc_info: &TcInfo| {
common_check_disabled(boolvar, &tc_info.addr, vec![first_addr], &mut i);
common_check_disabled(boolvar, &tc_info.addr, vec![tc_info_0.addr()], &mut i);
true
};
// premature deletion
pool.delete(first_addr).expect("deletion failed");
pool.delete(tc_info_0.addr()).expect("deletion failed");
// scheduler will only auto-delete if it is disabled.
scheduler.disable();
scheduler.update_time(UnixTimestamp::new_only_seconds(100));
@ -1151,7 +1249,7 @@ mod tests {
assert_eq!(err.0, 1);
match err.1 {
StoreError::DataDoesNotExist(addr) => {
assert_eq!(first_addr, addr);
assert_eq!(tc_info_0.addr(), addr);
}
_ => panic!("unexpected error {}", err.1),
}
@ -1163,22 +1261,19 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("insertion failed");
// premature deletion
pool.delete(first_addr).expect("deletion failed");
pool.delete(tc_info_0.addr()).expect("deletion failed");
let reset_res = scheduler.reset(&mut pool);
assert!(reset_res.is_err());
let err = reset_res.unwrap_err();
match err {
StoreError::DataDoesNotExist(addr) => {
assert_eq!(addr, first_addr);
assert_eq!(addr, tc_info_0.addr());
}
_ => panic!("unexpected error {err}"),
}
@ -1190,17 +1285,16 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("inserting tc failed");
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
let addr = scheduler.delete_by_request_id(&req_id).unwrap();
assert!(pool.has_element_at(&addr).unwrap());
assert_eq!(addr, first_addr);
let addr = scheduler
.delete_by_request_id(&tc_info_0.request_id())
.unwrap();
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
assert_eq!(tc_info_0.addr(), addr);
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
}
@ -1210,18 +1304,16 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("inserting tc failed");
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id, &mut pool);
let del_res =
scheduler.delete_by_request_id_and_from_pool(&tc_info_0.request_id(), &mut pool);
assert!(del_res.is_ok());
assert_eq!(del_res.unwrap(), true);
assert!(!pool.has_element_at(&first_addr).unwrap());
assert!(!pool.has_element_at(&tc_info_0.addr()).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
}
@ -1231,48 +1323,41 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id_0) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id_0),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_0)
.expect("inserting tc failed");
let (second_addr, req_id_1) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
let tc_info_1 = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(second_addr, req_id_1),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_1)
.expect("inserting tc failed");
let (third_addr, req_id_2) = ping_tc_to_store(&mut pool, &mut buf, 2, None);
let tc_info_2 = ping_tc_to_store(&mut pool, &mut buf, 2, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(third_addr, req_id_2),
)
.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), tc_info_2)
.expect("inserting tc failed");
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
// Delete first packet
let addr_0 = scheduler.delete_by_request_id(&req_id_0);
let addr_0 = scheduler.delete_by_request_id(&tc_info_0.request_id());
assert!(addr_0.is_some());
assert_eq!(addr_0.unwrap(), first_addr);
assert!(pool.has_element_at(&first_addr).unwrap());
assert_eq!(addr_0.unwrap(), tc_info_0.addr());
assert!(pool.has_element_at(&tc_info_0.addr()).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 2);
// Delete next packet
let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id_2, &mut pool);
let del_res =
scheduler.delete_by_request_id_and_from_pool(&tc_info_2.request_id(), &mut pool);
assert!(del_res.is_ok());
assert_eq!(del_res.unwrap(), true);
assert!(!pool.has_element_at(&third_addr).unwrap());
assert!(!pool.has_element_at(&tc_info_2.addr()).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
// Delete last packet
let addr_1 = scheduler.delete_by_request_id_and_from_pool(&req_id_1, &mut pool);
let addr_1 =
scheduler.delete_by_request_id_and_from_pool(&tc_info_1.request_id(), &mut pool);
assert!(addr_1.is_ok());
assert_eq!(addr_1.unwrap(), true);
assert!(!pool.has_element_at(&second_addr).unwrap());
assert!(!pool.has_element_at(&tc_info_1.addr()).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
}
@ -1299,4 +1384,150 @@ mod tests {
_ => panic!("unexpected error {err}"),
}
}
fn insert_command_with_release_time(
pool: &mut LocalPool,
scheduler: &mut PusScheduler,
seq_count: u16,
release_secs: u64,
) -> TcInfo {
let mut buf: [u8; 32] = [0; 32];
let tc_info = ping_tc_to_store(pool, &mut buf, seq_count, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(release_secs as i64),
tc_info,
)
.expect("inserting tc failed");
tc_info
}
#[test]
fn test_time_window_retrieval_select_all() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100);
assert_eq!(scheduler.num_scheduled_telecommands(), 2);
let check_range = |range: Range<UnixTimestamp, Vec<TcInfo>>| {
let mut tcs_in_range = 0;
for (idx, time_bucket) in range.enumerate() {
tcs_in_range += 1;
if idx == 0 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(50));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_0.request_id);
} else if idx == 1 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id);
}
}
assert_eq!(tcs_in_range, 2);
};
let range = scheduler.retrieve_all();
check_range(range);
let range =
scheduler.retrieve_by_time_filter(TimeWindow::<cds::TimeProvider>::new_select_all());
check_range(range);
}
#[test]
fn test_time_window_retrieval_select_from_stamp() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100);
let tc_info_2 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150);
let start_stamp =
cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100))
.expect("creating start stamp failed");
let time_window = TimeWindow::new_from_time(&start_stamp);
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
let range = scheduler.retrieve_by_time_filter(time_window);
let mut tcs_in_range = 0;
for (idx, time_bucket) in range.enumerate() {
tcs_in_range += 1;
if idx == 0 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id());
} else if idx == 1 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(150));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_2.request_id());
}
}
assert_eq!(tcs_in_range, 2);
}
#[test]
fn test_time_window_retrieval_select_to_stamp() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100);
let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150);
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
let end_stamp =
cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100))
.expect("creating start stamp failed");
let time_window = TimeWindow::new_to_time(&end_stamp);
let range = scheduler.retrieve_by_time_filter(time_window);
let mut tcs_in_range = 0;
for (idx, time_bucket) in range.enumerate() {
tcs_in_range += 1;
if idx == 0 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(50));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_0.request_id());
} else if idx == 1 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id());
}
}
assert_eq!(tcs_in_range, 2);
}
#[test]
fn test_time_window_retrieval_select_from_to_stamp() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 50);
let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 100);
let tc_info_2 = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 150);
let _ = insert_command_with_release_time(&mut pool, &mut scheduler, 0, 200);
assert_eq!(scheduler.num_scheduled_telecommands(), 4);
let start_stamp =
cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(100))
.expect("creating start stamp failed");
let end_stamp =
cds::TimeProvider::from_unix_secs_with_u16_days(&UnixTimestamp::new_only_seconds(150))
.expect("creating end stamp failed");
let time_window = TimeWindow::new_from_time_to_time(&start_stamp, &end_stamp);
let range = scheduler.retrieve_by_time_filter(time_window);
let mut tcs_in_range = 0;
for (idx, time_bucket) in range.enumerate() {
tcs_in_range += 1;
if idx == 0 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(100));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_1.request_id());
} else if idx == 1 {
assert_eq!(*time_bucket.0, UnixTimestamp::new_only_seconds(15 0));
assert_eq!(time_bucket.1.len(), 1);
assert_eq!(time_bucket.1[0].request_id, tc_info_2.request_id());
}
}
assert_eq!(tcs_in_range, 2);
}
}