From 6de31ccd3f098c6beb5d2805c1149dd335d88bf2 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 28 Jul 2022 16:37:01 +0200 Subject: [PATCH] update for new default pus queue helper --- common.py | 61 ++++++++++++++------------------ pus_tc/pus_11_tc_sched.py | 6 ++-- pus_tc/pus_17_test.py | 4 +-- pus_tc/pus_200_mode.py | 7 ++-- pus_tc/service_20_parameters.py | 15 ++++---- pus_tc/service_2_raw_cmd.py | 6 ++-- pus_tc/service_3_housekeeping.py | 19 ++++++---- pus_tc/service_8_func_cmd.py | 6 ++-- 8 files changed, 62 insertions(+), 62 deletions(-) diff --git a/common.py b/common.py index d249708..8d44be2 100644 --- a/common.py +++ b/common.py @@ -4,6 +4,7 @@ import sys from tmtccmd.com_if import ComInterface from tmtccmd.logging import get_current_time_string from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices +from tmtccmd.tc.queue import DefaultPusQueueHelper from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter @@ -21,7 +22,7 @@ except ImportError as error: print("Python tmtccmd module could not be imported. Make sure it is installed") sys.exit(1) -from spacepackets.ecss import PusVerificator, PusTelecommand +from spacepackets.ecss import PusVerificator, PusTelecommand, PusServices from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands @@ -76,6 +77,7 @@ class PusHandler(SpecificApidHandlerBase): class TcHandler(TcHandlerBase): def __init__( self, + apid: int, seq_count_provider: FileSeqCountProvider, pus_verificator: PusVerificator, file_logger: logging.Logger, @@ -86,37 +88,44 @@ class TcHandler(TcHandlerBase): self.pus_verificator = pus_verificator self.file_logger = file_logger self.raw_logger = raw_logger + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=None, + apid=apid, + seq_cnt_provider=seq_count_provider, + pus_verificator=pus_verificator, + ) def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper): + self.queue_helper.queue_wrapper = wrapper.queue_wrapper if info.proc_type == TcProcedureType.DEFAULT: def_proc = info.to_def_procedure() service = def_proc.service op_code = def_proc.op_code if service == CoreServiceList.SERVICE_2.value: return pack_service_2_commands_into( - op_code=op_code, q=wrapper.queue_helper + op_code=op_code, q=self.queue_helper ) if service == CoreServiceList.SERVICE_3.value: return pack_service_3_commands_into( - op_code=op_code, q=wrapper.queue_helper + op_code=op_code, q=self.queue_helper ) if service == CoreServiceList.SERVICE_5.value: - return pack_generic_service_5_test_into(q=wrapper.queue_helper) + return pack_generic_service_5_test_into(q=self.queue_helper) if service == CoreServiceList.SERVICE_8.value: return pack_service_8_commands_into( - op_code=op_code, q=wrapper.queue_helper + op_code=op_code, q=self.queue_helper ) if service == CoreServiceList.SERVICE_11.value: - return pack_service_11_commands(op_code=op_code, q=wrapper.queue_helper) + return pack_service_11_commands(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_17.value: - return pack_service_17_commands(op_code=op_code, q=wrapper.queue_helper) + return pack_service_17_commands(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_20.value: return pack_service20_commands_into( - q=wrapper.queue_helper, op_code=op_code + q=self.queue_helper, op_code=op_code ) if service == CoreServiceList.SERVICE_200.value: return pack_service_200_commands_into( - q=wrapper.queue_helper, op_code=op_code + q=self.queue_helper, op_code=op_code ) LOGGER.warning("Invalid Service !") @@ -131,36 +140,19 @@ class TcHandler(TcHandlerBase): def handle_tc_send_cb(self, params: SendCbParams): pus_tc_wrapper = params.entry.to_pus_tc_entry() - # TODO: All of this stuff should be done during queue insertion time - # This requires the queue helper to be optionally able to perform TC - # post-processing via a callback or something similar. Then an API can be - # added which is also able to pack time tagged TCs and stamping both TCs - pus_tc_wrapper.pus_tc.seq_count = ( - self.seq_count_provider.get_and_increment() - ) - pus_tc_wrapper.pus_tc.apid = EXAMPLE_APID if ( - pus_tc_wrapper.pus_tc.service == 11 - and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT + pus_tc_wrapper.pus_tc.service == PusServices.S11_TC_SCHED + and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT ): - try: - pus_tc = PusTelecommand.unpack( - pus_tc_wrapper.pus_tc.app_data[4:] - ) - self.pus_verificator.add_tc(pus_tc) - except ValueError as e: - LOGGER.warning( - f"Attempt of unpacking time tagged TC failed with exception {e}" - ) - # Add TC after Sequence Count stamping - self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc) + wrapped_tc = PusTelecommand.unpack(pus_tc_wrapper.pus_tc.app_data[4:]) + tc_info_string = f"Sending time-tagged command {wrapped_tc}" + LOGGER.info(tc_info_string) + self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") raw_tc = pus_tc_wrapper.pus_tc.pack() self.raw_logger.log_tc(pus_tc_wrapper.pus_tc) - tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" + tc_info_string = f"Sending {pus_tc_wrapper.pus_tc}" LOGGER.info(tc_info_string) - self.file_logger.info( - f"{get_current_time_string(True)}: {tc_info_string}" - ) + self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") params.com_if.send(raw_tc) def queue_finished_cb(self, info: ProcedureHelper): @@ -196,6 +188,7 @@ def setup_tmtc_handlers( ccsds_handler = CcsdsTmHandler(None) ccsds_handler.add_apid_handler(pus_handler) tc_handler = TcHandler( + apid=EXAMPLE_APID, file_logger=printer.file_logger, raw_logger=raw_logger, pus_verificator=verif_wrapper.pus_verificator, diff --git a/pus_tc/pus_11_tc_sched.py b/pus_tc/pus_11_tc_sched.py index 3fa31ac..7a938ab 100644 --- a/pus_tc/pus_11_tc_sched.py +++ b/pus_tc/pus_11_tc_sched.py @@ -8,13 +8,13 @@ from tmtccmd.config import TmTcDefWrapper from tmtccmd.config.tmtc import OpCodeEntry from tmtccmd.pus.pus_17_test import pack_service_17_ping_command from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId -from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_11_tc_sched import ( generate_enable_tc_sched_cmd, generate_disable_tc_sched_cmd, generate_reset_tc_sched_cmd, pack_time_tagged_tc_app_data, ) +from tmtccmd.tc.queue import DefaultPusQueueHelper class OpCodes: @@ -49,7 +49,7 @@ def add_tc_sched_cmds(cmd_dict: TmTcDefWrapper): ) -def __generic_pack_three_time_tagged_cmds(q: QueueHelper): +def __generic_pack_three_time_tagged_cmds(q: DefaultPusQueueHelper): q.add_log_cmd("Testing Time-Tagged Command insertion") q.add_pus_tc(generate_enable_tc_sched_cmd()) current_time = int(round(time.time())) @@ -74,7 +74,7 @@ def __generic_pack_three_time_tagged_cmds(q: QueueHelper): q.add_wait(timedelta(seconds=25.0)) -def pack_service_11_commands(op_code: str, q: QueueHelper): +def pack_service_11_commands(op_code: str, q: DefaultPusQueueHelper): if op_code in OpCodes.TEST_INSERT: q.add_log_cmd("Testing Time-Tagged Command deletion") __generic_pack_three_time_tagged_cmds(q=q) diff --git a/pus_tc/pus_17_test.py b/pus_tc/pus_17_test.py index 2ea63ae..8b803d8 100644 --- a/pus_tc/pus_17_test.py +++ b/pus_tc/pus_17_test.py @@ -2,10 +2,10 @@ from tmtccmd.pus.pus_17_test import ( pack_service_17_ping_command, pack_generic_service17_test, ) -from tmtccmd.tc import QueueHelper +from tmtccmd.tc.queue import DefaultPusQueueHelper -def pack_service_17_commands(op_code: str, q: QueueHelper): +def pack_service_17_commands(op_code: str, q: DefaultPusQueueHelper): if op_code == "0": q.add_pus_tc(pack_service_17_ping_command()) else: diff --git a/pus_tc/pus_200_mode.py b/pus_tc/pus_200_mode.py index 89e243e..a8cc82b 100644 --- a/pus_tc/pus_200_mode.py +++ b/pus_tc/pus_200_mode.py @@ -1,13 +1,14 @@ # -*- coding: utf-8 -*- from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tc import QueueHelper +from tmtccmd.tc import QueueHelperBase from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID +from tmtccmd.tc.queue import DefaultPusQueueHelper -def pack_service_200_commands_into(q: QueueHelper, op_code: str): +def pack_service_200_commands_into(q: DefaultPusQueueHelper, op_code: str): if op_code == "test": pack_service_200_test_into(q) elif op_code == "asm_to_normal" or op_code == "0": @@ -18,7 +19,7 @@ def pack_service_200_commands_into(q: QueueHelper, op_code: str): q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) -def pack_service_200_test_into(q: QueueHelper): +def pack_service_200_test_into(q: DefaultPusQueueHelper): q.add_log_cmd("Testing Service 200") # Object ID: DUMMY Device object_id = TEST_DEVICE_0_ID diff --git a/pus_tc/service_20_parameters.py b/pus_tc/service_20_parameters.py index c1c3381..55f25ee 100644 --- a/pus_tc/service_20_parameters.py +++ b/pus_tc/service_20_parameters.py @@ -4,12 +4,13 @@ from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss import PusServices from tmtccmd.config import TmTcDefWrapper, OpCodeEntry -from tmtccmd.tc import QueueHelper +from tmtccmd.tc import QueueHelperBase from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.logging import get_console_logger from common_tmtc.config.object_ids import TEST_DEVICE_0_ID +from tmtccmd.tc.queue import DefaultPusQueueHelper LOGGER = get_console_logger() @@ -20,16 +21,16 @@ def add_param_cmds(defs: TmTcDefWrapper): defs.add_service( name=str(PusServices.S20_PARAMETER.value), info="PUS Service 20 Parameters", - op_code_entry=op_code_entry + op_code_entry=op_code_entry, ) -def pack_service20_commands_into(q: QueueHelper, op_code: str): +def pack_service20_commands_into(q: DefaultPusQueueHelper, op_code: str): if op_code == "0": pack_service20_test_into(q=q) -def pack_service20_test_into(q: QueueHelper, called_externally: bool = False): +def pack_service20_test_into(q: DefaultPusQueueHelper, called_externally: bool = False): if called_externally is False: q.add_log_cmd("Testing Service 20") object_id = TEST_DEVICE_0_ID @@ -48,7 +49,7 @@ def pack_service20_test_into(q: QueueHelper, called_externally: bool = False): load_param_2_simple_test_commands(q) -def load_param_0_simple_test_commands(q: QueueHelper): +def load_param_0_simple_test_commands(q: DefaultPusQueueHelper): object_id = TEST_DEVICE_0_ID parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) # test checking Load for uint32_t @@ -64,7 +65,7 @@ def load_param_0_simple_test_commands(q: QueueHelper): q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload)) -def load_param_1_simple_test_commands(q: QueueHelper): +def load_param_1_simple_test_commands(q: DefaultPusQueueHelper): object_id = TEST_DEVICE_0_ID parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0) # test checking Load for int32_t @@ -80,7 +81,7 @@ def load_param_1_simple_test_commands(q: QueueHelper): q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload)) -def load_param_2_simple_test_commands(q: QueueHelper): +def load_param_2_simple_test_commands(q: DefaultPusQueueHelper): object_id = TEST_DEVICE_0_ID parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0) # test checking Load for float diff --git a/pus_tc/service_2_raw_cmd.py b/pus_tc/service_2_raw_cmd.py index d779291..7288287 100644 --- a/pus_tc/service_2_raw_cmd.py +++ b/pus_tc/service_2_raw_cmd.py @@ -9,21 +9,21 @@ import struct from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data from common_tmtc.pus_tc import command_data as cmd_data from common_tmtc.config.object_ids import TEST_DEVICE_0_ID +from tmtccmd.tc.queue import DefaultPusQueueHelper -def pack_service_2_commands_into(q: QueueHelper, op_code: str): +def pack_service_2_commands_into(q: DefaultPusQueueHelper, op_code: str): if op_code == "0": pack_generic_service_2_test_into(0, q) else: print(f"pack_service_2_test: Operation code {op_code} unknown!") -def pack_generic_service_2_test_into(init_ssc: int, q: QueueHelper) -> int: +def pack_generic_service_2_test_into(init_ssc: int, q: DefaultPusQueueHelper) -> int: new_ssc = init_ssc object_id = TEST_DEVICE_0_ID # dummy device # Set Raw Mode diff --git a/pus_tc/service_3_housekeeping.py b/pus_tc/service_3_housekeeping.py index 2959849..cbe9beb 100644 --- a/pus_tc/service_3_housekeeping.py +++ b/pus_tc/service_3_housekeeping.py @@ -4,7 +4,6 @@ from spacepackets.ecss.tc import PusTelecommand from deps.spacepackets.spacepackets.ecss import PusServices from tmtccmd.config import TmTcDefWrapper, OpCodeEntry -from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_20_params import ( pack_boolean_parameter_app_data, @@ -18,6 +17,8 @@ from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID # Set IDs +from tmtccmd.tc.queue import DefaultPusQueueHelper + TEST_SET_ID = 0 # Action IDs @@ -33,11 +34,11 @@ def add_hk_cmds(defs: TmTcDefWrapper): defs.add_service( name=str(PusServices.S3_HOUSEKEEPING.value), info="PUS Service 3 Housekeeping", - op_code_entry=op_code_entry + op_code_entry=op_code_entry, ) -def pack_service_3_commands_into(q: QueueHelper, op_code: str): +def pack_service_3_commands_into(q: DefaultPusQueueHelper, op_code: str): current_ssc = 3000 device_idx = 0 if device_idx == 0: @@ -67,7 +68,9 @@ def pack_service_3_commands_into(q: QueueHelper, op_code: str): pack_notification_basic_test(q=q, object_id=object_id) -def pack_service_3_test_info(q: QueueHelper, device_idx: int, object_id: bytearray): +def pack_service_3_test_info( + q: DefaultPusQueueHelper, device_idx: int, object_id: bytearray +): q.add_log_cmd("Service 3 (Housekeeping Service): All tests") pack_gen_one_hk_command(q=q, device_idx=device_idx, object_id=object_id) pack_housekeeping_basic_test(q=q, object_id=object_id) @@ -78,7 +81,9 @@ def pack_service_3_test_info(q: QueueHelper, device_idx: int, object_id: bytearr ) -def pack_gen_one_hk_command(q: QueueHelper, device_idx: int, object_id: bytearray): +def pack_gen_one_hk_command( + q: DefaultPusQueueHelper, device_idx: int, object_id: bytearray +): test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) q.add_log_cmd( f"Service 3 Test: Generate one test set packet for test device {device_idx}" @@ -87,7 +92,7 @@ def pack_gen_one_hk_command(q: QueueHelper, device_idx: int, object_id: bytearra def pack_housekeeping_basic_test( - q: QueueHelper, + q: DefaultPusQueueHelper, object_id: bytearray, enable_normal_mode: bool = True, ): @@ -147,7 +152,7 @@ def pack_housekeeping_basic_test( def pack_notification_basic_test( - q: QueueHelper, + q: DefaultPusQueueHelper, object_id: bytearray, enable_normal_mode: bool = True, ): diff --git a/pus_tc/service_8_func_cmd.py b/pus_tc/service_8_func_cmd.py index 1a33699..fd01124 100644 --- a/pus_tc/service_8_func_cmd.py +++ b/pus_tc/service_8_func_cmd.py @@ -1,21 +1,21 @@ from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes import common_tmtc.pus_tc.command_data as cmd_data from common_tmtc.config.object_ids import TEST_DEVICE_0_ID +from tmtccmd.tc.queue import DefaultPusQueueHelper -def pack_service_8_commands_into(q: QueueHelper, op_code: str): +def pack_service_8_commands_into(q: DefaultPusQueueHelper, op_code: str): if op_code == "0": pack_generic_service_8_test_into(q=q) else: print(f"pack_service_8_test: Operation code {op_code} unknown!") -def pack_generic_service_8_test_into(q: QueueHelper): +def pack_generic_service_8_test_into(q: DefaultPusQueueHelper): q.add_log_cmd("Testing Service 8") object_id = TEST_DEVICE_0_ID