From 7569244a9090943f8c636ae79b326ac12a27a498 Mon Sep 17 00:00:00 2001 From: lkoester Date: Mon, 30 Jan 2023 13:49:37 +0100 Subject: [PATCH] fixed scheduler logic in tmtc loop and pus handler, fixed pyclient implementation of service 11 --- satrs-core/src/pus/scheduling.rs | 86 +++++++++++++++++++++++-- satrs-example/log/tmtc_error.log | 0 satrs-example/pyclient/main.py | 41 ++++++++---- satrs-example/pyclient/requirements.txt | 3 +- satrs-example/src/pus.rs | 27 ++++++-- satrs-example/src/tmtc.rs | 35 +++++++--- 6 files changed, 157 insertions(+), 35 deletions(-) create mode 100644 satrs-example/log/tmtc_error.log diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 9c77e2d..7c8ce92 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -10,6 +10,7 @@ use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc}; use spacepackets::time::cds::DaysLen24Bits; use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp}; use std::collections::BTreeMap; +use std::dbg; #[cfg(feature = "std")] use std::error::Error; #[cfg(feature = "std")] @@ -285,13 +286,14 @@ impl PusScheduler { /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser /// closure for each telecommand which should be released. This function will also delete - /// the telecommands from the holding store after calling the release closure. + /// the telecommands from the holding store after calling the release closure, if the scheduler + /// is disabled. /// /// # Arguments /// /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and /// the second argument is the store address. This closure should return whether the - /// command should be deleted. + /// command should be deleted if the scheduler is disabled to prevent memory leaks. /// * `store` - The holding store of the telecommands. pub fn release_telecommands bool>( &mut self, @@ -305,7 +307,7 @@ impl PusScheduler { for addr in tc.1 { let should_delete = releaser(self.enabled, addr); released_tcs += 1; - if should_delete { + if should_delete && !self.is_enabled() { let res = tc_store.delete(*addr); if res.is_err() { store_error = res; @@ -447,6 +449,16 @@ mod tests { assert!(expected_store_addrs.contains(store_addr)); *counter += 1; } + fn common_check_disabled( + enabled: bool, + store_addr: &StoreAddr, + expected_store_addrs: Vec, + counter: &mut usize, + ) { + assert_eq!(enabled, false); + assert!(expected_store_addrs.contains(store_addr)); + *counter += 1; + } #[test] fn release_basic() { @@ -481,7 +493,7 @@ mod tests { .release_telecommands(&mut test_closure_1, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&first_addr).unwrap()); // test 3, late timestamp, release 1 overdue tc let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { @@ -495,7 +507,7 @@ mod tests { .release_telecommands(&mut test_closure_2, &mut pool) .expect("deletion failed"); assert_eq!(released, 1); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&second_addr).unwrap()); //test 4: no tcs left scheduler @@ -540,8 +552,8 @@ mod tests { .release_telecommands(&mut test_closure, &mut pool) .expect("deletion failed"); assert_eq!(released, 2); - assert!(!pool.has_element_at(&first_addr).unwrap()); - assert!(!pool.has_element_at(&second_addr).unwrap()); + assert!(pool.has_element_at(&first_addr).unwrap()); + assert!(pool.has_element_at(&second_addr).unwrap()); //test 3: no tcs left released = scheduler @@ -553,6 +565,66 @@ mod tests { assert_eq!(i, 2); } + #[test] + fn release_with_scheduler_disabled() { + let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)])); + let mut scheduler = + PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5)); + + scheduler.disable(); + + let first_addr = pool.add(&[2, 2, 2]).unwrap(); + + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr); + + let second_addr = pool.add(&[5, 6, 7]).unwrap(); + scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr); + + let mut i = 0; + let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| { + common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i); + true + }; + + // test 1: too early, no tcs + scheduler.update_time(UnixTimestamp::new_only_seconds(99)); + + scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + + // test 2: exact time stamp of tc, releases 1 tc + scheduler.update_time(UnixTimestamp::new_only_seconds(100)); + + let mut released = scheduler + .release_telecommands(&mut test_closure_1, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&first_addr).unwrap()); + + // test 3, late timestamp, release 1 overdue tc + let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| { + common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i); + true + }; + + scheduler.update_time(UnixTimestamp::new_only_seconds(206)); + + released = scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + assert_eq!(released, 1); + assert!(!pool.has_element_at(&second_addr).unwrap()); + + //test 4: no tcs left + scheduler + .release_telecommands(&mut test_closure_2, &mut pool) + .expect("deletion failed"); + + // check that 2 total tcs have been released + assert_eq!(i, 2); + } + fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc { let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap(); diff --git a/satrs-example/log/tmtc_error.log b/satrs-example/log/tmtc_error.log new file mode 100644 index 0000000..e69de29 diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index c06f9ed..3d449b2 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -5,18 +5,21 @@ import struct import sys import time from typing import Optional +import datetime import tmtccmd from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm +from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid +from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase -from tmtccmd.com_if import ComInterface +from tmtccmd.com import ComInterface from tmtccmd.config import ( default_json_path, SetupParams, @@ -41,7 +44,7 @@ from tmtccmd.tc import ( SendCbParams, DefaultPusQueueHelper, ) -from tmtccmd.tm.pus_5_event import Service5Tm +from tmtccmd.tm.pus_5_fsfw_event import Service5Tm from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT @@ -57,7 +60,7 @@ class SatRsConfigHook(TmTcCfgHookBase): super().__init__(json_cfg_path=json_cfg_path) def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: - from tmtccmd.config.com_if import ( + from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) @@ -94,6 +97,13 @@ class SatRsConfigHook(TmTcCfgHookBase): info="PUS Service 3 Housekeeping", op_code_entry=srv_3 ) + srv_11 = OpCodeEntry() + srv_11.add("0", "Scheduled TC Test") + defs.add_service( + name=CoreServiceList.SERVICE_11, + info="PUS Service 11 TC Scheduling", + op_code_entry=srv_11, + ) return defs def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): @@ -120,7 +130,7 @@ class PusHandler(SpecificApidHandlerBase): def handle_tm(self, packet: bytes, _user_args: any): try: - tm_packet = PusTelemetry.unpack(packet) + tm_packet = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) except ValueError as e: LOGGER.warning("Could not generate PUS TM object from raw data") LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") @@ -128,7 +138,7 @@ class PusHandler(SpecificApidHandlerBase): service = tm_packet.service dedicated_handler = False if service == 1: - tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(1, 2)) + tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)) res = self.verif_wrapper.add_tm(tm_packet) if res is None: LOGGER.info( @@ -145,16 +155,16 @@ class PusHandler(SpecificApidHandlerBase): if service == 3: LOGGER.info("No handling for HK packets implemented") LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") - pus_tm = PusTelemetry.unpack(packet) + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) if pus_tm.subservice == 25: if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") json_str = pus_tm.source_data[8:] dedicated_handler = True if service == 5: - tm_packet = Service5Tm.unpack(packet) + tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) if service == 17: - tm_packet = Service17Tm.unpack(packet) + tm_packet = Service17Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) dedicated_handler = True if tm_packet.subservice == 2: self.printer.file_logger.info("Received Ping Reply TM[17,2]") @@ -205,7 +215,10 @@ class TcHandler(TcHandlerBase): self.verif_wrapper = verif_wrapper self.queue_helper = DefaultPusQueueHelper( queue_wrapper=None, + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, + pus_verificator=self.verif_wrapper.pus_verificator, + default_pus_apid=EXAMPLE_PUS_APID ) def send_cb(self, send_params: SendCbParams): @@ -213,10 +226,6 @@ class TcHandler(TcHandlerBase): if entry_helper.is_tc: if entry_helper.entry_type == TcQueueEntryType.PUS_TC: pus_tc_wrapper = entry_helper.to_pus_tc_entry() - pus_tc_wrapper.pus_tc.seq_count = ( - self.seq_count_provider.get_and_increment() - ) - self.verif_wrapper.add_tc(pus_tc_wrapper.pus_tc) raw_tc = pus_tc_wrapper.pus_tc.pack() LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") send_params.com_if.send(raw_tc) @@ -247,6 +256,14 @@ class TcHandler(TcHandlerBase): return q.add_pus_tc( PusTelecommand(service=17, subservice=1) ) + if service == CoreServiceList.SERVICE_11: + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd(time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID) + ) if service == CoreServiceList.SERVICE_3: if op_code in HkOpCodes.GENERATE_ONE_SHOT: q.add_log_cmd("Sending HK one shot request") diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index 8e9634c..06fc287 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1 +1,2 @@ -tmtccmd == 3.0.0 +# tmtccmd == 4.0.0a2 +-e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index d33be86..c4d0e9a 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -287,8 +287,18 @@ impl PusReceiver { } fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken) { - self.update_time_stamp(); + if pus_tc.user_data().is_none() { + self.update_time_stamp(); + self.verif_reporter + .start_failure( + token, + FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None), + ) + .expect("Sending start failure TM failed"); + return; + } + self.update_time_stamp(); match pus_tc.subservice() { 1 => { let start_token = self @@ -305,6 +315,7 @@ impl PusReceiver { } else { panic!("Failed to enable scheduler"); } + drop(scheduler); } 2 => { let start_token = self @@ -321,6 +332,7 @@ impl PusReceiver { } else { panic!("Failed to disable scheduler"); } + drop(scheduler); } 3 => { let start_token = self @@ -328,7 +340,6 @@ impl PusReceiver { .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - let mut scheduler = self.scheduler.borrow_mut(); let mut pool = self .tc_source .tc_store @@ -336,9 +347,11 @@ impl PusReceiver { .write() .expect("Locking pool failed"); + let mut scheduler = self.scheduler.borrow_mut(); scheduler .reset(pool.as_mut()) .expect("Error resetting TC Pool"); + drop(scheduler); self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) @@ -350,19 +363,19 @@ impl PusReceiver { .start_success(token, Some(&self.time_stamp)) .expect("Error sending start success"); - let mut scheduler = self.scheduler.borrow_mut(); let mut pool = self .tc_source .tc_store .pool .write() .expect("Locking pool failed"); + let mut scheduler = self.scheduler.borrow_mut(); scheduler - .insert_wrapped_tc::( - pus_tc, - pool.as_mut(), - ) + .insert_wrapped_tc::(pus_tc, pool.as_mut()) .expect("TODO: panic message"); + let time = + TimeProvider::from_bytes_with_u16_days(&pus_tc.user_data().unwrap()).unwrap(); + drop(scheduler); self.verif_reporter .completion_success(start_token, Some(&self.time_stamp)) diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index f653736..bd0ab0b 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -195,11 +195,13 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { //let (mut tc_source, mut tc_receiver) = tc_args.split(); + let mut tc_buf: [u8; 4096] = [0; 4096]; loop { let mut tmtc_sched = scheduler.clone(); core_tmtc_loop( &mut udp_tmtc_server, &mut tc_args, + &mut tc_buf, //&mut tc_source, //&mut tc_receiver, &mut pus_receiver, @@ -212,6 +214,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { fn core_tmtc_loop( udp_tmtc_server: &mut UdpTmtcServer, tc_args: &mut TcArgs, + tc_buf: &mut [u8], //tc_source: &mut PusTcSource, //tc_receiver: &mut Receiver, pus_receiver: &mut PusReceiver, @@ -224,23 +227,40 @@ fn core_tmtc_loop( } }; + let mut pool = tc_args + .tc_source + .tc_store + .pool + .write() + .expect("error locking pool"); + let mut scheduler = scheduler.borrow_mut(); - let mut pool = tc_args.tc_source.tc_store.pool.write().expect("error locking pool"); - scheduler - .release_telecommands(releaser, pool.as_mut()) - .expect("error releasing tc"); + scheduler.update_time_from_now().unwrap(); + match scheduler.release_telecommands(releaser, pool.as_mut()) { + Ok(released_tcs) => { + if released_tcs > 0 { + println!("{} Tc(s) released from scheduler", released_tcs); + } + } + Err(_) => {} + } + //.expect("error releasing tc"); drop(pool); + drop(scheduler); while poll_tc_server(udp_tmtc_server) {} match tc_args.tc_receiver.try_recv() { Ok(addr) => { - let pool = tc_args.tc_source + let pool = tc_args + .tc_source .tc_store .pool .read() .expect("locking tc pool failed"); let data = pool.read(&addr).expect("reading pool failed"); - match PusTc::from_bytes(data) { + tc_buf[0..data.len()].copy_from_slice(data); + drop(pool); + match PusTc::from_bytes(tc_buf) { Ok((pus_tc, _)) => { pus_receiver .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc) @@ -248,7 +268,7 @@ fn core_tmtc_loop( } Err(e) => { println!("error creating PUS TC from raw data: {e}"); - println!("raw data: {data:x?}"); + println!("raw data: {tc_buf:x?}"); } } } @@ -264,7 +284,6 @@ fn core_tmtc_loop( } fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { - match udp_tmtc_server.udp_tc_server.try_recv_tc() { Ok(_) => true, Err(e) => match e {