Implementation of scheduler in PUS and TMTC handler #29

Merged
muellerr merged 13 commits from pus_schedule_implementation into main 2023-02-01 13:40:49 +01:00
6 changed files with 157 additions and 35 deletions
Showing only changes of commit 7569244a90


@@ -10,6 +10,7 @@ use spacepackets::tc::{GenericPusTcSecondaryHeader, PusTc};
 use spacepackets::time::cds::DaysLen24Bits;
 use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp};
 use std::collections::BTreeMap;
+use std::dbg;

lkoester marked this conversation as resolved

    should be removed (or put behind std feature and `#[allow(unused_imports)]`)

 #[cfg(feature = "std")]
 use std::error::Error;
 #[cfg(feature = "std")]
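A minimal sketch of the reviewer's suggestion, reusing the crate's existing `std` feature gate (the exact placement in the module is an assumption):

    #[cfg(feature = "std")]
    #[allow(unused_imports)]
    use std::dbg;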
@@ -285,13 +286,14 @@ impl PusScheduler {
     /// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
     /// closure for each telecommand which should be released. This function will also delete
-    /// the telecommands from the holding store after calling the release closure.
+    /// the telecommands from the holding store after calling the release closure, if the scheduler
+    /// is disabled.
     ///
     /// # Arguments
     ///
     /// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
     /// the second argument is the store address. This closure should return whether the
-    /// command should be deleted.
+    /// command should be deleted if the scheduler is disabled to prevent memory leaks.
     /// * `store` - The holding store of the telecommands.
     pub fn release_telecommands<R: FnMut(bool, &StoreAddr) -> bool>(
         &mut self,
@@ -305,7 +307,7 @@ impl PusScheduler {
         for addr in tc.1 {
             let should_delete = releaser(self.enabled, addr);
             released_tcs += 1;
-            if should_delete {
+            if should_delete && !self.is_enabled() {
                 let res = tc_store.delete(*addr);
                 if res.is_err() {
                     store_error = res;
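For context, a hedged usage sketch of the changed release contract: the releaser closure still decides whether a telecommand should be deleted, but with this commit the deletion only takes effect while the scheduler is disabled. The `scheduler` and `pool` names mirror the tests below; the closure body is illustrative:

    let released = scheduler
        .release_telecommands(
            |enabled: bool, addr: &StoreAddr| {
                // With the scheduler enabled, the TC would be forwarded for
                // execution here. Returning true requests deletion from the
                // store, which now only happens while the scheduler is disabled.
                true
            },
            &mut pool,
        )
        .expect("releasing telecommands failed");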
@@ -447,6 +449,16 @@ mod tests {
         assert!(expected_store_addrs.contains(store_addr));
         *counter += 1;
     }
+    fn common_check_disabled(
+        enabled: bool,
+        store_addr: &StoreAddr,
+        expected_store_addrs: Vec<StoreAddr>,
+        counter: &mut usize,
+    ) {
+        assert_eq!(enabled, false);
+        assert!(expected_store_addrs.contains(store_addr));
+        *counter += 1;
+    }

     #[test]
     fn release_basic() {
@@ -481,7 +493,7 @@ mod tests {
             .release_telecommands(&mut test_closure_1, &mut pool)
             .expect("deletion failed");
         assert_eq!(released, 1);
-        assert!(!pool.has_element_at(&first_addr).unwrap());
+        assert!(pool.has_element_at(&first_addr).unwrap());

         // test 3, late timestamp, release 1 overdue tc
         let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| {
@@ -495,7 +507,7 @@ mod tests {
             .release_telecommands(&mut test_closure_2, &mut pool)
             .expect("deletion failed");
         assert_eq!(released, 1);
-        assert!(!pool.has_element_at(&second_addr).unwrap());
+        assert!(pool.has_element_at(&second_addr).unwrap());

         //test 4: no tcs left
         scheduler
@@ -540,8 +552,8 @@ mod tests {
             .release_telecommands(&mut test_closure, &mut pool)
             .expect("deletion failed");
         assert_eq!(released, 2);
-        assert!(!pool.has_element_at(&first_addr).unwrap());
-        assert!(!pool.has_element_at(&second_addr).unwrap());
+        assert!(pool.has_element_at(&first_addr).unwrap());
+        assert!(pool.has_element_at(&second_addr).unwrap());

         //test 3: no tcs left
         released = scheduler
@@ -553,6 +565,66 @@ mod tests {
         assert_eq!(i, 2);
     }

+    #[test]
+    fn release_with_scheduler_disabled() {
+        let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
+        let mut scheduler =
+            PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
+        scheduler.disable();
+        let first_addr = pool.add(&[2, 2, 2]).unwrap();
+        scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
+        let second_addr = pool.add(&[5, 6, 7]).unwrap();
+        scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr);
+        let mut i = 0;
+        let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| {
+            common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i);
+            true
+        };
+
+        // test 1: too early, no tcs
+        scheduler.update_time(UnixTimestamp::new_only_seconds(99));
+        scheduler
+            .release_telecommands(&mut test_closure_1, &mut pool)
+            .expect("deletion failed");
+
+        // test 2: exact time stamp of tc, releases 1 tc
+        scheduler.update_time(UnixTimestamp::new_only_seconds(100));
+        let mut released = scheduler
+            .release_telecommands(&mut test_closure_1, &mut pool)
+            .expect("deletion failed");
+        assert_eq!(released, 1);
+        assert!(!pool.has_element_at(&first_addr).unwrap());
+
+        // test 3, late timestamp, release 1 overdue tc
+        let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| {
+            common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i);
+            true
+        };
+        scheduler.update_time(UnixTimestamp::new_only_seconds(206));
+        released = scheduler
+            .release_telecommands(&mut test_closure_2, &mut pool)
+            .expect("deletion failed");
+        assert_eq!(released, 1);
+        assert!(!pool.has_element_at(&second_addr).unwrap());
+
+        //test 4: no tcs left
+        scheduler
+            .release_telecommands(&mut test_closure_2, &mut pool)
+            .expect("deletion failed");
+
+        // check that 2 total tcs have been released
+        assert_eq!(i, 2);
+    }

     fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
         let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(&timestamp).unwrap();


@@ -5,18 +5,21 @@ import struct
 import sys
 import time
 from typing import Optional
+import datetime

 import tmtccmd
 from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator
 from spacepackets.ecss.pus_17_test import Service17Tm
 from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
+from spacepackets.ccsds.time import CdsShortTimestamp

 from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper
 from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
+from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd
 from tmtccmd.core.base import BackendRequest
 from tmtccmd.pus import VerificationWrapper
 from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase
-from tmtccmd.com_if import ComInterface
+from tmtccmd.com import ComInterface
 from tmtccmd.config import (
     default_json_path,
     SetupParams,

@@ -41,7 +44,7 @@ from tmtccmd.tc import (
     SendCbParams,
     DefaultPusQueueHelper,
 )
-from tmtccmd.tm.pus_5_event import Service5Tm
+from tmtccmd.tm.pus_5_fsfw_event import Service5Tm
 from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
 from tmtccmd.util.obj_id import ObjectIdDictT
@@ -57,7 +60,7 @@ class SatRsConfigHook(TmTcCfgHookBase):
         super().__init__(json_cfg_path=json_cfg_path)

     def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
-        from tmtccmd.config.com_if import (
+        from tmtccmd.config.com import (
             create_com_interface_default,
             create_com_interface_cfg_default,
         )

@@ -94,6 +97,13 @@ class SatRsConfigHook(TmTcCfgHookBase):
             info="PUS Service 3 Housekeeping",
             op_code_entry=srv_3
         )
+        srv_11 = OpCodeEntry()
+        srv_11.add("0", "Scheduled TC Test")
+        defs.add_service(
+            name=CoreServiceList.SERVICE_11,
+            info="PUS Service 11 TC Scheduling",
+            op_code_entry=srv_11,
+        )
         return defs

     def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
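With the service definition above registered, the new op code should be reachable from the example client via the default tmtccmd argument parser, presumably something like `./main.py -s 11 -o 0` (script name and flag spelling are assumptions, not confirmed by this diff).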
@@ -120,7 +130,7 @@ class PusHandler(SpecificApidHandlerBase):
     def handle_tm(self, packet: bytes, _user_args: any):
         try:
-            tm_packet = PusTelemetry.unpack(packet)
+            tm_packet = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
         except ValueError as e:
             LOGGER.warning("Could not generate PUS TM object from raw data")
             LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")

@@ -128,7 +138,7 @@ class PusHandler(SpecificApidHandlerBase):
         service = tm_packet.service
         dedicated_handler = False
         if service == 1:
-            tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(1, 2))
+            tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2))
             res = self.verif_wrapper.add_tm(tm_packet)
             if res is None:
                 LOGGER.info(

@@ -145,16 +155,16 @@ class PusHandler(SpecificApidHandlerBase):
         if service == 3:
             LOGGER.info("No handling for HK packets implemented")
             LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
-            pus_tm = PusTelemetry.unpack(packet)
+            pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
             if pus_tm.subservice == 25:
                 if len(pus_tm.source_data) < 8:
                     raise ValueError("No addressable ID in HK packet")
                 json_str = pus_tm.source_data[8:]
             dedicated_handler = True
         if service == 5:
-            tm_packet = Service5Tm.unpack(packet)
+            tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
         if service == 17:
-            tm_packet = Service17Tm.unpack(packet)
+            tm_packet = Service17Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
             dedicated_handler = True
             if tm_packet.subservice == 2:
                 self.printer.file_logger.info("Received Ping Reply TM[17,2]")
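The repeated `time_reader=CdsShortTimestamp.empty()` argument reflects the newer spacepackets unpack API used here, which takes a timestamp reader instance to deserialize the CDS time field of the secondary header. A minimal sketch of the pattern (the helper name is illustrative):

    from spacepackets.ecss import PusTelemetry
    from spacepackets.ccsds.time import CdsShortTimestamp

    def unpack_tm(raw: bytes) -> PusTelemetry:
        # The empty timestamp acts as a template telling the parser how many
        # timestamp bytes to read before the source data starts.
        return PusTelemetry.unpack(raw, time_reader=CdsShortTimestamp.empty())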
@@ -205,7 +215,10 @@ class TcHandler(TcHandlerBase):
         self.verif_wrapper = verif_wrapper
         self.queue_helper = DefaultPusQueueHelper(
             queue_wrapper=None,
+            tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
             seq_cnt_provider=seq_count_provider,
+            pus_verificator=self.verif_wrapper.pus_verificator,
+            default_pus_apid=EXAMPLE_PUS_APID
         )

     def send_cb(self, send_params: SendCbParams):

@@ -213,10 +226,6 @@ class TcHandler(TcHandlerBase):
         if entry_helper.is_tc:
             if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
                 pus_tc_wrapper = entry_helper.to_pus_tc_entry()
-                pus_tc_wrapper.pus_tc.seq_count = (
-                    self.seq_count_provider.get_and_increment()
-                )
-                self.verif_wrapper.add_tc(pus_tc_wrapper.pus_tc)
                 raw_tc = pus_tc_wrapper.pus_tc.pack()
                 LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
                 send_params.com_if.send(raw_tc)

@@ -247,6 +256,14 @@ class TcHandler(TcHandlerBase):
             return q.add_pus_tc(
                 PusTelecommand(service=17, subservice=1)
             )
+        if service == CoreServiceList.SERVICE_11:
+            q.add_log_cmd("Sending PUS scheduled TC telecommand")
+            crt_time = CdsShortTimestamp.from_now()
+            time_stamp = crt_time + datetime.timedelta(seconds=10)
+            time_stamp = time_stamp.pack()
+            return q.add_pus_tc(
+                create_time_tagged_cmd(time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID)
+            )
         if service == CoreServiceList.SERVICE_3:
             if op_code in HkOpCodes.GENERATE_ONE_SHOT:
                 q.add_log_cmd("Sending HK one shot request")
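For reference, a hedged sketch of what the SERVICE_11 branch above produces: `create_time_tagged_cmd` builds a PUS TC[11,4] whose application data is the packed release timestamp followed by the complete wrapped telecommand, which is the layout the on-board scheduler parses back out with `insert_wrapped_tc`. The subservice number and manual construction below are assumptions based on the PUS 11 standard, not library code:

    # Roughly equivalent manual construction (illustrative):
    wrapped_tc = PusTelecommand(service=17, subservice=1)
    release_time = CdsShortTimestamp.from_now() + datetime.timedelta(seconds=10)
    app_data = release_time.pack() + wrapped_tc.pack()
    sched_tc = PusTelecommand(service=11, subservice=4, app_data=app_data, apid=EXAMPLE_PUS_APID)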


@@ -1 +1,2 @@
-tmtccmd == 3.0.0
+# tmtccmd == 4.0.0a2
+-e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd


@@ -287,8 +287,18 @@ impl PusReceiver {
     }

     fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
-        self.update_time_stamp();
+        if pus_tc.user_data().is_none() {
+            self.update_time_stamp();
+            self.verif_reporter
+                .start_failure(
+                    token,
+                    FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None),
+                )
+                .expect("Sending start failure TM failed");
+            return;
+        }
+        self.update_time_stamp();
         match pus_tc.subservice() {
             1 => {
                 let start_token = self

@@ -305,6 +315,7 @@ impl PusReceiver {
                 } else {
                     panic!("Failed to enable scheduler");
                 }
+                drop(scheduler);
             }
             2 => {
                 let start_token = self

@@ -321,6 +332,7 @@ impl PusReceiver {
                 } else {
                     panic!("Failed to disable scheduler");
                 }
+                drop(scheduler);
             }
             3 => {
                 let start_token = self

@@ -328,7 +340,6 @@ impl PusReceiver {
                     .start_success(token, Some(&self.time_stamp))
                     .expect("Error sending start success");

-                let mut scheduler = self.scheduler.borrow_mut();
                 let mut pool = self
                     .tc_source
                     .tc_store

@@ -336,9 +347,11 @@ impl PusReceiver {
                     .write()
                     .expect("Locking pool failed");
+                let mut scheduler = self.scheduler.borrow_mut();
                 scheduler
                     .reset(pool.as_mut())
                     .expect("Error resetting TC Pool");
+                drop(scheduler);

                 self.verif_reporter
                     .completion_success(start_token, Some(&self.time_stamp))
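The reshuffled borrows and the explicit drop(scheduler) calls keep the RefCell borrow of the shared scheduler alive only for as long as the pool write lock is held. A sketch of the same discipline expressed with a scope instead of explicit drops (names as in the diff, purely illustrative):

    {
        // Take the pool lock first, then the RefCell borrow ...
        let mut pool = self.tc_source.tc_store.pool.write().expect("Locking pool failed");
        let mut scheduler = self.scheduler.borrow_mut();
        scheduler.reset(pool.as_mut()).expect("Error resetting TC Pool");
    } // ... and end both guards here, before any verification TM is sent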
@@ -350,19 +363,19 @@ impl PusReceiver {
                     .start_success(token, Some(&self.time_stamp))
                     .expect("Error sending start success");

-                let mut scheduler = self.scheduler.borrow_mut();
                 let mut pool = self
                     .tc_source
                     .tc_store
                     .pool
                     .write()
                     .expect("Locking pool failed");
+                let mut scheduler = self.scheduler.borrow_mut();
                 scheduler
-                    .insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(
-                        pus_tc,
-                        pool.as_mut(),
-                    )
+                    .insert_wrapped_tc::<TimeProvider>(pus_tc, pool.as_mut())
                     .expect("TODO: panic message");
+                let time =
+                    TimeProvider::from_bytes_with_u16_days(&pus_tc.user_data().unwrap()).unwrap();
+                drop(scheduler);

                 self.verif_reporter
                     .completion_success(start_token, Some(&self.time_stamp))


@@ -195,11 +195,13 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
     //let (mut tc_source, mut tc_receiver) = tc_args.split();

+    let mut tc_buf: [u8; 4096] = [0; 4096];
     loop {
         let mut tmtc_sched = scheduler.clone();
         core_tmtc_loop(
             &mut udp_tmtc_server,
             &mut tc_args,
+            &mut tc_buf,
             //&mut tc_source,

lkoester marked this conversation as resolved

    old code can be deleted

             //&mut tc_receiver,
             &mut pus_receiver,
@@ -212,6 +214,7 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
 fn core_tmtc_loop(
     udp_tmtc_server: &mut UdpTmtcServer,
     tc_args: &mut TcArgs,
+    tc_buf: &mut [u8],
     //tc_source: &mut PusTcSource,

lkoester marked this conversation as resolved

    unused code can be deleted

     //tc_receiver: &mut Receiver<StoreAddr>,
     pus_receiver: &mut PusReceiver,
@@ -224,23 +227,40 @@ fn core_tmtc_loop(
         }
     };

+    let mut pool = tc_args
+        .tc_source
+        .tc_store
+        .pool
+        .write()
+        .expect("error locking pool");
     let mut scheduler = scheduler.borrow_mut();
-    let mut pool = tc_args.tc_source.tc_store.pool.write().expect("error locking pool");
-    scheduler
-        .release_telecommands(releaser, pool.as_mut())
-        .expect("error releasing tc");
+    scheduler.update_time_from_now().unwrap();
+    match scheduler.release_telecommands(releaser, pool.as_mut()) {
+        Ok(released_tcs) => {
+            if released_tcs > 0 {
+                println!("{} Tc(s) released from scheduler", released_tcs);
+            }
+        }
+        Err(_) => {}
+    }
+    //.expect("error releasing tc");

lkoester marked this conversation as resolved

    can be deleted

     drop(pool);
+    drop(scheduler);

     while poll_tc_server(udp_tmtc_server) {}
     match tc_args.tc_receiver.try_recv() {
         Ok(addr) => {
-            let pool = tc_args.tc_source
+            let pool = tc_args
+                .tc_source
                 .tc_store
                 .pool
                 .read()
                 .expect("locking tc pool failed");
             let data = pool.read(&addr).expect("reading pool failed");
-            match PusTc::from_bytes(data) {
+            tc_buf[0..data.len()].copy_from_slice(data);
+            drop(pool);
+            match PusTc::from_bytes(tc_buf) {
                 Ok((pus_tc, _)) => {
                     pus_receiver
                         .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc)

@@ -248,7 +268,7 @@ fn core_tmtc_loop(
                 }
                 Err(e) => {
                     println!("error creating PUS TC from raw data: {e}");
-                    println!("raw data: {data:x?}");
+                    println!("raw data: {tc_buf:x?}");
                 }
             }
         }
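The tc_buf changes above implement a copy-then-drop pattern: the packet is copied out of the pool while the read guard is held, and the guard is released before the telecommand is handled, so the handler can take its own pool or scheduler locks without self-deadlocking. A condensed sketch under the same assumptions (an RwLock-guarded pool and a pre-allocated buffer; names taken from the diff):

    let tc_len;
    {
        let pool = tc_args.tc_source.tc_store.pool.read().expect("locking tc pool failed");
        let data = pool.read(&addr).expect("reading pool failed");
        tc_len = data.len();
        tc_buf[0..tc_len].copy_from_slice(data);
    } // read guard dropped here; the raw TC now lives in tc_buf
    // Parsing and handling may lock the pool again without deadlocking.
    let (pus_tc, _) = PusTc::from_bytes(&tc_buf[0..tc_len]).expect("error creating PUS TC");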
@@ -264,7 +284,6 @@ fn core_tmtc_loop(
     }

 fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool {
     match udp_tmtc_server.udp_tc_server.try_recv_tc() {
-
         Ok(_) => true,
         Err(e) => match e {