Add initial support for request ID in scheduler #31

Merged
muellerr merged 7 commits from request_id_support_sched into main 2023-02-13 09:30:53 +01:00
2 changed files with 213 additions and 38 deletions
Showing only changes of commit 21b9434d18

View File

@ -257,6 +257,14 @@ pub trait PoolProvider {
/// Delete data inside the pool given a [StoreAddr]
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>;
fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError>;
/// Retrieve the length of the data at the given store address.
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
if !self.has_element_at(addr)? {
return Err(StoreError::DataDoesNotExist(*addr));
}
Ok(self.read(addr)?.len())
}
}
impl LocalPool {
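For reference, a minimal usage sketch of the new len_of_data default method (not part of this diff), assuming the crate::pool imports and the LocalPool configuration used by the scheduler tests below:

use crate::pool::{LocalPool, PoolCfg, PoolProvider};

// Hypothetical demo function, only meant to illustrate the default implementation.
fn len_of_data_demo() {
    // Pool with ten 32-byte buckets and five 64-byte buckets, as in the tests below.
    let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
    let addr = pool.add(&[1, 2, 3, 4]).unwrap();
    // The default implementation first checks for existence and then reads the
    // entry to determine its length.
    assert_eq!(pool.len_of_data(&addr).unwrap(), 4);
    // After deletion, the same call yields StoreError::DataDoesNotExist.
    pool.delete(addr).unwrap();
    assert!(pool.len_of_data(&addr).is_err());
}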

View File

@ -11,7 +11,7 @@ use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc};
use spacepackets::time::cds::DaysLen24Bits;
use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp};
use spacepackets::{ByteConversionError, CcsdsPacket, SizeMissmatch};
use spacepackets::CcsdsPacket;
use std::collections::BTreeMap;
#[cfg(feature = "std")]
use std::error::Error;
@ -55,6 +55,7 @@ impl RequestId {
((self.source_id as u64) << 32) | ((self.apid as u64) << 16) | self.seq_count as u64
}
/*
pub fn from_bytes(buf: &[u8]) -> Result<Self, ByteConversionError> {
if buf.len() < core::mem::size_of::<u64>() {
return Err(ByteConversionError::FromSliceTooSmall(SizeMissmatch {
@ -68,6 +69,7 @@ impl RequestId {
seq_count: u16::from_be_bytes(buf[4..6].try_into().unwrap()),
})
}
*/
}
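The u64 representation packs the three identifiers into the lower 48 bits: the source ID into bits 32..47, the APID into bits 16..31 and the sequence count into bits 0..15. A small layout sketch using the values from the request_id test added further down:

// Hypothetical demo of the RequestId::as_u64 bit layout (upper 16 bits stay zero):
// | 63..48 | 47..32    | 31..16 | 15..0     |
// |   0    | source_id | apid   | seq_count |
fn request_id_u64_layout_demo() {
    let (source_id, apid, seq_count): (u64, u64, u64) = (12, 0x22, 105);
    assert_eq!(
        (source_id << 32) | (apid << 16) | seq_count,
        0x000C_0022_0069
    );
}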
#[derive(Debug, Clone, PartialEq, Eq)]
@ -142,7 +144,7 @@ impl From<TimestampError> for ScheduleError {
#[cfg(feature = "std")]
impl Error for ScheduleError {}
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TcInfo {
addr: StoreAddr,
@ -185,6 +187,11 @@ pub struct PusScheduler {
enabled: bool,
}
enum DeletionResult {
WithoutStoreDeletion(Option<StoreAddr>),
WithStoreDeletion(Result<bool, StoreError>),
}
impl PusScheduler {
/// Create a new PUS scheduler.
///
@ -352,6 +359,64 @@ impl PusScheduler {
self.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
}
/// Deletes a scheduled command with the given request ID. Returns the store address if a
/// scheduled command was found in the map and deleted, and None otherwise.
///
/// Please note that this function will stop on the first telecommand with a request ID match.
/// In case of duplicate IDs (which should generally not happen), this function needs to be
/// called repeatedly.
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<StoreAddr> {
if let DeletionResult::WithoutStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>)
{
return v;
}
panic!("unexpected deletion result");
}
/// This behaves like [Self::delete_by_request_id] but also deletes the packet from the pool.
pub fn delete_by_request_id_and_from_pool(
&mut self,
req_id: &RequestId,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<bool, StoreError> {
if let DeletionResult::WithStoreDeletion(v) =
self.delete_by_request_id_internal(req_id, Some(pool))
{
return v;
}
panic!("unexpected deletion result");
}
fn delete_by_request_id_internal(
&mut self,
req_id: &RequestId,
pool: Option<&mut (impl PoolProvider + ?Sized)>,
) -> DeletionResult {
let mut idx_found = None;
for time_bucket in &mut self.tc_map {
for (idx, tc_info) in time_bucket.1.iter().enumerate() {
if &tc_info.request_id == req_id {
idx_found = Some(idx);
// Stop at the first match, as documented for the public API.
break;
}
}
if let Some(idx) = idx_found {
let addr = time_bucket.1.remove(idx).addr;
if let Some(pool) = pool {
return match pool.delete(addr) {
Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)),
Err(e) => DeletionResult::WithStoreDeletion(Err(e)),
};
}
return DeletionResult::WithoutStoreDeletion(Some(addr));
}
}
if pool.is_none() {
DeletionResult::WithoutStoreDeletion(None)
} else {
DeletionResult::WithStoreDeletion(Ok(false))
}
}
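For reviewers, a condensed usage sketch of the two new public entry points; it is not part of this diff, mirrors the test_delete_by_req_id tests added further down, and reuses the ping_tc_to_store helper and imports from that test module:

// Sketch only; relies on the test-module helpers and imports below.
fn delete_by_request_id_sketch() {
    let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
    let mut scheduler =
        PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
    let mut buf: [u8; 32] = [0; 32];
    let (addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
    scheduler
        .insert_unwrapped_and_stored_tc(
            UnixTimestamp::new_only_seconds(100),
            TcInfo::new(addr, req_id),
        )
        .expect("insertion failed");
    // Variant 1: remove the entry from the schedule but keep the TC in the pool.
    assert_eq!(scheduler.delete_by_request_id(&req_id), Some(addr));
    assert!(pool.has_element_at(&addr).unwrap());
    // Variant 2 (after re-inserting the TC) would be
    //   scheduler.delete_by_request_id_and_from_pool(&req_id, &mut pool)
    // which returns Ok(true) on a match and also deletes the TC from the pool.
}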
/// Retrieve all telecommands which should be released based on the current time.
pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<TcInfo>> {
self.tc_map.range(..=self.current_time)
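telecommands_to_release takes &self and only borrows the due range of the underlying BTreeMap, so it neither removes the returned entries nor touches the pool. A rough release-pass sketch (hypothetical helper, not part of this diff; TcInfo field access works inside this module):

fn release_pass_sketch(scheduler: &PusScheduler, pool: &impl PoolProvider) {
    for (_release_time, tc_infos) in scheduler.telecommands_to_release() {
        for tc_info in tc_infos {
            // Assumes PoolProvider::read takes a shared reference, as in the
            // len_of_data default implementation in the pool module above.
            let raw_tc = pool.read(&tc_info.addr).expect("scheduled TC missing from pool");
            // Forward raw_tc to the telecommand router / PUS service handler here.
            let _ = raw_tc;
        }
    }
}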
@ -406,7 +471,7 @@ impl PusScheduler {
mod tests {
use super::*;
use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError};
use spacepackets::tc::PusTc;
use spacepackets::tc::{PusTc, PusTcSecondaryHeader};
use spacepackets::time::{cds, TimeWriter, UnixTimestamp};
use spacepackets::SpHeader;
use std::time::Duration;
@ -417,7 +482,7 @@ mod tests {
fn pus_tc_base(timestamp: UnixTimestamp, buf: &mut [u8]) -> (SpHeader, usize) {
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(&timestamp).unwrap();
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
let len_packet = base_ping_tc_simple_ctor(None)
let len_packet = base_ping_tc_simple_ctor(0, None)
.write_to_bytes(&mut buf[len_time_stamp..])
.unwrap();
(
@ -465,17 +530,18 @@ mod tests {
PusTc::new_simple(&mut sph, 11, 4, None, true)
}
fn base_ping_tc_simple_ctor(app_data: Option<&'static [u8]>) -> PusTc<'static> {
let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap();
fn base_ping_tc_simple_ctor(seq_count: u16, app_data: Option<&'static [u8]>) -> PusTc<'static> {
let mut sph = SpHeader::tc_unseg(0x02, seq_count, 0).unwrap();
PusTc::new_simple(&mut sph, 17, 1, app_data, true)
}
fn ping_tc_to_store(
pool: &mut LocalPool,
buf: &mut [u8],
seq_count: u16,
app_data: Option<&'static [u8]>,
) -> (StoreAddr, RequestId) {
let ping_tc = base_ping_tc_simple_ctor(app_data);
let ping_tc = base_ping_tc_simple_ctor(seq_count, app_data);
let ping_size = ping_tc.write_to_bytes(buf).expect("writing ping TC failed");
let first_addr = pool.add(&buf[0..ping_size]).unwrap();
(first_addr, RequestId::from_tc(&ping_tc))
@ -499,7 +565,7 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None);
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
@ -509,7 +575,7 @@ mod tests {
.unwrap();
let app_data = &[0, 1, 2];
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, Some(app_data));
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, Some(app_data));
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
@ -518,7 +584,7 @@ mod tests {
.unwrap();
let app_data = &[0, 1, 2];
let (third_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, Some(app_data));
let (third_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 2, Some(app_data));
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(300),
@ -625,6 +691,24 @@ mod tests {
*counter += 1;
}
#[test]
fn request_id() {
let src_id_to_set = 12;
let apid_to_set = 0x22;
let seq_count = 105;
let mut sp_header = SpHeader::tc_unseg(apid_to_set, seq_count, 0).unwrap();
let mut sec_header = PusTcSecondaryHeader::new_simple(17, 1);
sec_header.source_id = src_id_to_set;
let ping_tc = PusTc::new(&mut sp_header, sec_header, None, true);
let req_id = RequestId::from_tc(&ping_tc);
assert_eq!(req_id.source_id(), src_id_to_set);
assert_eq!(req_id.apid(), apid_to_set);
assert_eq!(req_id.seq_count(), seq_count);
assert_eq!(
req_id.as_u64(),
((src_id_to_set as u64) << 32) | (apid_to_set as u64) << 16 | seq_count as u64
);
}
#[test]
fn release_basic() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
@ -632,7 +716,7 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None);
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
@ -641,7 +725,7 @@ mod tests {
)
.expect("insertion failed");
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None);
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
@ -701,7 +785,7 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None);
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
@ -710,7 +794,7 @@ mod tests {
)
.expect("insertion failed");
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None);
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
@ -766,12 +850,7 @@ mod tests {
scheduler.disable();
let mut buf: [u8; 32] = [0; 32];
let ping_tc = base_ping_tc_simple_ctor(None);
let ping_size = ping_tc
.write_to_bytes(&mut buf)
.expect("writing ping TC failed");
let first_addr = pool.add(&buf[0..ping_size]).unwrap();
let req_id = RequestId::from_tc(&ping_tc);
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
@ -780,8 +859,7 @@ mod tests {
)
.expect("insertion failed");
let second_addr = pool.add(&buf[0..ping_size]).unwrap();
let req_id = RequestId::from_tc(&ping_tc);
let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
@ -841,19 +919,21 @@ mod tests {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
let len = base_ping_tc_simple_ctor(None)
.write_to_bytes(&mut buf)
.unwrap();
let (addr, _) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
let addr = scheduler
.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool)
.insert_unwrapped_tc(
UnixTimestamp::new_only_seconds(100),
&buf[..pool.len_of_data(&addr).unwrap()],
&mut pool,
)
.unwrap();
assert!(pool.has_element_at(&addr).unwrap());
let data = pool.read(&addr).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None));
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
@ -875,7 +955,7 @@ mod tests {
let data = pool.read(&addr_vec[0]).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None));
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
}
#[test]
@ -899,7 +979,7 @@ mod tests {
let data = pool.read(&addr).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None));
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
@ -921,7 +1001,7 @@ mod tests {
let data = pool.read(&addr_vec[0]).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None));
assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None));
}
#[test]
@ -1044,14 +1124,12 @@ mod tests {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let first_addr = pool.add(&[2, 2, 2]).unwrap();
let mut buf: [u8; 32] = [0; 32];
let tc = base_ping_tc_simple_ctor(None);
tc.write_to_bytes(&mut buf).unwrap();
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, RequestId::from_tc(&tc)),
TcInfo::new(first_addr, req_id),
)
.expect("insertion failed");
@ -1084,10 +1162,7 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let tc = base_ping_tc_simple_ctor(None);
let tc_len = tc.write_to_bytes(&mut buf).unwrap();
let first_addr = pool.add(&buf[0..tc_len]).unwrap();
let req_id = RequestId::from_tc(&tc);
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
@ -1108,6 +1183,98 @@ mod tests {
}
}
#[test]
fn test_delete_by_req_id_simple_retrieve_addr() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.expect("inserting tc failed");
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
let addr = scheduler.delete_by_request_id(&req_id).unwrap();
assert!(pool.has_element_at(&addr).unwrap());
assert_eq!(addr, first_addr);
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
}
#[test]
fn test_delete_by_req_id_simple_delete_all() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id),
)
.expect("inserting tc failed");
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id, &mut pool);
assert!(del_res.is_ok());
assert!(del_res.unwrap());
assert!(!pool.has_element_at(&first_addr).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
}
#[test]
fn test_delete_by_req_id_complex() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut buf: [u8; 32] = [0; 32];
let (first_addr, req_id_0) = ping_tc_to_store(&mut pool, &mut buf, 0, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(first_addr, req_id_0),
)
.expect("inserting tc failed");
let (second_addr, req_id_1) = ping_tc_to_store(&mut pool, &mut buf, 1, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(second_addr, req_id_1),
)
.expect("inserting tc failed");
let (third_addr, req_id_2) = ping_tc_to_store(&mut pool, &mut buf, 2, None);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
TcInfo::new(third_addr, req_id_2),
)
.expect("inserting tc failed");
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
// Delete the first packet from the schedule only; it stays in the pool
let addr_0 = scheduler.delete_by_request_id(&req_id_0);
assert!(addr_0.is_some());
assert_eq!(addr_0.unwrap(), first_addr);
assert!(pool.has_element_at(&first_addr).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 2);
// Delete the third packet and also remove it from the pool
let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id_2, &mut pool);
assert!(del_res.is_ok());
assert!(del_res.unwrap());
assert!(!pool.has_element_at(&third_addr).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
// Delete the remaining (second) packet and also remove it from the pool
let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id_1, &mut pool);
assert!(del_res.is_ok());
assert!(del_res.unwrap());
assert!(!pool.has_element_at(&second_addr).unwrap());
assert_eq!(scheduler.num_scheduled_telecommands(), 0);
}
#[test]
fn insert_full_store_test() {
let mut scheduler =