From bd35acead0ff57cf3842bc9a0ed5ec5ced823794 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 17 May 2021 15:23:54 +0200 Subject: [PATCH] update to new tmtccmd version --- tmtc/config/custom_mode_op.py | 2 -- tmtc/config/hook_base.py | 44 ++++++++++++--------------- tmtc/config/object_ids.py | 15 +++++---- tmtc/config/version.py | 2 +- tmtc/pus_tc/service_200_mode.py | 7 +++-- tmtc/pus_tc/service_20_parameters.py | 12 +++----- tmtc/pus_tc/service_2_raw_cmd.py | 6 ++-- tmtc/pus_tc/service_3_housekeeping.py | 8 ++--- tmtc/pus_tc/service_8_func_cmd.py | 7 +++-- tmtc/pus_tc/tc_packing.py | 21 +++++++------ tmtc/pus_tm/factory_hook.py | 2 +- tmtc/pus_tm/service_3_hk_handling.py | 29 ++++++++++-------- tmtc/pus_tm/service_8_handling.py | 8 ----- 13 files changed, 75 insertions(+), 88 deletions(-) diff --git a/tmtc/config/custom_mode_op.py b/tmtc/config/custom_mode_op.py index d033ceb..acd322c 100644 --- a/tmtc/config/custom_mode_op.py +++ b/tmtc/config/custom_mode_op.py @@ -15,7 +15,5 @@ def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int): """ Custom modes can be implemented here """ - if tmtc_backend: - pass LOGGER.error(f"Unknown mode {mode}, Configuration error !") sys.exit() diff --git a/tmtc/config/hook_base.py b/tmtc/config/hook_base.py index c9f47b3..c668790 100644 --- a/tmtc/config/hook_base.py +++ b/tmtc/config/hook_base.py @@ -1,8 +1,8 @@ import argparse from typing import Dict, Union, Tuple -from tmtccmd.core.hook_base import TmTcHookBase -from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.config.definitions import ServiceOpCodeDictT +from tmtccmd.config.hook import TmTcHookBase from tmtccmd.pus_tm.service_3_base import Service3Base @@ -17,19 +17,26 @@ class FsfwHookBase(TmTcHookBase): from config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}" + def get_json_config_file_path(self) -> str: + return "config/tmtc_config.json" + + def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: + from tmtccmd.config.globals import get_default_service_op_code_dict + return get_default_service_op_code_dict() + def add_globals_pre_args_parsing(self, gui: bool = False): - from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing + from tmtccmd.config.globals import set_default_globals_pre_args_parsing set_default_globals_pre_args_parsing(gui=gui, apid=0xef) def add_globals_post_args_parsing(self, args: argparse.Namespace): - from tmtccmd.defaults.globals_setup import set_default_globals_post_args_parsing - set_default_globals_post_args_parsing(args=args, json_cfg_path=self.set_json_config_file_path()) + from tmtccmd.config.globals import set_default_globals_post_args_parsing + set_default_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path()) - def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \ + def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \ Union[CommunicationInterface, None]: - from tmtccmd.defaults.com_setup import create_communication_interface_default + from tmtccmd.config.com_if import create_communication_interface_default return create_communication_interface_default( - com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path() + com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path() ) def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): @@ -39,17 +46,13 @@ class FsfwHookBase(TmTcHookBase): from pus_tc.tc_packing import pack_service_queue_user pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue) - def pack_total_service_queue(self) -> Union[None, TcQueueT]: - from pus_tc.tc_packing import create_total_tc_queue_user - return create_total_tc_queue_user() - def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> Union[None, PusTelemetry]: from pus_tm.factory_hook import tm_user_factory_hook return tm_user_factory_hook(raw_tm_packet=raw_tm_packet) - def set_object_ids(self) -> Dict[bytes, list]: - from config.object_ids import set_object_ids - return set_object_ids() + def get_object_ids(self) -> Dict[bytes, list]: + from config.object_ids import get_object_ids + return get_object_ids() @staticmethod def handle_service_8_telemetry( @@ -62,18 +65,9 @@ class FsfwHookBase(TmTcHookBase): @staticmethod def handle_service_3_housekeeping( - object_id: bytearray, set_id: int, hk_data: bytearray, service3_packet: Service3Base + object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: from pus_tm.service_3_hk_handling import service_3_hk_handling return service_3_hk_handling( object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet ) - - def command_preparation_hook(self) -> Union[None, PusTelecommand]: - pass - - def set_json_config_file_path(self) -> str: - return "config/tmtc_config.json" - - - diff --git a/tmtc/config/object_ids.py b/tmtc/config/object_ids.py index 752f9a8..cbea7e3 100644 --- a/tmtc/config/object_ids.py +++ b/tmtc/config/object_ids.py @@ -3,18 +3,17 @@ @details Template configuration file. Copy this folder to the TMTC commander root and adapt it to your needs. """ - from typing import Dict -SERVICE_17_OBJ_ID = bytes([0x53, 0x00, 0x00, 0x17]) -TEST_DEVICE_0_OBJ_ID = bytes([0x44, 0x01, 0xAF, 0xFE]) -TEST_DEVICE_1_OBJ_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) +PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17]) +TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE]) +TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) -def set_object_ids() -> Dict[bytes, list]: +def get_object_ids() -> Dict[bytes, list]: object_id_dict = { - SERVICE_17_OBJ_ID: ["Service 17"], - TEST_DEVICE_0_OBJ_ID: ["Test Device 0"], - TEST_DEVICE_1_OBJ_ID: ["Test Device 1"], + PUS_SERVICE_17_ID: ["PUS Service 17"], + TEST_DEVICE_0_ID: ["Test Device 0"], + TEST_DEVICE_1_ID: ["Test Device 1"] } return object_id_dict diff --git a/tmtc/config/version.py b/tmtc/config/version.py index 8423d84..2c9e6a8 100644 --- a/tmtc/config/version.py +++ b/tmtc/config/version.py @@ -1,4 +1,4 @@ SW_NAME = "fsfw-tmtc" SW_VERSION = 1 -SW_SUBVERSION = 1 +SW_SUBVERSION = 2 SW_SUBSUBVERSION = 0 diff --git a/tmtc/pus_tc/service_200_mode.py b/tmtc/pus_tc/service_200_mode.py index c83f692..11a8363 100644 --- a/tmtc/pus_tc/service_200_mode.py +++ b/tmtc/pus_tc/service_200_mode.py @@ -5,12 +5,13 @@ @author R. Mueller @date 02.05.2020 """ -from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID -from tmtccmd.core.definitions import QueueCommands +from tmtccmd.config.definitions import QueueCommands from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.pus_tc.packer import TcQueueT from tmtccmd.pus_tc.service_200_mode import pack_mode_data +from config.object_ids import TEST_DEVICE_0_ID + def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): if op_code == "0": @@ -21,7 +22,7 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: new_ssc = init_ssc tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) # Object ID: DUMMY Device - object_id = TEST_DEVICE_0_OBJ_ID + object_id = TEST_DEVICE_0_ID # Set On Mode tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) mode_data = pack_mode_data(object_id, 1, 0) diff --git a/tmtc/pus_tc/service_20_parameters.py b/tmtc/pus_tc/service_20_parameters.py index 241b501..28116b1 100644 --- a/tmtc/pus_tc/service_20_parameters.py +++ b/tmtc/pus_tc/service_20_parameters.py @@ -1,18 +1,16 @@ import struct -from tmtccmd.core.definitions import QueueCommands +from tmtccmd.config.definitions import QueueCommands from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \ pack_parameter_id from tmtccmd.pus_tc.service_200_mode import pack_mode_data -from tmtccmd.utility.tmtcc_logger import get_logger +from tmtccmd.utility.logger import get_logger - -from config.object_ids import TEST_DEVICE_0_OBJ_ID +from config.object_ids import TEST_DEVICE_0_ID LOGGER = get_logger() -TEST_DEVICE_0_ID = bytearray() def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): @@ -23,7 +21,7 @@ def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False): if called_externally is False: tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20")) - object_id = TEST_DEVICE_0_OBJ_ID + object_id = TEST_DEVICE_0_ID # set mode normal tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode")) @@ -40,7 +38,7 @@ def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False def load_param_0_simple_test_commands(tc_queue: TcQueueT): - object_id = TEST_DEVICE_0_OBJ_ID + object_id = TEST_DEVICE_0_ID parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) # test checking Load for uint32_t tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t")) diff --git a/tmtc/pus_tc/service_2_raw_cmd.py b/tmtc/pus_tc/service_2_raw_cmd.py index 9fe2d1a..f26f33e 100644 --- a/tmtc/pus_tc/service_2_raw_cmd.py +++ b/tmtc/pus_tc/service_2_raw_cmd.py @@ -7,13 +7,13 @@ """ import struct -from tmtccmd.core.definitions import QueueCommands +from tmtccmd.config.definitions import QueueCommands from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.pus_tc.definitions import TcQueueT from pus_tc.service_200_mode import pack_mode_data import pus_tc.command_data as cmd_data -from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID +from config.object_ids import TEST_DEVICE_0_ID def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): @@ -25,7 +25,7 @@ def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: new_ssc = init_ssc - object_id = TEST_DEVICE_0_OBJ_ID # dummy device + object_id = TEST_DEVICE_0_ID # dummy device # Set Raw Mode tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode")) mode_data = pack_mode_data(object_id, 3, 0) diff --git a/tmtc/pus_tc/service_3_housekeeping.py b/tmtc/pus_tc/service_3_housekeeping.py index 36b9bb1..a5b4f94 100644 --- a/tmtc/pus_tc/service_3_housekeeping.py +++ b/tmtc/pus_tc/service_3_housekeeping.py @@ -1,4 +1,4 @@ -from tmtccmd.core.definitions import QueueCommands +from tmtccmd.config.definitions import QueueCommands from tmtccmd.pus_tc.service_200_mode import pack_mode_data from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ @@ -7,8 +7,8 @@ from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command +from config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID -from config.object_ids import TEST_DEVICE_1_OBJ_ID, TEST_DEVICE_0_OBJ_ID # Set IDs TEST_SET_ID = 0 @@ -25,9 +25,9 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): # TODO: Import this from config instead device_idx = 0 if device_idx == 0: - object_id = TEST_DEVICE_0_OBJ_ID + object_id = TEST_DEVICE_0_ID else: - object_id = TEST_DEVICE_1_OBJ_ID + object_id = TEST_DEVICE_1_ID if op_code == "0": # This will pack all the tests diff --git a/tmtc/pus_tc/service_8_func_cmd.py b/tmtc/pus_tc/service_8_func_cmd.py index 24f3317..a53d44b 100644 --- a/tmtc/pus_tc/service_8_func_cmd.py +++ b/tmtc/pus_tc/service_8_func_cmd.py @@ -1,10 +1,11 @@ -from tmtccmd.core.definitions import QueueCommands +from tmtccmd.config.definitions import QueueCommands from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.pus_tc.definitions import TcQueueT import pus_tc.command_data as cmd_data from pus_tc.service_200_mode import pack_mode_data -from config.object_ids import TEST_DEVICE_1_OBJ_ID, TEST_DEVICE_0_OBJ_ID + +from config.object_ids import TEST_DEVICE_0_ID def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): @@ -16,7 +17,7 @@ def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): def pack_generic_service_8_test_into(tc_queue: TcQueueT): tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8")) - object_id = TEST_DEVICE_0_OBJ_ID + object_id = TEST_DEVICE_0_ID # set mode on tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode")) diff --git a/tmtc/pus_tc/tc_packing.py b/tmtc/pus_tc/tc_packing.py index 389a182..757c58f 100644 --- a/tmtc/pus_tc/tc_packing.py +++ b/tmtc/pus_tc/tc_packing.py @@ -6,14 +6,15 @@ import os from collections import deque +from typing import Union from pus_tc.service_20_parameters import pack_service20_commands_into from pus_tc.service_2_raw_cmd import pack_service_2_commands_into from pus_tc.service_3_housekeeping import pack_service_3_commands_into from pus_tc.service_8_func_cmd import pack_service_8_commands_into -from tmtccmd.utility.tmtcc_logger import get_logger +from tmtccmd.utility.logger import get_logger from tmtccmd.pus_tc.definitions import TcQueueT -from tmtccmd.core.definitions import CoreServiceList +from tmtccmd.config.definitions import CoreServiceList from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test from pus_tc.service_200_mode import pack_service_200_commands_into @@ -21,20 +22,20 @@ from pus_tc.service_200_mode import pack_service_200_commands_into LOGGER = get_logger() -def pack_service_queue_user(service: int, op_code: str, service_queue: TcQueueT): - if service == CoreServiceList.SERVICE_2: +def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT): + if service == CoreServiceList.SERVICE_2.value: return pack_service_2_commands_into(op_code=op_code, tc_queue=service_queue) - if service == CoreServiceList.SERVICE_3: + if service == CoreServiceList.SERVICE_3.value: return pack_service_3_commands_into(op_code=op_code, tc_queue=service_queue) - if service == CoreServiceList.SERVICE_5: + if service == CoreServiceList.SERVICE_5.value: return pack_generic_service5_test_into(tc_queue=service_queue) - if service == CoreServiceList.SERVICE_8: + if service == CoreServiceList.SERVICE_8.value: return pack_service_8_commands_into(op_code=op_code, tc_queue=service_queue) - if service == CoreServiceList.SERVICE_17: + if service == CoreServiceList.SERVICE_17.value: return pack_generic_service17_test(init_ssc=1700, tc_queue=service_queue) - if service == CoreServiceList.SERVICE_20: + if service == CoreServiceList.SERVICE_20.value: return pack_service20_commands_into(tc_queue=service_queue, op_code=op_code) - if service == CoreServiceList.SERVICE_200: + if service == CoreServiceList.SERVICE_200.value: return pack_service_200_commands_into(tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !") diff --git a/tmtc/pus_tm/factory_hook.py b/tmtc/pus_tm/factory_hook.py index 31f196f..8e5115a 100644 --- a/tmtc/pus_tm/factory_hook.py +++ b/tmtc/pus_tm/factory_hook.py @@ -5,7 +5,7 @@ """ from tmtccmd.ecss.tm import PusTelemetry -from tmtccmd.utility.tmtcc_logger import get_logger +from tmtccmd.utility.logger import get_logger from tmtccmd.pus_tm.service_1_verification import Service1TM from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM from tmtccmd.pus_tm.service_3_housekeeping import Service3TM diff --git a/tmtc/pus_tm/service_3_hk_handling.py b/tmtc/pus_tm/service_3_hk_handling.py index 33aa483..81ad9d8 100644 --- a/tmtc/pus_tm/service_3_hk_handling.py +++ b/tmtc/pus_tm/service_3_hk_handling.py @@ -6,34 +6,37 @@ """ import struct from typing import Tuple +from config.object_ids import ObjectIds from tmtccmd.pus_tm.service_3_housekeeping import Service3Base from tmtccmd.utility.tmtcc_logger import get_logger -from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID LOGGER = get_logger() def service_3_hk_handling( - object_id: bytearray, set_id: int, hk_data: bytearray, service3_packet: Service3Base + object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: """ This function is called when a Service 3 Housekeeping packet is received. - :param object_id: - :param set_id: - :param hk_data: - :param service3_packet: - :return: Expects a tuple, consisting of two lists, a bytearray and an integer + Please note that the object IDs should be compared by value because direct comparison of + enumerations does not work in Python. For example use: + + if object_id.value == ObjectIds.TEST_OBJECT.value + + to test equality based on the object ID list. + + @param object_id: + @param set_id: + @param hk_data: + @param service3_packet: + @return: Expects a tuple, consisting of two lists, a bytearray and an integer The first list contains the header columns, the second list the list with the corresponding values. The bytearray is the validity buffer, which is usually appended at the end of the housekeeping packet. The last value is the number of parameters. """ - if set_id: - pass - if service3_packet: - pass - if object_id == TEST_DEVICE_0_OBJ_ID or \ - object_id == TEST_DEVICE_1_OBJ_ID: + if object_id == ObjectIds.TEST_DEVICE_0.value or \ + object_id == ObjectIds.TEST_DEVICE_1.value: return handle_test_set_deserialization(hk_data=hk_data) else: LOGGER.info("Service3TM: Parsing for this SID has not been implemented.") diff --git a/tmtc/pus_tm/service_8_handling.py b/tmtc/pus_tm/service_8_handling.py index cfe04cb..259e7e4 100644 --- a/tmtc/pus_tm/service_8_handling.py +++ b/tmtc/pus_tm/service_8_handling.py @@ -14,14 +14,6 @@ def custom_service_8_handling( @param custom_data: @return: """ - if object_id: - pass - - if action_id: - pass - - if custom_data: - pass header_list = [] content_list = [] return header_list, content_list