diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index f6bd013..3e276bf 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -257,6 +257,14 @@ pub trait PoolProvider { /// Delete data inside the pool given a [StoreAddr] fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; fn has_element_at(&self, addr: &StoreAddr) -> Result; + + /// Retrieve the length of the data at the given store address. + fn len_of_data(&self, addr: &StoreAddr) -> Result { + if !self.has_element_at(addr)? { + return Err(StoreError::DataDoesNotExist(*addr)); + } + return Ok(self.read(addr)?.len()); + } } impl LocalPool { diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 51fa0ff..8c8dd2b 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -11,7 +11,7 @@ use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; -use spacepackets::{ByteConversionError, CcsdsPacket, SizeMissmatch}; +use spacepackets::CcsdsPacket; use std::collections::BTreeMap; #[cfg(feature = "std")] use std::error::Error; @@ -55,6 +55,7 @@ impl RequestId { ((self.source_id as u64) << 32) | ((self.apid as u64) << 16) | self.seq_count as u64 } + /* pub fn from_bytes(buf: &[u8]) -> Result { if buf.len() < core::mem::size_of::() { return Err(ByteConversionError::FromSliceTooSmall(SizeMissmatch { @@ -68,6 +69,7 @@ impl RequestId { seq_count: u16::from_be_bytes(buf[4..6].try_into().unwrap()), }) } + */ } #[derive(Debug, Clone, PartialEq, Eq)] @@ -142,7 +144,7 @@ impl From for ScheduleError { #[cfg(feature = "std")] impl Error for ScheduleError {} -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct TcInfo { addr: StoreAddr, @@ -185,6 +187,11 @@ pub struct PusScheduler { enabled: bool, } +enum DeletionResult { + WithoutStoreDeletion(Option), + WithStoreDeletion(Result), +} + impl PusScheduler { /// Create a new PUS scheduler. /// @@ -352,6 +359,64 @@ impl PusScheduler { self.insert_wrapped_tc::>(pus_tc, pool) } + /// Deletes a scheduled command with the given request ID. Returns the store address if a + /// scheduled command was found in the map and deleted, and None otherwise. + /// + /// Please note that this function will stop on the first telecommand with a request ID match. + /// In case of duplicate IDs (which should generally not happen), this function needs to be + /// called repeatedly. + pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option { + if let DeletionResult::WithoutStoreDeletion(v) = + self.delete_by_request_id_internal(req_id, None::<&mut dyn PoolProvider>) + { + return v; + } + panic!("unexpected deletion result"); + } + + /// This behaves like [delete_by_request_id] but deletes the packet from the pool as well. + pub fn delete_by_request_id_and_from_pool( + &mut self, + req_id: &RequestId, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + if let DeletionResult::WithStoreDeletion(v) = + self.delete_by_request_id_internal(req_id, Some(pool)) + { + return v; + } + panic!("unexpected deletion result"); + } + + fn delete_by_request_id_internal( + &mut self, + req_id: &RequestId, + pool: Option<&mut (impl PoolProvider + ?Sized)>, + ) -> DeletionResult { + let mut idx_found = None; + for time_bucket in &mut self.tc_map { + for (idx, tc_info) in time_bucket.1.iter().enumerate() { + if &tc_info.request_id == req_id { + idx_found = Some(idx); + } + } + if let Some(idx) = idx_found { + let addr = time_bucket.1.remove(idx).addr; + if let Some(pool) = pool { + return match pool.delete(addr) { + Ok(_) => DeletionResult::WithStoreDeletion(Ok(true)), + Err(e) => DeletionResult::WithStoreDeletion(Err(e)), + }; + } + return DeletionResult::WithoutStoreDeletion(Some(addr)); + } + } + if pool.is_none() { + DeletionResult::WithoutStoreDeletion(None) + } else { + DeletionResult::WithStoreDeletion(Ok(false)) + } + } /// Retrieve all telecommands which should be release based on the current time. pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { self.tc_map.range(..=self.current_time) @@ -406,7 +471,7 @@ impl PusScheduler { mod tests { use super::*; use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError}; - use spacepackets::tc::PusTc; + use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; use spacepackets::SpHeader; use std::time::Duration; @@ -417,7 +482,7 @@ mod tests { fn pus_tc_base(timestamp: UnixTimestamp, buf: &mut [u8]) -> (SpHeader, usize) { let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); - let len_packet = base_ping_tc_simple_ctor(None) + let len_packet = base_ping_tc_simple_ctor(0, None) .write_to_bytes(&mut buf[len_time_stamp..]) .unwrap(); ( @@ -465,17 +530,18 @@ mod tests { PusTc::new_simple(&mut sph, 11, 4, None, true) } - fn base_ping_tc_simple_ctor(app_data: Option<&'static [u8]>) -> PusTc<'static> { - let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + fn base_ping_tc_simple_ctor(seq_count: u16, app_data: Option<&'static [u8]>) -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, seq_count, 0).unwrap(); PusTc::new_simple(&mut sph, 17, 1, app_data, true) } fn ping_tc_to_store( pool: &mut LocalPool, buf: &mut [u8], + seq_count: u16, app_data: Option<&'static [u8]>, ) -> (StoreAddr, RequestId) { - let ping_tc = base_ping_tc_simple_ctor(app_data); + let ping_tc = base_ping_tc_simple_ctor(seq_count, app_data); let ping_size = ping_tc.write_to_bytes(buf).expect("writing ping TC failed"); let first_addr = pool.add(&buf[0..ping_size]).unwrap(); (first_addr, RequestId::from_tc(&ping_tc)) @@ -499,7 +565,7 @@ mod tests { PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; - let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None); + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( @@ -509,7 +575,7 @@ mod tests { .unwrap(); let app_data = &[0, 1, 2]; - let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, Some(app_data)); + let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, Some(app_data)); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(200), @@ -518,7 +584,7 @@ mod tests { .unwrap(); let app_data = &[0, 1, 2]; - let (third_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, Some(app_data)); + let (third_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 2, Some(app_data)); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(300), @@ -625,6 +691,24 @@ mod tests { *counter += 1; } + #[test] + fn request_id() { + let src_id_to_set = 12; + let apid_to_set = 0x22; + let seq_count = 105; + let mut sp_header = SpHeader::tc_unseg(apid_to_set, 105, 0).unwrap(); + let mut sec_header = PusTcSecondaryHeader::new_simple(17, 1); + sec_header.source_id = src_id_to_set; + let ping_tc = PusTc::new(&mut sp_header, sec_header, None, true); + let req_id = RequestId::from_tc(&ping_tc); + assert_eq!(req_id.source_id(), src_id_to_set); + assert_eq!(req_id.apid(), apid_to_set); + assert_eq!(req_id.seq_count(), seq_count); + assert_eq!( + req_id.as_u64(), + ((src_id_to_set as u64) << 32) | (apid_to_set as u64) << 16 | seq_count as u64 + ); + } #[test] fn release_basic() { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); @@ -632,7 +716,7 @@ mod tests { PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; - let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None); + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( @@ -641,7 +725,7 @@ mod tests { ) .expect("insertion failed"); - let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None); + let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(200), @@ -701,7 +785,7 @@ mod tests { PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; - let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None); + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( @@ -710,7 +794,7 @@ mod tests { ) .expect("insertion failed"); - let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, None); + let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), @@ -766,12 +850,7 @@ mod tests { scheduler.disable(); let mut buf: [u8; 32] = [0; 32]; - let ping_tc = base_ping_tc_simple_ctor(None); - let ping_size = ping_tc - .write_to_bytes(&mut buf) - .expect("writing ping TC failed"); - let first_addr = pool.add(&buf[0..ping_size]).unwrap(); - let req_id = RequestId::from_tc(&ping_tc); + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( @@ -780,8 +859,7 @@ mod tests { ) .expect("insertion failed"); - let second_addr = pool.add(&buf[0..ping_size]).unwrap(); - let req_id = RequestId::from_tc(&ping_tc); + let (second_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 1, None); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(200), @@ -841,19 +919,21 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut buf: [u8; 32] = [0; 32]; - let len = base_ping_tc_simple_ctor(None) - .write_to_bytes(&mut buf) - .unwrap(); + let (addr, _) = ping_tc_to_store(&mut pool, &mut buf, 0, None); let addr = scheduler - .insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool) + .insert_unwrapped_tc( + UnixTimestamp::new_only_seconds(100), + &buf[..pool.len_of_data(&addr).unwrap()], + &mut pool, + ) .unwrap(); assert!(pool.has_element_at(&addr).unwrap()); let data = pool.read(&addr).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None)); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -875,7 +955,7 @@ mod tests { let data = pool.read(&addr_vec[0]).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None)); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] @@ -899,7 +979,7 @@ mod tests { let data = pool.read(&addr).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None)); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); assert_eq!(scheduler.num_scheduled_telecommands(), 1); @@ -921,7 +1001,7 @@ mod tests { let data = pool.read(&addr_vec[0]).unwrap(); let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); - assert_eq!(check_tc.0, base_ping_tc_simple_ctor(None)); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor(0, None)); } #[test] @@ -1044,14 +1124,12 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[2, 2, 2]).unwrap(); let mut buf: [u8; 32] = [0; 32]; - let tc = base_ping_tc_simple_ctor(None); - tc.write_to_bytes(&mut buf).unwrap(); + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), - TcInfo::new(first_addr, RequestId::from_tc(&tc)), + TcInfo::new(first_addr, req_id), ) .expect("insertion failed"); @@ -1084,10 +1162,7 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; - let tc = base_ping_tc_simple_ctor(None); - let tc_len = tc.write_to_bytes(&mut buf).unwrap(); - let first_addr = pool.add(&buf[0..tc_len]).unwrap(); - let req_id = RequestId::from_tc(&tc); + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); scheduler .insert_unwrapped_and_stored_tc( UnixTimestamp::new_only_seconds(100), @@ -1108,6 +1183,98 @@ mod tests { } } + #[test] + fn test_delete_by_req_id_simple_retrieve_addr() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut buf: [u8; 32] = [0; 32]; + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + TcInfo::new(first_addr, req_id), + ) + .expect("inserting tc failed"); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + let addr = scheduler.delete_by_request_id(&req_id).unwrap(); + assert!(pool.has_element_at(&addr).unwrap()); + assert_eq!(addr, first_addr); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } + + #[test] + fn test_delete_by_req_id_simple_delete_all() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut buf: [u8; 32] = [0; 32]; + let (first_addr, req_id) = ping_tc_to_store(&mut pool, &mut buf, 0, None); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + TcInfo::new(first_addr, req_id), + ) + .expect("inserting tc failed"); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id, &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), true); + assert!(!pool.has_element_at(&first_addr).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } + + #[test] + fn test_delete_by_req_id_complex() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + let mut buf: [u8; 32] = [0; 32]; + let (first_addr, req_id_0) = ping_tc_to_store(&mut pool, &mut buf, 0, None); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + TcInfo::new(first_addr, req_id_0), + ) + .expect("inserting tc failed"); + let (second_addr, req_id_1) = ping_tc_to_store(&mut pool, &mut buf, 1, None); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + TcInfo::new(second_addr, req_id_1), + ) + .expect("inserting tc failed"); + let (third_addr, req_id_2) = ping_tc_to_store(&mut pool, &mut buf, 2, None); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + TcInfo::new(third_addr, req_id_2), + ) + .expect("inserting tc failed"); + assert_eq!(scheduler.num_scheduled_telecommands(), 3); + + // Delete first packet + let addr_0 = scheduler.delete_by_request_id(&req_id_0); + assert!(addr_0.is_some()); + assert_eq!(addr_0.unwrap(), first_addr); + assert!(pool.has_element_at(&first_addr).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 2); + + // Delete next packet + let del_res = scheduler.delete_by_request_id_and_from_pool(&req_id_2, &mut pool); + assert!(del_res.is_ok()); + assert_eq!(del_res.unwrap(), true); + assert!(!pool.has_element_at(&third_addr).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + // Delete last packet + let addr_1 = scheduler.delete_by_request_id_and_from_pool(&req_id_1, &mut pool); + assert!(addr_1.is_ok()); + assert_eq!(addr_1.unwrap(), true); + assert!(!pool.has_element_at(&second_addr).unwrap()); + assert_eq!(scheduler.num_scheduled_telecommands(), 0); + } + #[test] fn insert_full_store_test() { let mut scheduler =