diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 2f91483..4b41d47 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -3,12 +3,96 @@ use crate::pool::{PoolProvider, StoreAddr, StoreError}; use alloc::collections::btree_map::{Entry, Range}; use alloc::vec; use alloc::vec::Vec; +use core::fmt::{Debug, Display, Formatter}; use core::time::Duration; -use spacepackets::time::UnixTimestamp; +use spacepackets::ecss::{PusError, PusPacket}; +use spacepackets::tc::PusTc; +use spacepackets::time::cds::DaysLen24Bits; +use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; #[cfg(feature = "std")] +use std::error::Error; +#[cfg(feature = "std")] use std::time::SystemTimeError; +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ScheduleError { + PusError(PusError), + TimeMarginTooShort(UnixTimestamp, UnixTimestamp), + NestedScheduledTc, + StoreError(StoreError), + TcDataEmpty, + TimestampError(TimestampError), + WrongSubservice, + WrongService, +} + +impl Display for ScheduleError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + ScheduleError::PusError(e) => { + write!(f, "Pus Error: {}", e) + } + ScheduleError::TimeMarginTooShort(current_time, timestamp) => { + write!( + f, + "Error: time margin too short, current time: {:?}, time stamp: {:?}", + current_time, timestamp + ) + } + ScheduleError::NestedScheduledTc => { + write!(f, "Error: nested scheduling is not allowed") + } + ScheduleError::StoreError(e) => { + write!(f, "Store Error: {}", e) + } + ScheduleError::TcDataEmpty => { + write!(f, "Error: empty Tc Data field") + } + ScheduleError::TimestampError(e) => { + write!(f, "Timestamp Error: {}", e) + } + ScheduleError::WrongService => { + write!(f, "Error: Service not 11.") + } + ScheduleError::WrongSubservice => { + write!(f, "Error: Subservice not 4.") + } + } + } +} + +impl From for ScheduleError { + fn from(e: PusError) -> Self { + ScheduleError::PusError(e) + } +} + +impl From for ScheduleError { + fn from(e: StoreError) -> Self { + ScheduleError::StoreError(e) + } +} + +impl From for ScheduleError { + fn from(e: TimestampError) -> Self { + ScheduleError::TimestampError(e) + } +} + +#[cfg(feature = "std")] +impl Error for ScheduleError {} + +//TODO: Move to spacepackets +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum ScheduleSubservice { + EnableScheduling = 1, + DisableScheduling = 2, + ResetScheduling = 3, + InsertActivity = 4, + DeleteActivity = 5, +} + /// This is the core data structure for scheduling PUS telecommands with [alloc] support. /// /// It is assumed that the actual telecommand data is stored in a separate TC pool offering @@ -82,7 +166,8 @@ impl PusScheduler { /// The holding store for the telecommands needs to be passed so all the stored telecommands /// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error /// will be returned but the method will still try to delete all the commands in the schedule. - pub fn reset(&mut self, store: &mut impl PoolProvider) -> Result<(), StoreError> { + + pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> { self.enabled = false; let mut deletion_ok = Ok(()); for tc_lists in &mut self.tc_map { @@ -105,9 +190,16 @@ impl PusScheduler { &self.current_time } - pub fn insert_tc(&mut self, time_stamp: UnixTimestamp, addr: StoreAddr) -> bool { + pub fn insert_unwrapped_and_stored_tc( + &mut self, + time_stamp: UnixTimestamp, + addr: StoreAddr, + ) -> Result<(), ScheduleError> { if time_stamp < self.current_time + self.time_margin { - return false; + return Err(ScheduleError::TimeMarginTooShort( + self.current_time, + time_stamp, + )); } match self.tc_map.entry(time_stamp) { Entry::Vacant(e) => { @@ -117,7 +209,65 @@ impl PusScheduler { v.get_mut().push(addr); } } - true + Ok(()) + } + + pub fn insert_unwrapped_tc( + &mut self, + time_stamp: UnixTimestamp, + tc: &[u8], + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + let check_tc = PusTc::from_bytes(tc)?; + if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 { + return Err(ScheduleError::NestedScheduledTc); + } + + match pool.add(tc) { + Ok(addr) => { + self.insert_unwrapped_and_stored_tc(time_stamp, addr)?; + Ok(addr) + } + Err(err) => Err(err.into()), + } + } + + // (&dyn CcsdsTimeProvider)> + pub fn insert_wrapped_tc( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + if PusPacket::service(pus_tc) != 11 { + return Err(ScheduleError::WrongService); + } + if PusPacket::subservice(pus_tc) != 4 { + return Err(ScheduleError::WrongSubservice); + } + return if let Some(user_data) = pus_tc.user_data() { + let stamp: TimeStamp = TimeReader::from_bytes(user_data)?; + let unix_stamp = stamp.unix_stamp(); + let stamp_len = stamp.len_as_bytes(); + self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool) + } else { + Err(ScheduleError::TcDataEmpty) + }; + } + + pub fn insert_wrapped_tc_cds_short( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::(pus_tc, pool) + } + + pub fn insert_wrapped_tc_cds_long( + &mut self, + pus_tc: &PusTc, + pool: &mut (impl PoolProvider + ?Sized), + ) -> Result { + self.insert_wrapped_tc::>(pus_tc, pool) } pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec> { @@ -133,18 +283,19 @@ impl PusScheduler { /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete - /// the telecommands from the holding store after calling the release closure. + /// the telecommands from the holding store after calling the release closure, if the scheduler + /// is disabled. /// /// # Arguments /// /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and /// the second argument is the store address. This closure should return whether the - /// command should be deleted. + /// command should be deleted if the scheduler is disabled to prevent memory leaks. /// * `store` - The holding store of the telecommands. pub fn release_telecommands bool>( &mut self, mut releaser: R, - tc_store: &mut impl PoolProvider, + tc_store: &mut (impl PoolProvider + ?Sized), ) -> Result { let tcs_to_release = self.telecommands_to_release(); let mut released_tcs = 0; @@ -153,7 +304,7 @@ impl PusScheduler { for addr in tc.1 { let should_delete = releaser(self.enabled, addr); released_tcs += 1; - if should_delete { + if should_delete && !self.is_enabled() { let res = tc_store.delete(*addr); if res.is_err() { store_error = res; @@ -171,10 +322,20 @@ impl PusScheduler { #[cfg(test)] mod tests { use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr}; - use crate::pus::scheduling::PusScheduler; - use alloc::vec::Vec; - use spacepackets::time::UnixTimestamp; + use crate::pus::scheduling::{PusScheduler, ScheduleError}; + use crate::tmtc::ccsds_distrib::tests::generate_ping_tc; + use alloc::rc::Rc; + use core::borrow::BorrowMut; + use spacepackets::ecss::PacketTypeCodes::UnsignedInt; + use spacepackets::ecss::PusPacket; + use spacepackets::tc::PusTc; + use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; + use spacepackets::{CcsdsPacket, SpHeader}; + use std::cell::RefCell; + use std::sync::mpsc; + use std::sync::mpsc::{channel, Receiver, TryRecvError}; use std::time::Duration; + use std::vec::Vec; #[allow(unused_imports)] use std::{println, vec}; @@ -194,19 +355,29 @@ mod tests { PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let first_addr = pool.add(&[0, 1, 2]).unwrap(); - let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone()); - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + first_addr.clone(), + ) + .unwrap(); let second_addr = pool.add(&[2, 3, 4]).unwrap(); - let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone()); - - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(200), + second_addr.clone(), + ) + .unwrap(); let third_addr = pool.add(&[5, 6, 7]).unwrap(); - let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone()); - - assert!(worked); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(300), + third_addr.clone(), + ) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 3); assert!(scheduler.is_enabled()); @@ -223,35 +394,35 @@ mod tests { let mut scheduler = PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 1, - }, - ); + scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + StoreAddr { + pool_idx: 0, + packet_idx: 1, + }, + ) + .unwrap(); - assert!(worked); + let worked = scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(100), + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + ) + .unwrap(); - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(100), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); - - assert!(worked); - - let worked = scheduler.insert_tc( - UnixTimestamp::new_only_seconds(300), - StoreAddr { - pool_idx: 0, - packet_idx: 2, - }, - ); - - assert!(worked); + let worked = scheduler + .insert_unwrapped_and_stored_tc( + UnixTimestamp::new_only_seconds(300), + StoreAddr { + pool_idx: 0, + packet_idx: 2, + }, + ) + .unwrap(); assert_eq!(scheduler.num_scheduled_telecommands(), 3); } @@ -275,6 +446,17 @@ mod tests { assert!(expected_store_addrs.contains(store_addr)); *counter += 1; } + fn common_check_disabled( + enabled: bool, + store_addr: &StoreAddr, + expected_store_addrs: Vec, + counter: &mut usize, + ) { + assert_eq!(enabled, false); + assert!(expected_store_addrs.contains(store_addr)); + *counter += 1; + } + #[test] fn release_basic() { let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); @@ -282,10 +464,11 @@ mod tests { PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let first_addr = pool.add(&[2, 2, 2]).unwrap(); - scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr); + + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); let second_addr = pool.add(&[5, 6, 7]).unwrap(); - scheduler.insert_tc(UnixTimestamp::new_only_seconds(200), second_addr); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); let mut i = 0; let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { @@ -307,7 +490,7 @@ mod tests { .release_telecommands(&mut test_closure_1, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&first_addr).unwrap()); // test 3, late timestamp, release 1 overdue tc let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { @@ -321,7 +504,7 @@ mod tests { .release_telecommands(&mut test_closure_2, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&second_addr).unwrap()); //test 4: no tcs left scheduler @@ -339,10 +522,11 @@ mod tests { PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); let first_addr = pool.add(&[2, 2, 2]).unwrap(); - scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr); + + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); let second_addr = pool.add(&[2, 2, 2]).unwrap(); - scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), second_addr); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr); let mut i = 0; let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { @@ -365,8 +549,8 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .expect("deletion failed"); assert_eq!(released, 2); - assert!(!pool.has_element_at(&first_addr).unwrap()); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&second_addr).unwrap()); //test 3: no tcs left released = scheduler @@ -377,4 +561,261 @@ mod tests { // check that 2 total tcs have been released assert_eq!(i, 2); } + + #[test] + fn release_with_scheduler_disabled() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + scheduler.disable(); + + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); + + let second_addr = pool.add(&[5, 6, 7]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); + + let mut i = 0; + let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { + common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i); + true + }; + + // test 1: too early, no tcs + scheduler.update_time(UnixTimestamp::new_only_seconds(99)); + + scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + + // test 2: exact time stamp of tc, releases 1 tc + scheduler.update_time(UnixTimestamp::new_only_seconds(100)); + + let mut released = scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&first_addr).unwrap()); + + // test 3, late timestamp, release 1 overdue tc + let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { + common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i); + true + }; + + scheduler.update_time(UnixTimestamp::new_only_seconds(206)); + + released = scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&second_addr).unwrap()); + + //test 4: no tcs left + scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + + // check that 2 total tcs have been released + assert_eq!(i, 2); + } + + fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { + let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); + + let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); + + let len_packet = base_ping_tc_simple_ctor() + .write_to_bytes(&mut buf[len_time_stamp..]) + .unwrap(); + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); + + PusTc::new_simple( + &mut sph, + 11, + 4, + Some(&buf[..len_packet + len_time_stamp]), + true, + ) + } + + fn wrong_tc_service(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { + let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); + + let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); + + let len_packet = base_ping_tc_simple_ctor() + .write_to_bytes(&mut buf[len_time_stamp..]) + .unwrap(); + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); + + PusTc::new_simple( + &mut sph, + 12, + 4, + Some(&buf[..len_packet + len_time_stamp]), + true, + ) + } + + fn wrong_tc_subservice(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { + let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); + + let len_time_stamp = cds_time.write_to_bytes(buf).unwrap(); + + let len_packet = base_ping_tc_simple_ctor() + .write_to_bytes(&mut buf[len_time_stamp..]) + .unwrap(); + let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap(); + + PusTc::new_simple( + &mut sph, + 11, + 5, + Some(&buf[..len_packet + len_time_stamp]), + true, + ) + } + + fn base_ping_tc_simple_ctor() -> PusTc<'static> { + let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap(); + PusTc::new_simple(&mut sph, 17, 1, None, true) + } + + #[test] + fn insert_unwrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut buf: [u8; 32] = [0; 32]; + let len = base_ping_tc_simple_ctor().write_to_bytes(&mut buf).unwrap(); + + let addr = scheduler + .insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool) + .unwrap(); + + assert!(pool.has_element_at(&addr).unwrap()); + + let data = pool.read(&addr).unwrap(); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut addr_vec = vec::Vec::new(); + + let mut i = 0; + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + // check that tc remains unchanged + addr_vec.push(*store_addr); + false + }; + + scheduler + .release_telecommands(&mut test_closure, &mut pool) + .unwrap(); + + let data = pool.read(&addr_vec[0]).unwrap(); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + } + + #[test] + fn insert_wrapped_tc() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let mut buf: [u8; 32] = [0; 32]; + let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf); + + let addr = match scheduler + .insert_wrapped_tc::(&tc, &mut pool) + { + Ok(addr) => addr, + Err(e) => { + println!("{}", e); + panic!(); + } + }; + + assert!(pool.has_element_at(&addr).unwrap()); + + let data = pool.read(&addr).unwrap(); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + + assert_eq!(scheduler.num_scheduled_telecommands(), 1); + + scheduler.update_time(UnixTimestamp::new_only_seconds(101)); + + let mut addr_vec = vec::Vec::new(); + + let mut i = 0; + let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| { + common_check(boolvar, store_addr, vec![addr], &mut i); + // check that tc remains unchanged + addr_vec.push(*store_addr); + false + }; + + scheduler + .release_telecommands(&mut test_closure, &mut pool) + .unwrap(); + + let data = pool.read(&addr_vec[0]).unwrap(); + let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data"); + assert_eq!(check_tc.0, base_ping_tc_simple_ctor()); + } + + #[test] + fn insert_wrong_service() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let mut buf: [u8; 32] = [0; 32]; + let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf); + + match scheduler.insert_wrapped_tc::(&tc, &mut pool) { + Ok(_) => { + panic!(); + } + Err(e) => { + if e != ScheduleError::WrongService { + panic!(); + } + } + } + } + + #[test] + fn insert_wrong_subservice() { + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + + let mut buf: [u8; 32] = [0; 32]; + let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf); + + match scheduler.insert_wrapped_tc::(&tc, &mut pool) { + Ok(_) => { + panic!(); + } + Err(e) => { + if e != ScheduleError::WrongSubservice { + panic!(); + } + } + } + } } diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index c06f9ed..3d449b2 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -5,18 +5,21 @@ import struct import sys import time from typing import Optional +import datetime import tmtccmd from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm +from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid +from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase -from tmtccmd.com_if import ComInterface +from tmtccmd.com import ComInterface from tmtccmd.config import ( default_json_path, SetupParams, @@ -41,7 +44,7 @@ from tmtccmd.tc import ( SendCbParams, DefaultPusQueueHelper, ) -from tmtccmd.tm.pus_5_event import Service5Tm +from tmtccmd.tm.pus_5_fsfw_event import Service5Tm from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT @@ -57,7 +60,7 @@ class SatRsConfigHook(TmTcCfgHookBase): super().__init__(json_cfg_path=json_cfg_path) def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: - from tmtccmd.config.com_if import ( + from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) @@ -94,6 +97,13 @@ class SatRsConfigHook(TmTcCfgHookBase): info="PUS Service 3 Housekeeping", op_code_entry=srv_3 ) + srv_11 = OpCodeEntry() + srv_11.add("0", "Scheduled TC Test") + defs.add_service( + name=CoreServiceList.SERVICE_11, + info="PUS Service 11 TC Scheduling", + op_code_entry=srv_11, + ) return defs def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): @@ -120,7 +130,7 @@ class PusHandler(SpecificApidHandlerBase): def handle_tm(self, packet: bytes, _user_args: any): try: - tm_packet = PusTelemetry.unpack(packet) + tm_packet = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) except ValueError as e: LOGGER.warning("Could not generate PUS TM object from raw data") LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") @@ -128,7 +138,7 @@ class PusHandler(SpecificApidHandlerBase): service = tm_packet.service dedicated_handler = False if service == 1: - tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(1, 2)) + tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)) res = self.verif_wrapper.add_tm(tm_packet) if res is None: LOGGER.info( @@ -145,16 +155,16 @@ class PusHandler(SpecificApidHandlerBase): if service == 3: LOGGER.info("No handling for HK packets implemented") LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") - pus_tm = PusTelemetry.unpack(packet) + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) if pus_tm.subservice == 25: if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") json_str = pus_tm.source_data[8:] dedicated_handler = True if service == 5: - tm_packet = Service5Tm.unpack(packet) + tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) if service == 17: - tm_packet = Service17Tm.unpack(packet) + tm_packet = Service17Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) dedicated_handler = True if tm_packet.subservice == 2: self.printer.file_logger.info("Received Ping Reply TM[17,2]") @@ -205,7 +215,10 @@ class TcHandler(TcHandlerBase): self.verif_wrapper = verif_wrapper self.queue_helper = DefaultPusQueueHelper( queue_wrapper=None, + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, + pus_verificator=self.verif_wrapper.pus_verificator, + default_pus_apid=EXAMPLE_PUS_APID ) def send_cb(self, send_params: SendCbParams): @@ -213,10 +226,6 @@ class TcHandler(TcHandlerBase): if entry_helper.is_tc: if entry_helper.entry_type == TcQueueEntryType.PUS_TC: pus_tc_wrapper = entry_helper.to_pus_tc_entry() - pus_tc_wrapper.pus_tc.seq_count = ( - self.seq_count_provider.get_and_increment() - ) - self.verif_wrapper.add_tc(pus_tc_wrapper.pus_tc) raw_tc = pus_tc_wrapper.pus_tc.pack() LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") send_params.com_if.send(raw_tc) @@ -247,6 +256,14 @@ class TcHandler(TcHandlerBase): return q.add_pus_tc( PusTelecommand(service=17, subservice=1) ) + if service == CoreServiceList.SERVICE_11: + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd(time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID) + ) if service == CoreServiceList.SERVICE_3: if op_code in HkOpCodes.GENERATE_ONE_SHOT: q.add_log_cmd("Sending HK one shot request") diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index 8e9634c..06fc287 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1 +1,2 @@ -tmtccmd == 3.0.0 +# tmtccmd == 4.0.0a2 +-e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index e2de493..a889704 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -6,6 +6,7 @@ use satrs_core::pool::StoreAddr; use satrs_core::pus::event::Subservices; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; +use satrs_core::pus::scheduling::PusScheduler; use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; @@ -17,7 +18,9 @@ use satrs_core::{ spacepackets::time::TimeWriter, spacepackets::SpHeader, }; use satrs_example::{hk_err, tmtc_err}; +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Rc; use std::sync::mpsc::Sender; pub struct PusReceiver { @@ -31,6 +34,7 @@ pub struct PusReceiver { request_map: HashMap>, stamper: TimeProvider, time_stamp: [u8; 7], + scheduler: Rc>, } impl PusReceiver { @@ -42,6 +46,7 @@ impl PusReceiver { tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, + scheduler: Rc>, ) -> Self { Self { tm_helper: PusTmWithCdsShortHelper::new(apid), @@ -53,6 +58,7 @@ impl PusReceiver { request_map, stamper: TimeProvider::new_with_u16_days(0, 0), time_stamp: [0; 7], + scheduler, } } } @@ -78,6 +84,8 @@ impl PusServiceProvider for PusReceiver { self.handle_event_request(pus_tc, accepted_token); } else if service == 3 { self.handle_hk_request(pus_tc, accepted_token); + } else if service == 11 { + self.handle_scheduled_tc(pus_tc, accepted_token); } else { self.update_time_stamp(); self.verif_reporter @@ -201,6 +209,7 @@ impl PusReceiver { )); } } + fn handle_event_request(&mut self, pus_tc: &PusTc, token: VerificationToken) { let send_start_failure = |verif_reporter: &mut StdVerifReporterWithSender, timestamp: &[u8; 7], @@ -273,4 +282,112 @@ impl PusReceiver { } } } + + fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken) { + if pus_tc.user_data().is_none() { + self.update_time_stamp(); + self.verif_reporter + .start_failure( + token, + FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), + ) + .expect("Sending start failure TM failed"); + return; + } + + self.update_time_stamp(); + match pus_tc.subservice() { + 1 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + + let mut scheduler = self.scheduler.borrow_mut(); + scheduler.enable(); + if scheduler.is_enabled() { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } else { + panic!("Failed to enable scheduler"); + } + drop(scheduler); + } + 2 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + + let mut scheduler = self.scheduler.borrow_mut(); + scheduler.disable(); + if !scheduler.is_enabled() { + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } else { + panic!("Failed to disable scheduler"); + } + drop(scheduler); + } + 3 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + + let mut pool = self + .tc_source + .tc_store + .pool + .write() + .expect("Locking pool failed"); + + let mut scheduler = self.scheduler.borrow_mut(); + scheduler + .reset(pool.as_mut()) + .expect("Error resetting TC Pool"); + drop(scheduler); + + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } + 4 => { + let start_token = self + .verif_reporter + .start_success(token, Some(&self.time_stamp)) + .expect("Error sending start success"); + + let mut pool = self + .tc_source + .tc_store + .pool + .write() + .expect("Locking pool failed"); + let mut scheduler = self.scheduler.borrow_mut(); + scheduler + .insert_wrapped_tc::(pus_tc, pool.as_mut()) + .expect("TODO: panic message"); + drop(scheduler); + + self.verif_reporter + .completion_success(start_token, Some(&self.time_stamp)) + .expect("Error sending completion success"); + } + _ => { + self.verif_reporter + .start_failure( + token, + FailParams::new( + Some(&self.time_stamp), + &tmtc_err::NOT_ENOUGH_APP_DATA, + None, + ), + ) + .expect("Sending start failure TM failed"); + } + } + } } diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index fc9aeba..3f5142a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,10 +1,12 @@ use satrs_core::events::EventU32; use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::params::Params; +use std::cell::RefCell; use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; use std::time::Duration; @@ -14,6 +16,7 @@ use crate::pus::PusReceiver; use crate::requests::RequestWithToken; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; +use satrs_core::pus::scheduling::PusScheduler; use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader}; use satrs_core::tmtc::{ @@ -41,6 +44,12 @@ pub struct TcArgs { pub tc_receiver: Receiver, } +impl TcArgs { + fn split(self) -> (PusTcSource, Receiver) { + (self.tc_source, self.tc_receiver) + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum MpscStoreAndSendError { StoreError(StoreError), @@ -152,6 +161,11 @@ impl ReceivesCcsdsTc for PusTcSource { } } pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { + let scheduler = Rc::new(RefCell::new( + PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(), + )); + + let sched_clone = scheduler.clone(); let mut pus_receiver = PusReceiver::new( PUS_APID, tm_args.tm_sink_sender, @@ -160,11 +174,15 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tc_args.tc_source.clone(), args.event_request_tx, args.request_map, + sched_clone, ); + let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), }; + let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor)) .expect("Creating UDP TMTC server failed"); @@ -173,8 +191,17 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { tm_rx: tm_args.tm_server_rx, tm_store: tm_args.tm_store.pool.clone(), }; + + let mut tc_buf: [u8; 4096] = [0; 4096]; loop { - core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver); + let tmtc_sched = scheduler.clone(); + core_tmtc_loop( + &mut udp_tmtc_server, + &mut tc_args, + &mut tc_buf, + &mut pus_receiver, + tmtc_sched, + ); thread::sleep(Duration::from_millis(400)); } } @@ -182,8 +209,34 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, tc_args: &mut TcArgs, + tc_buf: &mut [u8], pus_receiver: &mut PusReceiver, + scheduler: Rc>, ) { + let releaser = |enabled: bool, addr: &StoreAddr| -> bool { + tc_args.tc_source.tc_source.send(*addr).is_ok() + }; + + let mut pool = tc_args + .tc_source + .tc_store + .pool + .write() + .expect("error locking pool"); + + let mut scheduler = scheduler.borrow_mut(); + scheduler.update_time_from_now().unwrap(); + match scheduler.release_telecommands(releaser, pool.as_mut()) { + Ok(released_tcs) => { + if released_tcs > 0 { + println!("{} Tc(s) released from scheduler", released_tcs); + } + } + Err(_) => {} + } + drop(pool); + drop(scheduler); + while poll_tc_server(udp_tmtc_server) {} match tc_args.tc_receiver.try_recv() { Ok(addr) => { @@ -194,7 +247,9 @@ fn core_tmtc_loop( .read() .expect("locking tc pool failed"); let data = pool.read(&addr).expect("reading pool failed"); - match PusTc::from_bytes(data) { + tc_buf[0..data.len()].copy_from_slice(data); + drop(pool); + match PusTc::from_bytes(tc_buf) { Ok((pus_tc, _)) => { pus_receiver .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc) @@ -202,7 +257,7 @@ fn core_tmtc_loop( } Err(e) => { println!("error creating PUS TC from raw data: {e}"); - println!("raw data: {data:x?}"); + println!("raw data: {tc_buf:x?}"); } } }