diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 71dee84..6f009b4 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -1,15 +1,52 @@ use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; +use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; -use spacepackets::ecss::PusPacket; +use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; +use std::error::Error; use std::time::SystemTimeError; use std::vec; use std::vec::Vec; +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ScheduleError { + PusError(PusError), + TimeMarginTooShort, + NestedScheduledTC, + StoreError(StoreError), + TCDataEmpty, + TimestampError(TimestampError), +} + +impl Display for ScheduleError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + todo!() + } +} + +impl From for ScheduleError { + fn from(e: PusError) -> Self { + ScheduleError::PusError(e) + } +} + +impl From for ScheduleError { + fn from(e: StoreError) -> Self { + ScheduleError::StoreError(e) + } +} + +impl From for ScheduleError { + fn from(e: TimestampError) -> Self { + ScheduleError::TimestampError(e) + } +} + +impl Error for ScheduleError {} //TODO: Move to spacepackets #[derive(Debug, PartialEq, Copy, Clone)] @@ -19,7 +56,6 @@ pub enum ScheduleSubservice { ResetScheduling = 3, InsertActivity = 4, DeleteActivity = 5, - } #[derive(Debug)] @@ -74,8 +110,17 @@ impl PusScheduler { /// will be returned but the method will still try to delete all the commands in the schedule. pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; + let mut deletion_ok = Ok(()); + for tc_lists in &mut self.tc_map { + for tc in tc_lists.1 { + let res = store.delete(*tc); + if res.is_err() { + deletion_ok = res; + } + } + } self.tc_map.clear(); - return Ok(()) + deletion_ok } pub fn update_time(&mut self, current_time: UnixTimestamp) { @@ -90,9 +135,9 @@ impl PusScheduler { &mut self, time_stamp: UnixTimestamp, addr: StoreAddr, - ) -> bool { + ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { - return false; + return Err(ScheduleError::TimeMarginTooShort); } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { @@ -102,7 +147,7 @@ impl PusScheduler { v.get_mut().push(addr); } } - true + Ok(()) } pub fn insert_unwrapped_tc( @@ -110,57 +155,45 @@ impl PusScheduler { time_stamp: UnixTimestamp, tc: &[u8], pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - let check_tc = PusTc::from_bytes(tc).unwrap(); + ) -> Result { + let check_tc = PusTc::from_bytes(tc)?; if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { // TODO: should not be able to schedule a scheduled tc - return Err(()); + return Err(ScheduleError::NestedScheduledTC); } match pool.add(tc) { Ok(addr) => { - let worked = self.insert_unwrapped_and_stored_tc(time_stamp, addr); - if worked { - return Ok(addr); - } else { - return Err(()); - } + self.insert_unwrapped_and_stored_tc(time_stamp, addr)?; + Ok(addr) } Err(err) => { - return Err(()); + return Err(err.into()); } } } - // insert_wrapped_tc() // (&dyn CcsdsTimeProvider)> pub fn insert_wrapped_tc( &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { - if PusPacket::service(pus_tc) != 11 || PusPacket::subservice(pus_tc) != 4 { - return Err(()); - } - + ) -> Result { return if let Some(user_data) = pus_tc.user_data() { - let mut stamp: TimeStamp = match TimeReader::from_bytes(user_data) { - Ok(stamp) => stamp, - Err(error) => return Err(()), - }; + let mut stamp: TimeStamp = TimeReader::from_bytes(user_data)?; let unix_stamp = stamp.unix_stamp(); let stamp_len = stamp.len_as_bytes(); self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) } else { - Err(()) - } + Err(ScheduleError::TCDataEmpty) + }; } pub fn insert_wrapped_tc_cds_short( &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { + ) -> Result { self.insert_wrapped_tc::(pus_tc, pool) } @@ -168,7 +201,7 @@ impl PusScheduler { &mut self, pus_tc: &PusTc, pool: &mut (impl PoolProvider + ?Sized), - ) -> Result { + ) -> Result { self.insert_wrapped_tc::>(pus_tc, pool) } @@ -183,7 +216,6 @@ impl PusScheduler { Ok(()) } - /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete /// the telecommands from the holding store after calling the release closure. @@ -223,18 +255,18 @@ impl PusScheduler { #[cfg(test)] mod tests { - use crate::pool::{LocalPool, PoolCfg, StoreAddr}; - use crate::pus::scheduling::PusScheduler; + use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr}; + use crate::pus::scheduling::{PusScheduler, ScheduleError}; use spacepackets::ecss::PacketTypeCodes::UnsignedInt; + use spacepackets::tc::PusTc; use spacepackets::time::UnixTimestamp; + use spacepackets::{CcsdsPacket, SpHeader}; use std::sync::mpsc; use std::sync::mpsc::{channel, Receiver, TryRecvError}; use std::time::Duration; use std::vec::Vec; use std::{println, vec}; - use heapless::pool::Pool; - use spacepackets::SpHeader; - use spacepackets::tc::PusTc; + use spacepackets::ecss::PusPacket; #[test] fn basic() { @@ -247,28 +279,42 @@ mod tests { #[test] fn reset() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let first_addr = pool.add(&[0, 1, 2]).unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone()); - assert!(worked); + let first_addr = pool.add(&[0, 1, 2]).unwrap(); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + first_addr.clone(), + ) + .unwrap(); let second_addr = pool.add(&[2, 3, 4]).unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone()); - - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(200), + second_addr.clone(), + ) + .unwrap(); let third_addr = pool.add(&[5, 6, 7]).unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone()); - - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(300), + third_addr.clone(), + ) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 3); assert!(scheduler.is_enabled()); - scheduler.reset().unwrap(); + scheduler.reset(&mut pool).expect("deletion of TCs failed"); assert!(!scheduler.is_enabled()); assert_eq!(scheduler.num_scheduled_telecommands(), 0); + assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(!pool.has_element_at(&third_addr).unwrap()); } #[test] @@ -276,35 +322,35 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let worked = scheduler.insert_unwrapped_and_stored_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + StoreAddr { + pool_idx: 0, + packet_idx: 1, + }, + ) + .unwrap(); - assert!(worked); + let worked = scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + ) + .unwrap(); - let worked = scheduler.insert_unwrapped_and_stored_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); - - assert!(worked); - - let worked = scheduler.insert_unwrapped_and_stored_tc( - UnixTimestamp::new_only_seconds(300), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); - - assert!(worked); + let worked = scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(300), + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + ) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 3); } @@ -318,8 +364,20 @@ mod tests { assert_eq!(scheduler.current_time(), &time); } + fn common_check( + enabled: bool, + store_addr: &StoreAddr, + expected_store_addrs: Vec, + counter: &mut usize, + ) { + assert_eq!(enabled, true); + assert!(expected_store_addrs.contains(store_addr)); + *counter += 1; + } + #[test] fn release_basic() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -331,46 +389,44 @@ mod tests { let mut i = 0; let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { - assert_eq!(boolvar, true); - assert_eq!( - store_addr, - &StoreAddr { - pool_idx: 0, - packet_idx: 1, - } - ); - i += 1; + common_check(boolvar, store_addr, vec![first_addr], &mut i); + true }; // test 1: too early, no tcs scheduler.update_time(UnixTimestamp::new_only_seconds(99)); - scheduler.release_telecommands(&mut test_closure_1); + scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); // test 2: exact time stamp of tc, releases 1 tc scheduler.update_time(UnixTimestamp::new_only_seconds(100)); - scheduler.release_telecommands(&mut test_closure_1); + let mut released = scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&first_addr).unwrap()); // test 3, late timestamp, release 1 overdue tc let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { - assert_eq!(boolvar, true); - assert_eq!( - store_addr, - &StoreAddr { - pool_idx: 0, - packet_idx: 2, - } - ); - i += 1; + common_check(boolvar, store_addr, vec![second_addr], &mut i); + true }; scheduler.update_time(UnixTimestamp::new_only_seconds(206)); - scheduler.release_telecommands(&mut test_closure_2); + released = scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&second_addr).unwrap()); //test 4: no tcs left - scheduler.release_telecommands(&mut test_closure_2); + scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); // check that 2 total tcs have been released assert_eq!(i, 2); @@ -378,6 +434,7 @@ mod tests { #[test] fn release_multi_with_same_time() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -389,34 +446,52 @@ mod tests { let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { - assert_eq!(boolvar, true); - assert_eq!( - store_addr, - &StoreAddr { - pool_idx: 0, - packet_idx: 1, - } - ); - i += 1; + common_check(boolvar, store_addr, vec![first_addr, second_addr], &mut i); + true }; // test 1: too early, no tcs scheduler.update_time(UnixTimestamp::new_only_seconds(99)); - scheduler.release_telecommands(&mut test_closure); + let mut released = scheduler + .release_telecommands(&mut test_closure, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 0); // test 2: exact time stamp of tc, releases 2 tc scheduler.update_time(UnixTimestamp::new_only_seconds(100)); - scheduler.release_telecommands(&mut test_closure); + released = scheduler + .release_telecommands(&mut test_closure, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 2); + assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(!pool.has_element_at(&second_addr).unwrap()); //test 3: no tcs left - scheduler.release_telecommands(&mut test_closure); + released = scheduler + .release_telecommands(&mut test_closure, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 0); // check that 2 total tcs have been released assert_eq!(i, 2); } + fn scheduled_tc() -> PusTc<'static> { + let contained_tc = base_ping_tc_simple_ctor(); + let len = contained_tc.total_len() as u16; + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len).unwrap(); + + PusTc::new_simple(&mut sph, 11, 4, contained_tc.raw(), true) + } + + fn base_ping_tc_simple_ctor() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 17, 1, None, true) + } + + /* #[test] fn insert_unwrapped_tc() { let mut scheduler = @@ -424,12 +499,14 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3,4], &mut pool).unwrap(); + + let addr = scheduler.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &[1,2,3], &mut pool).unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 1); scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { common_check(boolvar, store_addr, vec![addr], &mut i); true @@ -438,10 +515,7 @@ mod tests { scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); } - fn scheduled_tc() -> PusTc<'static> { - let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); - PusTc::new_simple(&mut sph, 11, 4, None, true) - } + */ #[test] fn insert_wrapped_tc() { @@ -450,27 +524,35 @@ mod tests { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); - let tc = base_ping_tc_simple_ctor(); + let tc = scheduled_tc(); - let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + if let Some(data) = tc.user_data() { + + } else { + panic!(); + } + + let addr = scheduler + .insert_wrapped_tc::(&tc, &mut pool) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 1); scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { common_check(boolvar, store_addr, vec![addr], &mut i); true }; - scheduler.release_telecommands(&mut test_closure, &mut pool).unwrap(); - } - - fn base_ping_tc_simple_ctor() -> PusTc<'static> { - let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); - PusTc::new_simple(&mut sph, 17, 1, None, true) + scheduler + .release_telecommands(&mut test_closure, &mut pool) + .unwrap(); } + /* + #[test] fn insert_wrong_subservice() { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); @@ -479,8 +561,10 @@ mod tests { let tc = base_ping_tc_simple_ctor(); - let addr = scheduler.insert_wrapped_tc( &tc, &mut pool).unwrap(); + let addr = scheduler.insert_wrapped_tc::( &tc, &mut pool).unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 0); } + + */ } diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index 0084ca5..9c4dbe3 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -6,10 +6,12 @@ use satrs_core::pool::{StoreAddr, StoreError}; use satrs_core::pus::event::Subservices; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; +use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::res_code::ResultU16; +use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; use satrs_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use satrs_core::tmtc::{AddressableId, PusServiceProvider}; use satrs_core::{ @@ -20,11 +22,8 @@ use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; use std::rc::Rc; use std::sync::mpsc::Sender; -use std::time::Duration; -use satrs_core::pus::scheduling::{PusScheduler, ScheduleSubservice}; -use satrs_core::spacepackets::time::{CcsdsTimeProvider, UnixTimestamp}; use std::sync::{Arc, LockResult, Mutex}; - +use std::time::Duration; pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, @@ -315,7 +314,6 @@ impl PusReceiver { .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - panic!("Failed to enable scheduler"); } } @@ -331,7 +329,6 @@ impl PusReceiver { .completion_success(start_token, Some(&self.time_stamp)) .expect("Error sending completion success"); } else { - panic!("Failed to disable scheduler"); } } @@ -366,10 +363,12 @@ impl PusReceiver { .expect("Error sending start success"); match self.tc_source.tc_store.pool.write() { Ok(mut pool) => { - scheduler.insert_wrapped_tc::( - pus_tc, - pool.as_mut(), - ); + scheduler + .insert_wrapped_tc::( + pus_tc, + pool.as_mut(), + ) + .expect("TODO: panic message"); } Err(_) => {} } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 728c19a..1693f7c 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -280,7 +280,6 @@ fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { } } - fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { let mut store_lock = udp_tmtc_server