From d7b8a8c1d1708f4909c53012a5fb894d0db776da Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 20 Oct 2025 12:06:18 +0200 Subject: [PATCH 1/9] start new CCSDS scheduler --- satrs-example/src/pus/scheduler.rs | 6 +- satrs/src/ccsds/mod.rs | 1 + satrs/src/ccsds/scheduler.rs | 129 ++++++++++++++++++ satrs/src/lib.rs | 1 + satrs/src/pus/scheduler.rs | 209 ++++++++++++----------------- satrs/src/pus/scheduler_srv.rs | 12 +- 6 files changed, 226 insertions(+), 132 deletions(-) create mode 100644 satrs/src/ccsds/mod.rs create mode 100644 satrs/src/ccsds/scheduler.rs diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 49ffa64..59f18f7 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -5,7 +5,7 @@ use crate::pus::create_verification_reporter; use crate::tmtc::sender::TmTcSender; use log::info; use satrs::pool::{PoolProvider, StaticMemoryPool}; -use satrs::pus::scheduler::{PusScheduler, TcInfo}; +use satrs::pus::scheduler::{PusSchedulerAlloc, TcInfo}; use satrs::pus::scheduler_srv::PusSchedServiceHandler; use satrs::pus::verification::VerificationReporter; use satrs::pus::{ @@ -86,7 +86,7 @@ pub struct SchedulingServiceWrapper { TmTcSender, EcssTcCacher, VerificationReporter, - PusScheduler, + PusSchedulerAlloc, >, pub sched_tc_pool: StaticMemoryPool, pub releaser_buf: [u8; 4096], @@ -179,7 +179,7 @@ pub fn create_scheduler_service( pus_sched_rx: mpsc::Receiver, sched_tc_pool: StaticMemoryPool, ) -> SchedulingServiceWrapper { - let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) + let scheduler = PusSchedulerAlloc::new_with_current_init_time(Duration::from_secs(5)) .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusSchedServiceHandler::new( PusServiceHelper::new( diff --git a/satrs/src/ccsds/mod.rs b/satrs/src/ccsds/mod.rs new file mode 100644 index 0000000..81b3546 --- /dev/null +++ b/satrs/src/ccsds/mod.rs @@ -0,0 +1 @@ +pub mod scheduler; diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs new file mode 100644 index 0000000..c4fdb18 --- /dev/null +++ b/satrs/src/ccsds/scheduler.rs @@ -0,0 +1,129 @@ +use core::{hash::Hash, time::Duration}; + +#[cfg(feature = "alloc")] +pub use alloc_mod::*; +use spacepackets::{ + ByteConversionError, PacketId, PacketSequenceControl, + time::{TimestampError, UnixTime}, +}; + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum ScheduleError { + /// The release time is within the time-margin added on top of the current time. + /// The first parameter is the current time, the second one the time margin, and the third one + /// the release time. + #[error("release time in margin")] + ReleaseTimeInTimeMargin { + current_time: UnixTime, + time_margin: Duration, + release_time: UnixTime, + }, + /// Nested time-tagged commands are not allowed. + #[error("nested scheduled tc")] + NestedScheduledTc, + #[error("tc data empty")] + TcDataEmpty, + #[error("timestamp error: {0}")] + TimestampError(#[from] TimestampError), + #[error("wrong subservice number {0}")] + WrongSubservice(u8), + #[error("wrong service number {0}")] + WrongService(u8), + #[error("byte conversion error: {0}")] + ByteConversionError(#[from] ByteConversionError), +} + +#[derive(Debug, PartialEq, Eq, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct CcsdsPacketId { + pub packet_id: PacketId, + pub psc: PacketSequenceControl, + pub crc16: u16, +} + +impl Hash for CcsdsPacketId { + fn hash(&self, state: &mut H) { + self.packet_id.hash(state); + self.psc.raw().hash(state); + self.crc16.hash(state); + } +} + +pub mod alloc_mod { + use core::time::Duration; + #[cfg(feature = "std")] + use std::time::SystemTimeError; + + use spacepackets::time::UnixTime; + + use crate::ccsds::scheduler::CcsdsPacketId; + + pub struct CcsdsScheduler { + tc_map: alloc::collections::BTreeMap< + UnixTime, + alloc::vec::Vec<(CcsdsPacketId, alloc::vec::Vec)>, + >, + packet_limit: usize, + pub(crate) current_time: UnixTime, + time_margin: Duration, + enabled: bool, + } + + impl CcsdsScheduler { + pub fn new(current_time: UnixTime, packet_limit: usize, time_margin: Duration) -> Self { + Self { + tc_map: alloc::collections::BTreeMap::new(), + packet_limit, + current_time, + time_margin, + enabled: true, + } + } + + /// Like [Self::new], but sets the `init_current_time` parameter to the current system time. + #[cfg(feature = "std")] + pub fn new_with_current_init_time( + packet_limit: usize, + time_margin: Duration, + ) -> Result { + Ok(Self::new(UnixTime::now()?, packet_limit, time_margin)) + } + + pub fn num_of_entries(&self) -> usize { + self.tc_map + .values() + .map(|v| v.iter().map(|(_, v)| v.len()).sum::()) + .sum() + } + + #[inline] + pub fn enable(&mut self) { + self.enabled = true; + } + + #[inline] + pub fn disable(&mut self) { + self.enabled = false; + } + + #[inline] + pub fn update_time(&mut self, current_time: UnixTime) { + self.current_time = current_time; + } + + #[inline] + pub fn current_time(&self) -> &UnixTime { + &self.current_time + } + + // TODO: Implementation + pub fn insert_telecommand( + &mut self, + packet_id: CcsdsPacketId, + packet: alloc::vec::Vec, + release_time: UnixTime, + ) { + } + } +} diff --git a/satrs/src/lib.rs b/satrs/src/lib.rs index 22b0cee..71547e1 100644 --- a/satrs/src/lib.rs +++ b/satrs/src/lib.rs @@ -51,6 +51,7 @@ pub mod scheduling; pub mod subsystem; pub mod time; pub mod tmtc; +pub mod ccsds; pub use spacepackets; diff --git a/satrs/src/pus/scheduler.rs b/satrs/src/pus/scheduler.rs index 6461eac..0a26d21 100644 --- a/satrs/src/pus/scheduler.rs +++ b/satrs/src/pus/scheduler.rs @@ -3,7 +3,7 @@ //! The core data structure of this module is the [PusScheduler]. This structure can be used //! to perform the scheduling of telecommands like specified in the ECSS standard. use arbitrary_int::{u11, u14}; -use core::fmt::{Debug, Display, Formatter}; +use core::fmt::Debug; use core::time::Duration; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -12,8 +12,6 @@ use spacepackets::ecss::tc::{GenericPusTcSecondaryHeader, IsPusTelecommand, PusT use spacepackets::ecss::{PusError, PusPacket, WritablePusPacket}; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimeWriter, TimestampError, UnixTime}; use spacepackets::{ByteConversionError, CcsdsPacket}; -#[cfg(feature = "std")] -use std::error::Error; use crate::pool::{PoolError, PoolProvider}; #[cfg(feature = "alloc")] @@ -144,107 +142,39 @@ impl TimeWindow { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum ScheduleError { - PusError(PusError), + #[error("pus error: {0}")] + PusError(#[from] PusError), /// The release time is within the time-margin added on top of the current time. /// The first parameter is the current time, the second one the time margin, and the third one /// the release time. + #[error("release time in margin")] ReleaseTimeInTimeMargin { current_time: UnixTime, time_margin: Duration, release_time: UnixTime, }, /// Nested time-tagged commands are not allowed. + #[error("nested scheduled tc")] NestedScheduledTc, - StoreError(PoolError), + #[error("store error")] + Pool(#[from] PoolError), + #[error("tc data empty")] TcDataEmpty, - TimestampError(TimestampError), + #[error("timestamp error: {0}")] + TimestampError(#[from] TimestampError), + #[error("wrong subservice number {0}")] WrongSubservice(u8), + #[error("wrong service number {0}")] WrongService(u8), - ByteConversionError(ByteConversionError), -} - -impl Display for ScheduleError { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - match self { - ScheduleError::PusError(e) => { - write!(f, "Pus Error: {e}") - } - ScheduleError::ReleaseTimeInTimeMargin { - current_time, - time_margin, - release_time, - } => { - write!( - f, - "time margin too short, current time: {current_time:?}, time margin: {time_margin:?}, release time: {release_time:?}" - ) - } - ScheduleError::NestedScheduledTc => { - write!(f, "nested scheduling is not allowed") - } - ScheduleError::StoreError(e) => { - write!(f, "pus scheduling: {e}") - } - ScheduleError::TcDataEmpty => { - write!(f, "empty TC data field") - } - ScheduleError::TimestampError(e) => { - write!(f, "pus scheduling: {e}") - } - ScheduleError::WrongService(srv) => { - write!(f, "pus scheduling: wrong service number {srv}") - } - ScheduleError::WrongSubservice(subsrv) => { - write!(f, "pus scheduling: wrong subservice number {subsrv}") - } - ScheduleError::ByteConversionError(e) => { - write!(f, "pus scheduling: {e}") - } - } - } -} - -impl From for ScheduleError { - fn from(e: PusError) -> Self { - Self::PusError(e) - } -} - -impl From for ScheduleError { - fn from(e: PoolError) -> Self { - Self::StoreError(e) - } -} - -impl From for ScheduleError { - fn from(e: TimestampError) -> Self { - Self::TimestampError(e) - } -} -impl From for ScheduleError { - fn from(e: ByteConversionError) -> Self { - Self::ByteConversionError(e) - } -} - -#[cfg(feature = "std")] -impl Error for ScheduleError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - ScheduleError::PusError(e) => Some(e), - ScheduleError::StoreError(e) => Some(e), - ScheduleError::TimestampError(e) => Some(e), - ScheduleError::ByteConversionError(e) => Some(e), - _ => None, - } - } + #[error("byte conversion error: {0}")] + ByteConversionError(#[from] ByteConversionError), } /// Generic trait for scheduler objects which are able to schedule ECSS PUS C packets. -pub trait PusSchedulerProvider { +pub trait PusScheduler { type TimeProvider: CcsdsTimeProvider + TimeReader; fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), PoolError>; @@ -401,7 +331,7 @@ pub mod alloc_mod { /// /// Currently, sub-schedules and groups are not supported. #[derive(Debug)] - pub struct PusScheduler { + pub struct PusSchedulerAlloc { // TODO: Use MonotonicTime from tai-time crate instead of UnixTime and cache leap seconds. // TODO: Introduce optional limit of commands stored in the TC map. If a limit is set, // there will be a check for each insertion whether the map is full, making the memory @@ -411,7 +341,8 @@ pub mod alloc_mod { time_margin: Duration, enabled: bool, } - impl PusScheduler { + + impl PusSchedulerAlloc { /// Create a new PUS scheduler. /// /// # Arguments @@ -423,7 +354,7 @@ pub mod alloc_mod { /// * `tc_buf_size` - Buffer for temporary storage of telecommand packets. This buffer /// should be large enough to accomodate the largest expected TC packets. pub fn new(init_current_time: UnixTime, time_margin: Duration) -> Self { - PusScheduler { + PusSchedulerAlloc { tc_map: Default::default(), current_time: init_current_time, time_margin, @@ -445,10 +376,12 @@ pub mod alloc_mod { num_entries } + #[inline] pub fn update_time(&mut self, current_time: UnixTime) { self.current_time = current_time; } + #[inline] pub fn current_time(&self) -> &UnixTime { &self.current_time } @@ -792,7 +725,7 @@ pub mod alloc_mod { } } - impl PusSchedulerProvider for PusScheduler { + impl PusScheduler for PusSchedulerAlloc { type TimeProvider = cds::CdsTime; /// This will disable the scheduler and clear the schedule as specified in 6.11.4.4. @@ -948,7 +881,8 @@ mod tests { #[test] fn test_enable_api() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); assert!(scheduler.is_enabled()); scheduler.disable(); assert!(!scheduler.is_enabled()); @@ -962,7 +896,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::new(0), &[]); @@ -1004,7 +939,8 @@ mod tests { #[test] fn insert_multi_with_same_time() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); scheduler .insert_unwrapped_and_stored_tc( @@ -1063,7 +999,8 @@ mod tests { #[test] fn test_time_update() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let time = UnixTime::new(1, 2_000_000); scheduler.update_time(time); assert_eq!(scheduler.current_time(), &time); @@ -1117,7 +1054,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); @@ -1185,7 +1123,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); @@ -1245,7 +1184,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); scheduler.disable(); @@ -1310,7 +1250,8 @@ mod tests { #[test] fn insert_unwrapped_tc() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], @@ -1360,7 +1301,8 @@ mod tests { #[test] fn insert_wrapped_tc() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], @@ -1412,7 +1354,8 @@ mod tests { #[test] fn insert_wrong_service() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], @@ -1437,7 +1380,8 @@ mod tests { #[test] fn insert_wrong_subservice() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], @@ -1462,7 +1406,8 @@ mod tests { #[test] fn insert_wrapped_tc_faulty_app_data() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], false, @@ -1479,7 +1424,8 @@ mod tests { #[test] fn insert_doubly_wrapped_time_tagged_cmd() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], false, @@ -1497,7 +1443,7 @@ mod tests { #[test] fn test_ctor_from_current() { - let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) + let scheduler = PusSchedulerAlloc::new_with_current_init_time(Duration::from_secs(5)) .expect("creation from current time failed"); let current_time = scheduler.current_time; assert!(current_time.as_secs() > 0); @@ -1505,7 +1451,8 @@ mod tests { #[test] fn test_update_from_current() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); assert_eq!(scheduler.current_time.as_secs(), 0); scheduler .update_time_from_now() @@ -1515,7 +1462,8 @@ mod tests { #[test] fn release_time_within_time_margin() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(10, 32), (5, 64)], @@ -1548,7 +1496,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); scheduler @@ -1585,7 +1534,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); scheduler @@ -1611,7 +1561,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); scheduler @@ -1632,7 +1583,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); scheduler @@ -1653,7 +1605,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); scheduler @@ -1695,7 +1648,8 @@ mod tests { #[test] fn insert_full_store_test() { - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut pool = StaticMemoryPool::new(StaticPoolConfig::new_from_subpool_cfg_tuples( vec![(1, 64)], @@ -1711,7 +1665,7 @@ mod tests { assert!(insert_res.is_err()); let err = insert_res.unwrap_err(); match err { - ScheduleError::StoreError(e) => match e { + ScheduleError::Pool(e) => match e { PoolError::StoreFull(_) => {} _ => panic!("unexpected store error {e}"), }, @@ -1721,7 +1675,7 @@ mod tests { fn insert_command_with_release_time( pool: &mut StaticMemoryPool, - scheduler: &mut PusScheduler, + scheduler: &mut PusSchedulerAlloc, seq_count: u14, release_secs: u64, ) -> TcInfo { @@ -1740,7 +1694,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 100); assert_eq!(scheduler.num_scheduled_telecommands(), 2); @@ -1772,7 +1727,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let _ = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 100); let tc_info_2 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 150); @@ -1807,7 +1763,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let tc_info_0 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 100); let _ = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 150); @@ -1842,7 +1799,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let _ = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let tc_info_1 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 100); let tc_info_2 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 150); @@ -1883,7 +1841,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 100); assert_eq!(scheduler.num_scheduled_telecommands(), 2); @@ -1912,7 +1871,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 100); @@ -1939,7 +1899,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let cmd_0_to_delete = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let cmd_1_to_delete = @@ -1967,7 +1928,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let cmd_out_of_range_0 = insert_command_with_release_time(&mut pool, &mut scheduler, u14::ZERO, 50); let cmd_0_to_delete = @@ -2005,7 +1967,8 @@ mod tests { vec![(10, 32), (5, 64)], false, )); - let mut scheduler = PusScheduler::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); + let mut scheduler = + PusSchedulerAlloc::new(UnixTime::new_only_secs(0), Duration::from_secs(5)); let mut buf: [u8; 32] = [0; 32]; let tc_info_0 = ping_tc_to_store(&mut pool, &mut buf, u14::ZERO, &[]); diff --git a/satrs/src/pus/scheduler_srv.rs b/satrs/src/pus/scheduler_srv.rs index 8c1a7b7..0eb295f 100644 --- a/satrs/src/pus/scheduler_srv.rs +++ b/satrs/src/pus/scheduler_srv.rs @@ -1,4 +1,4 @@ -use super::scheduler::PusSchedulerProvider; +use super::scheduler::PusScheduler; use super::verification::{VerificationReporter, VerificationReportingProvider}; use super::{ CacheAndReadRawEcssTc, DirectPusPacketHandlerResult, EcssTcInSharedPoolCacher, EcssTcReceiver, @@ -26,11 +26,11 @@ pub struct PusSchedServiceHandler< TmSender: EcssTmSender, TcInMemConverter: CacheAndReadRawEcssTc, VerificationReporter: VerificationReportingProvider, - PusScheduler: PusSchedulerProvider, + PusSchedulerInstance: PusScheduler, > { pub service_helper: PusServiceHelper, - scheduler: PusScheduler, + scheduler: PusSchedulerInstance, } impl< @@ -38,7 +38,7 @@ impl< TmSender: EcssTmSender, TcInMemConverter: CacheAndReadRawEcssTc, VerificationReporter: VerificationReportingProvider, - Scheduler: PusSchedulerProvider, + Scheduler: PusScheduler, > PusSchedServiceHandler { pub fn new( @@ -254,7 +254,7 @@ mod tests { use crate::pus::{DirectPusPacketHandlerResult, MpscTcReceiver, PusPacketHandlingError}; use crate::pus::{ EcssTcInSharedPoolCacher, - scheduler::{self, PusSchedulerProvider, TcInfo}, + scheduler::{self, PusScheduler, TcInfo}, tests::PusServiceHandlerWithSharedStoreCommon, verification::{RequestId, TcStateAccepted, VerificationToken}, }; @@ -349,7 +349,7 @@ mod tests { inserted_tcs: VecDeque, } - impl PusSchedulerProvider for TestScheduler { + impl PusScheduler for TestScheduler { type TimeProvider = cds::CdsTime; fn reset( -- 2.43.0 From f4aff4780a791c1111ed735d914e183ffe352f5b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 17 Nov 2025 11:34:27 +0100 Subject: [PATCH 2/9] continue CCSDS scheduler --- justfile | 6 ++-- satrs/src/ccsds/scheduler.rs | 46 +++++++++++++++++++--------- satrs/src/lib.rs | 2 +- satrs/src/pool.rs | 59 ++++++------------------------------ 4 files changed, 46 insertions(+), 67 deletions(-) diff --git a/justfile b/justfile index 5e00fa6..f61f920 100644 --- a/justfile +++ b/justfile @@ -14,12 +14,12 @@ test: embedded: cargo check -p satrs --target=thumbv7em-none-eabihf --no-default-features -fmt: - cargo fmt --all - check-fmt: cargo fmt --all -- --check +fmt: + cargo fmt --all + clippy: cargo clippy -- -D warnings diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs index c4fdb18..b8d79e7 100644 --- a/satrs/src/ccsds/scheduler.rs +++ b/satrs/src/ccsds/scheduler.rs @@ -3,7 +3,7 @@ use core::{hash::Hash, time::Duration}; #[cfg(feature = "alloc")] pub use alloc_mod::*; use spacepackets::{ - ByteConversionError, PacketId, PacketSequenceControl, + ByteConversionError, CcsdsPacketIdAndPsc, time::{TimestampError, UnixTime}, }; @@ -24,6 +24,8 @@ pub enum ScheduleError { NestedScheduledTc, #[error("tc data empty")] TcDataEmpty, + #[error("scheduler is full")] + Full, #[error("timestamp error: {0}")] TimestampError(#[from] TimestampError), #[error("wrong subservice number {0}")] @@ -36,33 +38,32 @@ pub enum ScheduleError { #[derive(Debug, PartialEq, Eq, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct CcsdsPacketId { - pub packet_id: PacketId, - pub psc: PacketSequenceControl, - pub crc16: u16, +pub struct CcsdsSchedulePacketId { + pub base: CcsdsPacketIdAndPsc, + pub crc16: Option, } -impl Hash for CcsdsPacketId { +impl Hash for CcsdsSchedulePacketId { fn hash(&self, state: &mut H) { - self.packet_id.hash(state); - self.psc.raw().hash(state); + self.base.hash(state); self.crc16.hash(state); } } +#[cfg(feature = "alloc")] pub mod alloc_mod { use core::time::Duration; #[cfg(feature = "std")] use std::time::SystemTimeError; - use spacepackets::time::UnixTime; + use spacepackets::{CcsdsPacketIdAndPsc, CcsdsPacketReader, time::UnixTime}; - use crate::ccsds::scheduler::CcsdsPacketId; + use crate::ccsds::scheduler::CcsdsSchedulePacketId; pub struct CcsdsScheduler { tc_map: alloc::collections::BTreeMap< UnixTime, - alloc::vec::Vec<(CcsdsPacketId, alloc::vec::Vec)>, + alloc::vec::Vec<(CcsdsSchedulePacketId, alloc::vec::Vec)>, >, packet_limit: usize, pub(crate) current_time: UnixTime, @@ -117,13 +118,30 @@ pub mod alloc_mod { &self.current_time } + pub fn insert_telecommand_with_reader( + &mut self, + reader: &CcsdsPacketReader, + release_time: UnixTime, + ) -> Result<(), super::ScheduleError> { + if self.num_of_entries() + 1 >= self.packet_limit { + return Err(super::ScheduleError::Full); + } + let base_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(reader); + + Ok(()) + } + // TODO: Implementation pub fn insert_telecommand( &mut self, - packet_id: CcsdsPacketId, - packet: alloc::vec::Vec, + packet_id: CcsdsSchedulePacketId, + raw_packet: &[u8], release_time: UnixTime, - ) { + ) -> Result<(), super::ScheduleError> { + if self.num_of_entries() + 1 >= self.packet_limit { + return Err(super::ScheduleError::Full); + } + Ok(()) } } } diff --git a/satrs/src/lib.rs b/satrs/src/lib.rs index 5930f68..5878bd6 100644 --- a/satrs/src/lib.rs +++ b/satrs/src/lib.rs @@ -23,6 +23,7 @@ extern crate downcast_rs; extern crate std; pub mod action; +pub mod ccsds; #[cfg(feature = "alloc")] pub mod dev_mgmt; pub mod encoding; @@ -51,7 +52,6 @@ pub mod scheduling; pub mod subsystem; pub mod time; pub mod tmtc; -pub mod ccsds; pub use spacepackets; diff --git a/satrs/src/pool.rs b/satrs/src/pool.rs index c0c3c09..ac86781 100644 --- a/satrs/src/pool.rs +++ b/satrs/src/pool.rs @@ -155,73 +155,34 @@ impl Display for StoreIdError { #[cfg(feature = "std")] impl Error for StoreIdError {} -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum PoolError { /// Requested data block is too large + #[error("data to store with size {0} is too large")] DataTooLarge(usize), /// The store is full. Contains the index of the full subpool + #[error("store does not have any capacity")] StoreFull(u16), /// The store can not hold any data. + #[error("store does not have any capacity")] NoCapacity, /// Store ID is invalid. This also includes partial errors where only the subpool is invalid + #[error("invalid store ID: {0}, address: {1:?}")] InvalidStoreId(StoreIdError, Option), /// Valid subpool and packet index, but no data is stored at the given address + #[error("no data exists at address {0:?}")] DataDoesNotExist(PoolAddr), - ByteConversionError(spacepackets::ByteConversionError), + #[error("byte conversion error: {0}")] + ByteConversion(#[from] spacepackets::ByteConversionError), + #[error("lock error")] LockError, /// Internal or configuration errors + #[error("lock error")] InternalError(u32), } -impl Display for PoolError { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - match self { - PoolError::DataTooLarge(size) => { - write!(f, "data to store with size {size} is too large") - } - PoolError::NoCapacity => { - write!(f, "store does not have any capacity") - } - PoolError::StoreFull(u16) => { - write!(f, "store is too full. index for full subpool: {u16}") - } - PoolError::InvalidStoreId(id_e, addr) => { - write!(f, "invalid store ID: {id_e}, address: {addr:?}") - } - PoolError::DataDoesNotExist(addr) => { - write!(f, "no data exists at address {addr:?}") - } - PoolError::InternalError(e) => { - write!(f, "internal error: {e}") - } - PoolError::ByteConversionError(e) => { - write!(f, "store error: {e}") - } - PoolError::LockError => { - write!(f, "lock error") - } - } - } -} - -impl From for PoolError { - fn from(value: ByteConversionError) -> Self { - Self::ByteConversionError(value) - } -} - -#[cfg(feature = "std")] -impl Error for PoolError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - if let PoolError::InvalidStoreId(e, _) = self { - return Some(e); - } - None - } -} - /// Generic trait for pool providers which provide memory pools for variable sized packet data. /// /// It specifies a basic API to [Self::add], [Self::modify], [Self::read] and [Self::delete] data -- 2.43.0 From c5efea008f8def569f94543b183eb7e631d27000 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 20 Nov 2025 15:16:25 +0100 Subject: [PATCH 3/9] finished very basic implementation --- satrs/Cargo.toml | 2 +- satrs/src/ccsds/scheduler.rs | 187 +++++++++++++++++++++++++++++++---- 2 files changed, 167 insertions(+), 22 deletions(-) diff --git a/satrs/Cargo.toml b/satrs/Cargo.toml index f5f290f..aa35b0d 100644 --- a/satrs/Cargo.toml +++ b/satrs/Cargo.toml @@ -14,7 +14,7 @@ categories = ["aerospace", "aerospace::space-protocols", "no-std", "hardware-sup [dependencies] satrs-shared = { version = "0.2", path = "../satrs-shared" } -spacepackets = { version = "0.17", default-features = false } +spacepackets = { version = "0.17", git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git", default-features = false } delegate = "0.13" paste = "1" diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs index b8d79e7..ab4035e 100644 --- a/satrs/src/ccsds/scheduler.rs +++ b/satrs/src/ccsds/scheduler.rs @@ -1,45 +1,58 @@ +//! # CCSDS Telecommand Scheduler. +#![deny(missing_docs)] use core::{hash::Hash, time::Duration}; #[cfg(feature = "alloc")] pub use alloc_mod::*; use spacepackets::{ - ByteConversionError, CcsdsPacketIdAndPsc, + CcsdsPacketIdAndPsc, time::{TimestampError, UnixTime}, }; +/// Generic CCSDS scheduling errors. #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum ScheduleError { /// The release time is within the time-margin added on top of the current time. /// The first parameter is the current time, the second one the time margin, and the third one /// the release time. #[error("release time in margin")] ReleaseTimeInTimeMargin { + /// Current time. current_time: UnixTime, + /// Configured time margin. time_margin: Duration, + /// Release time. release_time: UnixTime, }, /// Nested time-tagged commands are not allowed. #[error("nested scheduled tc")] NestedScheduledTc, + /// TC data is empty. #[error("tc data empty")] TcDataEmpty, - #[error("scheduler is full")] - Full, + /// Scheduler is full, packet number limit reached. + #[error("scheduler is full, packet number limit reached")] + PacketLimitReached, + /// Scheduler is full, numver of bytes limit reached. + #[error("scheduler is full, number of bytes limit reached")] + ByteLimitReached, + /// Timestamp error. #[error("timestamp error: {0}")] TimestampError(#[from] TimestampError), - #[error("wrong subservice number {0}")] - WrongSubservice(u8), - #[error("wrong service number {0}")] - WrongService(u8), - #[error("byte conversion error: {0}")] - ByteConversionError(#[from] ByteConversionError), } +/// Packet ID used for identifying scheduled packets. +/// +/// Right now, this ID can be determined from the packet without requiring external input +/// or custom data fields in the CCSDS space pacekt. #[derive(Debug, PartialEq, Eq, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct CcsdsSchedulePacketId { + /// Base ID. pub base: CcsdsPacketIdAndPsc, + /// Optional checksum of the packet. pub crc16: Option, } @@ -50,32 +63,72 @@ impl Hash for CcsdsSchedulePacketId { } } +/// Modules requiring [alloc] support. #[cfg(feature = "alloc")] pub mod alloc_mod { use core::time::Duration; #[cfg(feature = "std")] use std::time::SystemTimeError; + use alloc::collections::btree_map; use spacepackets::{CcsdsPacketIdAndPsc, CcsdsPacketReader, time::UnixTime}; use crate::ccsds::scheduler::CcsdsSchedulePacketId; + /// The scheduler can be configured to have bounds for both the number of packets + /// and the total number of bytes used by scheduled packets. + /// + /// This can be used to avoid memory exhaustion in systems with limited resources or under + /// heavy workloads. + #[derive(Default, Debug)] + #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] + #[cfg_attr(feature = "defmt", derive(defmt::Format))] + pub struct Limits { + /// Maximum number of scheduled packets. + pub packets: Option, + /// Maximum total number of bytes used by scheduled packets. + pub bytes: Option, + } + + impl Limits { + /// Check if no limits are set. + pub fn has_no_limits(&self) -> bool { + self.packets.is_none() || self.bytes.is_none() + } + } + + /// Fill count of the scheduler. + #[derive(Default, Debug)] + #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] + #[cfg_attr(feature = "defmt", derive(defmt::Format))] + pub struct FillCount { + /// Number of scheduled packets. + pub packets: usize, + /// Total number of bytes used by scheduled packets. + pub bytes: usize, + } + + /// Simple CCSDS scheduler implementation. + /// + /// Relies of [alloc] support but limits the number of scheduled packets. + #[derive(Debug)] pub struct CcsdsScheduler { tc_map: alloc::collections::BTreeMap< UnixTime, alloc::vec::Vec<(CcsdsSchedulePacketId, alloc::vec::Vec)>, >, - packet_limit: usize, + limits: Limits, pub(crate) current_time: UnixTime, time_margin: Duration, enabled: bool, } impl CcsdsScheduler { - pub fn new(current_time: UnixTime, packet_limit: usize, time_margin: Duration) -> Self { + /// Create a new CCSDS scheduler. + pub fn new(current_time: UnixTime, limits: Limits, time_margin: Duration) -> Self { Self { tc_map: alloc::collections::BTreeMap::new(), - packet_limit, + limits, current_time, time_margin, enabled: true, @@ -85,12 +138,28 @@ pub mod alloc_mod { /// Like [Self::new], but sets the `init_current_time` parameter to the current system time. #[cfg(feature = "std")] pub fn new_with_current_init_time( - packet_limit: usize, + limits: Limits, time_margin: Duration, ) -> Result { - Ok(Self::new(UnixTime::now()?, packet_limit, time_margin)) + Ok(Self::new(UnixTime::now()?, limits, time_margin)) } + /// Current fill count: number of scheduled packets and total number of bytes. + /// + /// The first returned value is the number of scheduled packets, the second one is the + /// byte count. + pub fn current_fill_count(&self) -> FillCount { + let mut fill_count = FillCount::default(); + for value in self.tc_map.values() { + for (_, raw_scheduled_tc) in value { + fill_count.packets += 1; + fill_count.bytes += raw_scheduled_tc.len(); + } + } + fill_count + } + + /// Current number of scheduled entries. pub fn num_of_entries(&self) -> usize { self.tc_map .values() @@ -98,50 +167,126 @@ pub mod alloc_mod { .sum() } + /// Enable the scheduler. #[inline] pub fn enable(&mut self) { self.enabled = true; } + /// Disable the scheduler. #[inline] pub fn disable(&mut self) { self.enabled = false; } + /// Update the current time. #[inline] pub fn update_time(&mut self, current_time: UnixTime) { self.current_time = current_time; } + /// Current time. #[inline] pub fn current_time(&self) -> &UnixTime { &self.current_time } + fn common_check( + &mut self, + release_time: UnixTime, + packet_size: usize, + ) -> Result<(), super::ScheduleError> { + if !self.limits.has_no_limits() { + let fill_count = self.current_fill_count(); + if let Some(max_bytes) = self.limits.bytes { + if fill_count.bytes + packet_size >= max_bytes { + return Err(super::ScheduleError::ByteLimitReached); + } + } + if let Some(max_packets) = self.limits.packets { + if fill_count.packets + 1 >= max_packets { + return Err(super::ScheduleError::PacketLimitReached); + } + } + } + if release_time < self.current_time + self.time_margin { + return Err(super::ScheduleError::ReleaseTimeInTimeMargin { + current_time: self.current_time, + time_margin: self.time_margin, + release_time, + }); + } + Ok(()) + } + + /// Insert a telecommand using an existing [CcsdsPacketReader]. pub fn insert_telecommand_with_reader( &mut self, reader: &CcsdsPacketReader, release_time: UnixTime, ) -> Result<(), super::ScheduleError> { - if self.num_of_entries() + 1 >= self.packet_limit { - return Err(super::ScheduleError::Full); - } + self.common_check(release_time, reader.packet_len())?; let base_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(reader); + let checksum = reader.checksum(); + let packet_id_scheduling = CcsdsSchedulePacketId { + base: base_id, + crc16: checksum, + }; + self.insert_telecommand(packet_id_scheduling, reader.raw_data(), release_time)?; Ok(()) } - // TODO: Implementation + /// Insert a raw telecommand, assuming the user has already extracted the + /// [CcsdsSchedulePacketId] pub fn insert_telecommand( &mut self, - packet_id: CcsdsSchedulePacketId, + packet_id_scheduling: CcsdsSchedulePacketId, raw_packet: &[u8], release_time: UnixTime, ) -> Result<(), super::ScheduleError> { - if self.num_of_entries() + 1 >= self.packet_limit { - return Err(super::ScheduleError::Full); + self.common_check(release_time, raw_packet.len())?; + match self.tc_map.entry(release_time) { + btree_map::Entry::Vacant(e) => { + e.insert(alloc::vec![(packet_id_scheduling, raw_packet.to_vec())]); + } + btree_map::Entry::Occupied(mut v) => { + v.get_mut() + .push((packet_id_scheduling, raw_packet.to_vec())); + } } Ok(()) } + + /// Release all telecommands which should be released based on the current time. + pub fn release_telecommands( + &mut self, + mut releaser: R, + ) { + let tcs_to_release = self.telecommands_to_release(); + for tc_group in tcs_to_release { + for (packet_id, raw_tc) in tc_group.1 { + releaser(self.enabled, packet_id, raw_tc); + } + } + self.tc_map.retain(|k, _| k > &self.current_time); + } + + /// Retrieve all telecommands which should be released based on the current time. + pub fn telecommands_to_release( + &self, + ) -> btree_map::Range< + '_, + UnixTime, + alloc::vec::Vec<(CcsdsSchedulePacketId, alloc::vec::Vec)>, + > { + self.tc_map.range(..=self.current_time) + } } } + +#[cfg(test)] +mod tests { + #[test] + fn test_basic() {} +} -- 2.43.0 From ee6dc1a39e367555b7d005d5c69ff7aa1ada6465 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 20 Nov 2025 15:17:01 +0100 Subject: [PATCH 4/9] add clear method --- satrs/src/ccsds/scheduler.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs index ab4035e..948596c 100644 --- a/satrs/src/ccsds/scheduler.rs +++ b/satrs/src/ccsds/scheduler.rs @@ -282,6 +282,11 @@ pub mod alloc_mod { > { self.tc_map.range(..=self.current_time) } + + /// Completely clear the scheduler. + pub fn clear(&mut self) { + self.tc_map.clear(); + } } } -- 2.43.0 From fb117b748923a8b29975703a922031c520dee695 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 26 Nov 2025 17:01:17 +0100 Subject: [PATCH 5/9] add basic tests --- satrs/src/ccsds/scheduler.rs | 325 ++++++++++++++++++++++++++++++++++- 1 file changed, 317 insertions(+), 8 deletions(-) diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs index 948596c..9ec5a2c 100644 --- a/satrs/src/ccsds/scheduler.rs +++ b/satrs/src/ccsds/scheduler.rs @@ -56,6 +56,16 @@ pub struct CcsdsSchedulePacketId { pub crc16: Option, } +impl CcsdsSchedulePacketId { + /// Create a new CCSDS scheduling packet ID. + pub const fn new(base: CcsdsPacketIdAndPsc, checksum: Option) -> Self { + Self { + base, + crc16: checksum, + } + } +} + impl Hash for CcsdsSchedulePacketId { fn hash(&self, state: &mut H) { self.base.hash(state); @@ -91,9 +101,14 @@ pub mod alloc_mod { } impl Limits { + /// Create new limits for the CCSDS scheduler. + pub const fn new(packets: Option, bytes: Option) -> Self { + Self { packets, bytes } + } + /// Check if no limits are set. pub fn has_no_limits(&self) -> bool { - self.packets.is_none() || self.bytes.is_none() + self.packets.is_none() && self.bytes.is_none() } } @@ -161,10 +176,7 @@ pub mod alloc_mod { /// Current number of scheduled entries. pub fn num_of_entries(&self) -> usize { - self.tc_map - .values() - .map(|v| v.iter().map(|(_, v)| v.len()).sum::()) - .sum() + self.current_fill_count().packets } /// Enable the scheduler. @@ -199,12 +211,12 @@ pub mod alloc_mod { if !self.limits.has_no_limits() { let fill_count = self.current_fill_count(); if let Some(max_bytes) = self.limits.bytes { - if fill_count.bytes + packet_size >= max_bytes { + if fill_count.bytes + packet_size > max_bytes { return Err(super::ScheduleError::ByteLimitReached); } } if let Some(max_packets) = self.limits.packets { - if fill_count.packets + 1 >= max_packets { + if fill_count.packets + 1 > max_packets { return Err(super::ScheduleError::PacketLimitReached); } } @@ -292,6 +304,303 @@ pub mod alloc_mod { #[cfg(test)] mod tests { + use arbitrary_int::u11; + use spacepackets::{ + CcsdsPacketCreatorOwned, CcsdsPacketReader, ChecksumType, SpacePacketHeader, + }; + + use super::*; + + fn test_tc(app_data: &[u8]) -> CcsdsPacketCreatorOwned { + CcsdsPacketCreatorOwned::new( + SpacePacketHeader::new_from_apid(u11::new(0x1)), + spacepackets::PacketType::Tc, + app_data, + Some(ChecksumType::WithCrc16), + ) + .unwrap() + } + #[test] - fn test_basic() {} + fn test_basic() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(5000), + ); + assert_eq!(scheduler.current_fill_count().packets, 0); + assert_eq!(scheduler.current_fill_count().bytes, 0); + assert_eq!(scheduler.num_of_entries(), 0); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + assert_eq!(scheduler.current_time(), &unix_time); + scheduler.release_telecommands(|_, _, _| { + panic!("should not be called"); + }); + } + + #[test] + fn test_clear() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc = test_tc(&[1, 2, 3]); + let test_tc_raw = test_tc.to_vec(); + let reader = CcsdsPacketReader::new(&test_tc_raw, Some(ChecksumType::WithCrc16)).unwrap(); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + assert_eq!(scheduler.current_fill_count().packets, 1); + assert_eq!(scheduler.current_fill_count().bytes, test_tc_raw.len()); + assert_eq!(scheduler.num_of_entries(), 1); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + scheduler.clear(); + assert_eq!(scheduler.current_fill_count().packets, 0); + assert_eq!(scheduler.current_fill_count().bytes, 0); + assert_eq!(scheduler.num_of_entries(), 0); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + } + + #[test] + fn insert_and_release_one() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc = test_tc(&[1, 2, 3]); + let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc); + let test_tc_raw = test_tc.to_vec(); + let reader = CcsdsPacketReader::new(&test_tc_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let checksum = reader.checksum(); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + assert_eq!(scheduler.current_fill_count().packets, 1); + assert_eq!(scheduler.current_fill_count().bytes, test_tc_raw.len()); + assert_eq!(scheduler.num_of_entries(), 1); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + scheduler.release_telecommands(|_, _, _| { + panic!("should not be called"); + }); + scheduler.update_time(UnixTime::new(3, 0)); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 1 + ); + scheduler.release_telecommands(|enabled, tc_id_scheduled, tc_raw| { + assert!(enabled); + assert_eq!(tc_id, tc_id_scheduled.base); + assert_eq!(checksum, tc_id_scheduled.crc16); + assert_eq!(tc_raw, test_tc_raw); + }); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + } + + #[test] + fn insert_and_release_multi() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc_0 = test_tc(&[42]); + let test_tc_1 = test_tc(&[1, 2, 3]); + let tc_id_0 = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); + let tc_id_1 = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_1); + let test_tc_0_raw = test_tc_0.to_vec(); + let test_tc_1_raw = test_tc_1.to_vec(); + let reader_0 = + CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let reader_1 = + CcsdsPacketReader::new(&test_tc_1_raw, Some(ChecksumType::WithCrc16)).unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_0, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand( + CcsdsSchedulePacketId::new(tc_id_1, reader_1.checksum()), + &test_tc_1_raw, + UnixTime::new(5, 0), + ) + .unwrap(); + assert_eq!(scheduler.current_fill_count().packets, 2); + assert_eq!( + scheduler.current_fill_count().bytes, + test_tc_0_raw.len() + test_tc_1_raw.len() + ); + assert_eq!(scheduler.num_of_entries(), 2); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + scheduler.release_telecommands(|_, _, _| { + panic!("should not be called"); + }); + + // Release first TC. + scheduler.update_time(UnixTime::new(3, 0)); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 1 + ); + scheduler.release_telecommands(|enabled, tc_id_scheduled, tc_raw| { + assert!(enabled); + assert_eq!(tc_id_0, tc_id_scheduled.base); + assert_eq!(reader_0.checksum(), tc_id_scheduled.crc16); + assert_eq!(tc_raw, test_tc_0_raw); + }); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + assert_eq!(scheduler.current_fill_count().packets, 1); + assert_eq!(scheduler.current_fill_count().bytes, test_tc_1_raw.len()); + assert_eq!(scheduler.num_of_entries(), 1); + + // Release second TC. + scheduler.update_time(UnixTime::new(6, 0)); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 1 + ); + scheduler.release_telecommands(|enabled, tc_id_scheduled, tc_raw| { + assert!(enabled); + assert_eq!(tc_id_1, tc_id_scheduled.base); + assert_eq!(reader_1.checksum(), tc_id_scheduled.crc16); + assert_eq!(tc_raw, test_tc_1_raw); + }); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + assert_eq!(scheduler.current_fill_count().packets, 0); + assert_eq!(scheduler.current_fill_count().bytes, 0); + assert_eq!(scheduler.num_of_entries(), 0); + } + + #[test] + fn test_packet_limit_reached() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(3), None), + Duration::from_millis(1000), + ); + let test_tc_0 = test_tc(&[42]); + let test_tc_0_raw = test_tc_0.to_vec(); + let reader = CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + assert_eq!(scheduler.current_fill_count().packets, 3); + assert_eq!( + scheduler.insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)), + Err(ScheduleError::PacketLimitReached) + ); + assert_eq!( + scheduler.insert_telecommand( + CcsdsSchedulePacketId::new(tc_id, reader.checksum()), + &test_tc_0_raw, + UnixTime::new(2, 0) + ), + Err(ScheduleError::PacketLimitReached) + ); + } + + #[test] + fn test_byte_limit_reached() { + let unix_time = UnixTime::new(0, 0); + let test_tc_0 = test_tc(&[42]); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(None, Some(test_tc_0.len_written() * 3)), + Duration::from_millis(1000), + ); + let test_tc_0_raw = test_tc_0.to_vec(); + let reader = CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + assert_eq!(scheduler.current_fill_count().packets, 3); + assert_eq!( + scheduler.insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)), + Err(ScheduleError::ByteLimitReached) + ); + assert_eq!( + scheduler.insert_telecommand( + CcsdsSchedulePacketId::new(tc_id, reader.checksum()), + &test_tc_0_raw, + UnixTime::new(2, 0) + ), + Err(ScheduleError::ByteLimitReached) + ); + } } -- 2.43.0 From f9fafd4cd8500100f780471ea3dd0ce747db3170 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 26 Nov 2025 17:21:13 +0100 Subject: [PATCH 6/9] only need test by time window --- satrs/src/ccsds/scheduler.rs | 134 ++++++++++++++++++++++++++++------- 1 file changed, 109 insertions(+), 25 deletions(-) diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs index 9ec5a2c..5cfbb6d 100644 --- a/satrs/src/ccsds/scheduler.rs +++ b/satrs/src/ccsds/scheduler.rs @@ -135,7 +135,6 @@ pub mod alloc_mod { limits: Limits, pub(crate) current_time: UnixTime, time_margin: Duration, - enabled: bool, } impl CcsdsScheduler { @@ -146,7 +145,6 @@ pub mod alloc_mod { limits, current_time, time_margin, - enabled: true, } } @@ -179,18 +177,6 @@ pub mod alloc_mod { self.current_fill_count().packets } - /// Enable the scheduler. - #[inline] - pub fn enable(&mut self) { - self.enabled = true; - } - - /// Disable the scheduler. - #[inline] - pub fn disable(&mut self) { - self.enabled = false; - } - /// Update the current time. #[inline] pub fn update_time(&mut self, current_time: UnixTime) { @@ -271,14 +257,14 @@ pub mod alloc_mod { } /// Release all telecommands which should be released based on the current time. - pub fn release_telecommands( + pub fn release_telecommands( &mut self, mut releaser: R, ) { let tcs_to_release = self.telecommands_to_release(); for tc_group in tcs_to_release { for (packet_id, raw_tc) in tc_group.1 { - releaser(self.enabled, packet_id, raw_tc); + releaser(packet_id, raw_tc); } } self.tc_map.retain(|k, _| k > &self.current_time); @@ -295,6 +281,55 @@ pub mod alloc_mod { self.tc_map.range(..=self.current_time) } + /// Delete scheduled telecommand by their packet ID. + /// + /// Returns whether any telecommand was deleted. This function might have to be called + /// multiple times if multiple identical CCSDS packet IDs are possible. + pub fn delete_by_id(&mut self, packet_id: &CcsdsSchedulePacketId) -> bool { + let mut was_removed = false; + self.tc_map.retain(|_, v| { + let len_before = v.len(); + v.retain(|(stored_id, _)| stored_id != packet_id); + let has_remaining = !v.is_empty(); + if v.len() < len_before { + was_removed = true; + } + has_remaining + }); + was_removed + } + + /// Delete all telecommands scheduled in a time window. + /// + /// Returns whether any telecommands were deleted. + pub fn delete_from_start_to_end( + &mut self, + start_time: UnixTime, + end_time: UnixTime, + ) -> bool { + let len_before = self.tc_map.len(); + self.tc_map.retain(|k, _| k < &start_time || k > &end_time); + self.tc_map.len() < len_before + } + + /// Delete all scheduled telecommands starting from a given time. + /// + /// Returns whether any telecommands were deleted. + pub fn delete_from_start(&mut self, start_time: UnixTime) -> bool { + let len_before = self.tc_map.len(); + self.tc_map.retain(|k, _| k < &start_time); + self.tc_map.len() < len_before + } + + /// Delete all scheduled telecommands scheduled before a given time. + /// + /// Returns whether any telecommands were deleted. + pub fn delete_until_end(&mut self, end_time: UnixTime) -> bool { + let len_before = self.tc_map.len(); + self.tc_map.retain(|k, _| k > &end_time); + self.tc_map.len() < len_before + } + /// Completely clear the scheduler. pub fn clear(&mut self) { self.tc_map.clear(); @@ -340,11 +375,26 @@ mod tests { 0 ); assert_eq!(scheduler.current_time(), &unix_time); - scheduler.release_telecommands(|_, _, _| { + scheduler.release_telecommands(|_, _| { panic!("should not be called"); }); } + #[test] + fn test_mutable_closure() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(5000), + ); + let mut some_flag = false; + // We should be able to manipulate the boolean inside the closure. + scheduler.release_telecommands(|_, _| { + some_flag = true; + }); + } + #[test] fn test_clear() { let unix_time = UnixTime::new(0, 0); @@ -408,7 +458,7 @@ mod tests { .len(), 0 ); - scheduler.release_telecommands(|_, _, _| { + scheduler.release_telecommands(|_, _| { panic!("should not be called"); }); scheduler.update_time(UnixTime::new(3, 0)); @@ -419,8 +469,7 @@ mod tests { .len(), 1 ); - scheduler.release_telecommands(|enabled, tc_id_scheduled, tc_raw| { - assert!(enabled); + scheduler.release_telecommands(|tc_id_scheduled, tc_raw| { assert_eq!(tc_id, tc_id_scheduled.base); assert_eq!(checksum, tc_id_scheduled.crc16); assert_eq!(tc_raw, test_tc_raw); @@ -475,7 +524,7 @@ mod tests { .len(), 0 ); - scheduler.release_telecommands(|_, _, _| { + scheduler.release_telecommands(|_, _| { panic!("should not be called"); }); @@ -488,8 +537,7 @@ mod tests { .len(), 1 ); - scheduler.release_telecommands(|enabled, tc_id_scheduled, tc_raw| { - assert!(enabled); + scheduler.release_telecommands(|tc_id_scheduled, tc_raw| { assert_eq!(tc_id_0, tc_id_scheduled.base); assert_eq!(reader_0.checksum(), tc_id_scheduled.crc16); assert_eq!(tc_raw, test_tc_0_raw); @@ -514,8 +562,7 @@ mod tests { .len(), 1 ); - scheduler.release_telecommands(|enabled, tc_id_scheduled, tc_raw| { - assert!(enabled); + scheduler.release_telecommands(|tc_id_scheduled, tc_raw| { assert_eq!(tc_id_1, tc_id_scheduled.base); assert_eq!(reader_1.checksum(), tc_id_scheduled.crc16); assert_eq!(tc_raw, test_tc_1_raw); @@ -603,4 +650,41 @@ mod tests { Err(ScheduleError::ByteLimitReached) ); } + + #[test] + fn test_deletion_by_id() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc = test_tc(&[1, 2, 3]); + let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc); + let test_tc_raw = test_tc.to_vec(); + let reader = CcsdsPacketReader::new(&test_tc_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let checksum = reader.checksum(); + let id = CcsdsSchedulePacketId::new(tc_id, checksum); + scheduler + .insert_telecommand_with_reader(&reader, UnixTime::new(2, 0)) + .unwrap(); + scheduler.delete_by_id(&id); + assert_eq!(scheduler.current_fill_count().packets, 0); + assert_eq!(scheduler.current_fill_count().bytes, 0); + } + + #[test] + fn test_deletion_by_window() { + //TODO + } + + #[test] + fn test_deletion_from_start() { + //TODO + } + + #[test] + fn test_deletion_until_end() { + //TODO + } } -- 2.43.0 From 01c5362473a22a17d8fe8d2a214940890d43b8e1 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 27 Nov 2025 12:23:36 +0100 Subject: [PATCH 7/9] add some more tests --- satrs/src/ccsds/scheduler.rs | 220 ++++++++++++++++++++++++++++++----- 1 file changed, 193 insertions(+), 27 deletions(-) diff --git a/satrs/src/ccsds/scheduler.rs b/satrs/src/ccsds/scheduler.rs index 5cfbb6d..664d1d5 100644 --- a/satrs/src/ccsds/scheduler.rs +++ b/satrs/src/ccsds/scheduler.rs @@ -301,32 +301,29 @@ pub mod alloc_mod { /// Delete all telecommands scheduled in a time window. /// - /// Returns whether any telecommands were deleted. - pub fn delete_from_start_to_end( - &mut self, - start_time: UnixTime, - end_time: UnixTime, - ) -> bool { + /// The range includes the start time but excludes the end time. Returns whether any + /// telecommands were deleted. + pub fn delete_in_time_window(&mut self, start_time: UnixTime, end_time: UnixTime) -> bool { let len_before = self.tc_map.len(); - self.tc_map.retain(|k, _| k < &start_time || k > &end_time); + self.tc_map.retain(|k, _| k < &start_time || k >= &end_time); self.tc_map.len() < len_before } - /// Delete all scheduled telecommands starting from a given time. + /// Delete all scheduled telecommands scheduled after or at a given time. /// /// Returns whether any telecommands were deleted. - pub fn delete_from_start(&mut self, start_time: UnixTime) -> bool { + pub fn delete_all_starting_at(&mut self, start_time: UnixTime) -> bool { let len_before = self.tc_map.len(); self.tc_map.retain(|k, _| k < &start_time); self.tc_map.len() < len_before } - /// Delete all scheduled telecommands scheduled before a given time. + /// Delete all scheduled telecommands scheduled before but not equal to a given time. /// /// Returns whether any telecommands were deleted. - pub fn delete_until_end(&mut self, end_time: UnixTime) -> bool { + pub fn delete_all_before(&mut self, end_time: UnixTime) -> bool { let len_before = self.tc_map.len(); - self.tc_map.retain(|k, _| k > &end_time); + self.tc_map.retain(|k, _| k >= &end_time); self.tc_map.len() < len_before } @@ -339,16 +336,21 @@ pub mod alloc_mod { #[cfg(test)] mod tests { - use arbitrary_int::u11; + use arbitrary_int::{traits::Integer, u11, u14}; use spacepackets::{ CcsdsPacketCreatorOwned, CcsdsPacketReader, ChecksumType, SpacePacketHeader, }; use super::*; - fn test_tc(app_data: &[u8]) -> CcsdsPacketCreatorOwned { + fn test_tc(app_data: &[u8], seq_count: u14) -> CcsdsPacketCreatorOwned { CcsdsPacketCreatorOwned::new( - SpacePacketHeader::new_from_apid(u11::new(0x1)), + SpacePacketHeader::new_for_tc( + u11::new(0x1), + spacepackets::SequenceFlags::Unsegmented, + seq_count, + 0, + ), spacepackets::PacketType::Tc, app_data, Some(ChecksumType::WithCrc16), @@ -403,7 +405,7 @@ mod tests { Limits::new(Some(100), Some(1024)), Duration::from_millis(1000), ); - let test_tc = test_tc(&[1, 2, 3]); + let test_tc = test_tc(&[1, 2, 3], u14::ZERO); let test_tc_raw = test_tc.to_vec(); let reader = CcsdsPacketReader::new(&test_tc_raw, Some(ChecksumType::WithCrc16)).unwrap(); scheduler @@ -440,9 +442,9 @@ mod tests { Limits::new(Some(100), Some(1024)), Duration::from_millis(1000), ); - let test_tc = test_tc(&[1, 2, 3]); - let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc); - let test_tc_raw = test_tc.to_vec(); + let test_tc_0 = test_tc(&[1, 2, 3], u14::ZERO); + let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); + let test_tc_raw = test_tc_0.to_vec(); let reader = CcsdsPacketReader::new(&test_tc_raw, Some(ChecksumType::WithCrc16)).unwrap(); let checksum = reader.checksum(); scheduler @@ -484,15 +486,15 @@ mod tests { } #[test] - fn insert_and_release_multi() { + fn insert_and_release_multi_0() { let unix_time = UnixTime::new(0, 0); let mut scheduler = CcsdsScheduler::new( unix_time, Limits::new(Some(100), Some(1024)), Duration::from_millis(1000), ); - let test_tc_0 = test_tc(&[42]); - let test_tc_1 = test_tc(&[1, 2, 3]); + let test_tc_0 = test_tc(&[42], u14::ZERO); + let test_tc_1 = test_tc(&[1, 2, 3], u14::new(1)); let tc_id_0 = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); let tc_id_1 = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_1); let test_tc_0_raw = test_tc_0.to_vec(); @@ -579,6 +581,85 @@ mod tests { assert_eq!(scheduler.num_of_entries(), 0); } + #[test] + fn insert_and_release_multi_1() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc_0 = test_tc(&[42], u14::ZERO); + let test_tc_1 = test_tc(&[1, 2, 3], u14::new(1)); + let tc_id_0 = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); + let tc_id_1 = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_1); + let test_tc_0_raw = test_tc_0.to_vec(); + let test_tc_1_raw = test_tc_1.to_vec(); + let reader_0 = + CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let reader_1 = + CcsdsPacketReader::new(&test_tc_1_raw, Some(ChecksumType::WithCrc16)).unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_0, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand( + CcsdsSchedulePacketId::new(tc_id_1, reader_1.checksum()), + &test_tc_1_raw, + UnixTime::new(5, 0), + ) + .unwrap(); + assert_eq!(scheduler.current_fill_count().packets, 2); + assert_eq!( + scheduler.current_fill_count().bytes, + test_tc_0_raw.len() + test_tc_1_raw.len() + ); + assert_eq!(scheduler.num_of_entries(), 2); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + scheduler.release_telecommands(|_, _| { + panic!("should not be called"); + }); + + // Release first TC. + scheduler.update_time(UnixTime::new(6, 0)); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 2 + ); + let mut index = 0; + scheduler.release_telecommands(|tc_id_scheduled, tc_raw| { + if index == 0 { + assert_eq!(tc_id_0, tc_id_scheduled.base); + assert_eq!(reader_0.checksum(), tc_id_scheduled.crc16); + assert_eq!(tc_raw, test_tc_0_raw); + } else { + assert_eq!(tc_id_1, tc_id_scheduled.base); + assert_eq!(reader_1.checksum(), tc_id_scheduled.crc16); + assert_eq!(tc_raw, test_tc_1_raw); + } + index += 1; + }); + assert_eq!( + scheduler + .telecommands_to_release() + .collect::>() + .len(), + 0 + ); + assert_eq!(scheduler.current_fill_count().packets, 0); + assert_eq!(scheduler.current_fill_count().bytes, 0); + assert_eq!(scheduler.num_of_entries(), 0); + } + #[test] fn test_packet_limit_reached() { let unix_time = UnixTime::new(0, 0); @@ -587,7 +668,7 @@ mod tests { Limits::new(Some(3), None), Duration::from_millis(1000), ); - let test_tc_0 = test_tc(&[42]); + let test_tc_0 = test_tc(&[42], u14::ZERO); let test_tc_0_raw = test_tc_0.to_vec(); let reader = CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc_0); @@ -618,7 +699,7 @@ mod tests { #[test] fn test_byte_limit_reached() { let unix_time = UnixTime::new(0, 0); - let test_tc_0 = test_tc(&[42]); + let test_tc_0 = test_tc(&[42], u14::ZERO); let mut scheduler = CcsdsScheduler::new( unix_time, Limits::new(None, Some(test_tc_0.len_written() * 3)), @@ -659,7 +740,7 @@ mod tests { Limits::new(Some(100), Some(1024)), Duration::from_millis(1000), ); - let test_tc = test_tc(&[1, 2, 3]); + let test_tc = test_tc(&[1, 2, 3], u14::ZERO); let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&test_tc); let test_tc_raw = test_tc.to_vec(); let reader = CcsdsPacketReader::new(&test_tc_raw, Some(ChecksumType::WithCrc16)).unwrap(); @@ -674,8 +755,93 @@ mod tests { } #[test] - fn test_deletion_by_window() { - //TODO + fn test_deletion_by_window_0() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc_0 = test_tc(&[42], u14::ZERO); + let test_tc_1 = test_tc(&[1, 2, 3], u14::new(1)); + let test_tc_2 = test_tc(&[1, 2, 3], u14::new(2)); + let test_tc_0_raw = test_tc_0.to_vec(); + let test_tc_1_raw = test_tc_1.to_vec(); + let test_tc_2_raw = test_tc_2.to_vec(); + let reader_0 = + CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let reader_1 = + CcsdsPacketReader::new(&test_tc_1_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let reader_2 = + CcsdsPacketReader::new(&test_tc_2_raw, Some(ChecksumType::WithCrc16)).unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_0, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_1, UnixTime::new(5, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_2, UnixTime::new(7, 0)) + .unwrap(); + let deleted = scheduler.delete_in_time_window(UnixTime::new(3, 0), UnixTime::new(6, 0)); + assert!(deleted); + assert_eq!(scheduler.current_fill_count().packets, 2); + assert_eq!( + scheduler.current_fill_count().bytes, + test_tc_0_raw.len() + test_tc_2_raw.len() + ); + scheduler.update_time(UnixTime::new(10, 0)); + let mut index = 0; + scheduler.release_telecommands(|_id, packet| { + if index == 0 { + assert_eq!(packet, test_tc_0_raw); + } else { + assert_eq!(packet, test_tc_2_raw); + } + index += 1; + }); + assert_eq!(scheduler.current_fill_count().packets, 0); + assert_eq!(scheduler.current_fill_count().bytes, 0); + } + + #[test] + fn test_deletion_by_window_1() { + let unix_time = UnixTime::new(0, 0); + let mut scheduler = CcsdsScheduler::new( + unix_time, + Limits::new(Some(100), Some(1024)), + Duration::from_millis(1000), + ); + let test_tc_0 = test_tc(&[42], u14::ZERO); + let test_tc_1 = test_tc(&[1, 2, 3], u14::new(1)); + let test_tc_2 = test_tc(&[1, 2, 3], u14::new(2)); + let test_tc_0_raw = test_tc_0.to_vec(); + let test_tc_1_raw = test_tc_1.to_vec(); + let test_tc_2_raw = test_tc_2.to_vec(); + let reader_0 = + CcsdsPacketReader::new(&test_tc_0_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let reader_1 = + CcsdsPacketReader::new(&test_tc_1_raw, Some(ChecksumType::WithCrc16)).unwrap(); + let reader_2 = + CcsdsPacketReader::new(&test_tc_2_raw, Some(ChecksumType::WithCrc16)).unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_0, UnixTime::new(2, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_1, UnixTime::new(5, 0)) + .unwrap(); + scheduler + .insert_telecommand_with_reader(&reader_2, UnixTime::new(7, 0)) + .unwrap(); + // This only deletes the first 2 TCs. + let deleted = scheduler.delete_in_time_window(UnixTime::new(2, 0), UnixTime::new(7, 0)); + assert!(deleted); + assert_eq!(scheduler.current_fill_count().packets, 1); + assert_eq!(scheduler.current_fill_count().bytes, test_tc_2_raw.len()); + scheduler.update_time(UnixTime::new(10, 0)); + scheduler.release_telecommands(|_id, packet| { + assert_eq!(packet, test_tc_2_raw); + }); } #[test] -- 2.43.0 From 2d21777c73951d1fa22d8ae674bcbbf5bb21cdbb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 27 Nov 2025 12:30:10 +0100 Subject: [PATCH 8/9] install libudev-dev for CI --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ab84b05..1be0809 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,7 @@ jobs: - uses: dtolnay/rust-toolchain@stable - name: Install nextest uses: taiki-e/install-action@nextest + - run: sudo apt update && sudo apt install -y libudev-dev - run: cargo nextest run --all-features - run: cargo test --doc --all-features @@ -67,4 +68,5 @@ jobs: - uses: dtolnay/rust-toolchain@stable with: components: clippy + - run: sudo apt update && sudo apt install -y libudev-dev - run: cargo clippy -- -D warnings -- 2.43.0 From a552ec5eeb900396f64235f6470a48bb415ba6b7 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 27 Nov 2025 12:31:37 +0100 Subject: [PATCH 9/9] libudev for check --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1be0809..d1392b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,6 +11,9 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + - name: Install libudev-dev on Ubuntu + if: ${{ matrix.os == 'ubuntu-latest' }} + run: sudo apt update && sudo apt install -y libudev-dev - run: cargo check # Check example with static pool configuration - run: cargo check -p satrs-example --no-default-features -- 2.43.0