From f088219c34821763f9f7bd1d815290978372954f Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 14 Jul 2021 00:35:15 +0200 Subject: [PATCH] deleting common tmtc to readd as submodule --- tmtc/common_tmtc/__init__.py | 0 tmtc/common_tmtc/config/__init__.py | 0 tmtc/common_tmtc/config/custom_definitions.py | 24 --- tmtc/common_tmtc/config/custom_mode_op.py | 19 --- tmtc/common_tmtc/config/definitions.py | 1 - .../common_tmtc/config/hook_implementation.py | 77 --------- tmtc/common_tmtc/config/object_ids.py | 19 --- tmtc/common_tmtc/config/version.py | 4 - tmtc/common_tmtc/pus_tc/__init__.py | 0 tmtc/common_tmtc/pus_tc/command_data.py | 11 -- tmtc/common_tmtc/pus_tc/service_17_test.py | 9 - tmtc/common_tmtc/pus_tc/service_200_mode.py | 52 ------ .../pus_tc/service_20_parameters.py | 93 ----------- tmtc/common_tmtc/pus_tc/service_2_raw_cmd.py | 77 --------- .../pus_tc/service_3_housekeeping.py | 154 ------------------ tmtc/common_tmtc/pus_tc/service_8_func_cmd.py | 56 ------- tmtc/common_tmtc/pus_tc/tc_packing.py | 50 ------ tmtc/common_tmtc/pus_tm/__init__.py | 0 tmtc/common_tmtc/pus_tm/factory_hook.py | 52 ------ .../pus_tm/service_3_hk_handling.py | 70 -------- tmtc/common_tmtc/pus_tm/service_8_handling.py | 19 --- 21 files changed, 787 deletions(-) delete mode 100644 tmtc/common_tmtc/__init__.py delete mode 100644 tmtc/common_tmtc/config/__init__.py delete mode 100644 tmtc/common_tmtc/config/custom_definitions.py delete mode 100644 tmtc/common_tmtc/config/custom_mode_op.py delete mode 100644 tmtc/common_tmtc/config/definitions.py delete mode 100644 tmtc/common_tmtc/config/hook_implementation.py delete mode 100644 tmtc/common_tmtc/config/object_ids.py delete mode 100644 tmtc/common_tmtc/config/version.py delete mode 100644 tmtc/common_tmtc/pus_tc/__init__.py delete mode 100644 tmtc/common_tmtc/pus_tc/command_data.py delete mode 100644 tmtc/common_tmtc/pus_tc/service_17_test.py delete mode 100644 tmtc/common_tmtc/pus_tc/service_200_mode.py delete mode 100644 tmtc/common_tmtc/pus_tc/service_20_parameters.py delete mode 100644 tmtc/common_tmtc/pus_tc/service_2_raw_cmd.py delete mode 100644 tmtc/common_tmtc/pus_tc/service_3_housekeeping.py delete mode 100644 tmtc/common_tmtc/pus_tc/service_8_func_cmd.py delete mode 100644 tmtc/common_tmtc/pus_tc/tc_packing.py delete mode 100644 tmtc/common_tmtc/pus_tm/__init__.py delete mode 100644 tmtc/common_tmtc/pus_tm/factory_hook.py delete mode 100644 tmtc/common_tmtc/pus_tm/service_3_hk_handling.py delete mode 100644 tmtc/common_tmtc/pus_tm/service_8_handling.py diff --git a/tmtc/common_tmtc/__init__.py b/tmtc/common_tmtc/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tmtc/common_tmtc/config/__init__.py b/tmtc/common_tmtc/config/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tmtc/common_tmtc/config/custom_definitions.py b/tmtc/common_tmtc/config/custom_definitions.py deleted file mode 100644 index b34597e..0000000 --- a/tmtc/common_tmtc/config/custom_definitions.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -@brief This file transfers control of the custom definitions like modes to the user. -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" - -import enum -from enum import auto - - -class CustomServiceList(enum.IntEnum): - pass - - -class SerialConfig(enum.Enum): - SERIAL_PORT = auto() - SERIAL_BAUD_RATE = auto() - SERIAL_TIMEOUT = auto() - SERIAL_COMM_TYPE = auto() - - -class EthernetConfig(enum.Enum): - SEND_ADDRESS = auto() - RECV_ADDRESS = auto() diff --git a/tmtc/common_tmtc/config/custom_mode_op.py b/tmtc/common_tmtc/config/custom_mode_op.py deleted file mode 100644 index 5977c3c..0000000 --- a/tmtc/common_tmtc/config/custom_mode_op.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -@brief This file transfers control of custom mode handling to the user. -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -import sys - -from tmtccmd.core.backend import TmTcHandler -from tmtccmd.utility.logger import get_logger - -LOGGER = get_logger() - - -def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int): - """ - Custom modes can be implemented here - """ - LOGGER.error(f"Unknown mode {mode}, Configuration error !") - sys.exit() diff --git a/tmtc/common_tmtc/config/definitions.py b/tmtc/common_tmtc/config/definitions.py deleted file mode 100644 index c517c70..0000000 --- a/tmtc/common_tmtc/config/definitions.py +++ /dev/null @@ -1 +0,0 @@ -PUS_APID = 0xef \ No newline at end of file diff --git a/tmtc/common_tmtc/config/hook_implementation.py b/tmtc/common_tmtc/config/hook_implementation.py deleted file mode 100644 index 768ea8f..0000000 --- a/tmtc/common_tmtc/config/hook_implementation.py +++ /dev/null @@ -1,77 +0,0 @@ -import argparse -from typing import Dict, Tuple, Optional - -from tmtccmd.com_if.com_interface_base import CommunicationInterface -from tmtccmd.config.definitions import ServiceOpCodeDictT -from tmtccmd.config.hook import TmTcHookBase -from tmtccmd.core.backend import TmTcHandler -from tmtccmd.pus_tc.definitions import TcQueueT -from tmtccmd.pus_tm.service_3_base import Service3Base -from tmtccmd.ecss.conf import PusVersion -from tmtccmd.utility.tmtc_printer import TmTcPrinter - -from common_tmtc.config.definitions import PUS_APID - - -class FsfwHookBase(TmTcHookBase): - - def get_version(self) -> str: - from common_tmtc.config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION - return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}" - - def get_json_config_file_path(self) -> str: - return "config/tmtc_config.json" - - def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: - from tmtccmd.config.globals import get_default_service_op_code_dict - return get_default_service_op_code_dict() - - def add_globals_pre_args_parsing(self, gui: bool = False): - from tmtccmd.config.globals import set_default_globals_pre_args_parsing - set_default_globals_pre_args_parsing( - gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, apid=PUS_APID - ) - - def add_globals_post_args_parsing(self, args: argparse.Namespace): - from tmtccmd.config.globals import set_default_globals_post_args_parsing - set_default_globals_post_args_parsing( - args=args, json_cfg_path=self.get_json_config_file_path() - ) - - def assign_communication_interface( - self, com_if_key: str, tmtc_printer: TmTcPrinter - ) -> Optional[CommunicationInterface]: - from tmtccmd.config.com_if import create_communication_interface_default - return create_communication_interface_default( - com_if_key=com_if_key, tmtc_printer=tmtc_printer, - json_cfg_path=self.get_json_config_file_path() - ) - - def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): - print("No custom mode operation implemented") - - def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): - from common_tmtc.pus_tc.tc_packing import pack_service_queue_user - pack_service_queue_user(service=service, op_code=op_code, tc_queue=service_queue) - - def get_object_ids(self) -> Dict[bytes, list]: - from common_tmtc.config.object_ids import get_object_ids - return get_object_ids() - - @staticmethod - def handle_service_8_telemetry( - object_id: int, action_id: int, custom_data: bytearray - ) -> Tuple[list, list]: - from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling - return custom_service_8_handling( - object_id=object_id, action_id=action_id, custom_data=custom_data - ) - - @staticmethod - def handle_service_3_housekeeping( - object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base - ) -> Tuple[list, list, bytearray, int]: - from common_tmtc.pus_tm import service_3_hk_handling - return service_3_hk_handling( - object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet - ) diff --git a/tmtc/common_tmtc/config/object_ids.py b/tmtc/common_tmtc/config/object_ids.py deleted file mode 100644 index cbea7e3..0000000 --- a/tmtc/common_tmtc/config/object_ids.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -@brief This file transfers control of the object IDs to the user. -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -from typing import Dict - -PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17]) -TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE]) -TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) - - -def get_object_ids() -> Dict[bytes, list]: - object_id_dict = { - PUS_SERVICE_17_ID: ["PUS Service 17"], - TEST_DEVICE_0_ID: ["Test Device 0"], - TEST_DEVICE_1_ID: ["Test Device 1"] - } - return object_id_dict diff --git a/tmtc/common_tmtc/config/version.py b/tmtc/common_tmtc/config/version.py deleted file mode 100644 index 2c9e6a8..0000000 --- a/tmtc/common_tmtc/config/version.py +++ /dev/null @@ -1,4 +0,0 @@ -SW_NAME = "fsfw-tmtc" -SW_VERSION = 1 -SW_SUBVERSION = 2 -SW_SUBSUBVERSION = 0 diff --git a/tmtc/common_tmtc/pus_tc/__init__.py b/tmtc/common_tmtc/pus_tc/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tmtc/common_tmtc/pus_tc/command_data.py b/tmtc/common_tmtc/pus_tc/command_data.py deleted file mode 100644 index 3142021..0000000 --- a/tmtc/common_tmtc/pus_tc/command_data.py +++ /dev/null @@ -1,11 +0,0 @@ -import struct - - -# Commands -TEST_COMMAND_0 = struct.pack("!I", 1) -TEST_COMMAND_1 = struct.pack("!I", 2) - -TEST_COMMAND_1_PARAM_1 = bytearray([0xBA, 0xB0]) -TEST_COMMAND_1_PARAM_2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) - -DUMMY_COMMAND_3 = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) diff --git a/tmtc/common_tmtc/pus_tc/service_17_test.py b/tmtc/common_tmtc/pus_tc/service_17_test.py deleted file mode 100644 index 0a290fa..0000000 --- a/tmtc/common_tmtc/pus_tc/service_17_test.py +++ /dev/null @@ -1,9 +0,0 @@ -from tmtccmd.pus_tc.definitions import TcQueueT -from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command, pack_generic_service17_test - - -def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT): - if op_code == "0": - tc_queue.appendleft(pack_service17_ping_command(ssc=init_ssc).pack_command_tuple()) - else: - pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc) diff --git a/tmtc/common_tmtc/pus_tc/service_200_mode.py b/tmtc/common_tmtc/pus_tc/service_200_mode.py deleted file mode 100644 index eac75a1..0000000 --- a/tmtc/common_tmtc/pus_tc/service_200_mode.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding: utf-8 -*- -""" -@file tmtcc_tc_service_200_mode.py -@brief PUS Service 200: PUS custom service 200: Mode commanding -@author R. Mueller -@date 02.05.2020 -""" -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.ecss.tc import PusTelecommand -from tmtccmd.pus_tc.packer import TcQueueT -from tmtccmd.pus_tc.service_200_mode import pack_mode_data - -from common_tmtc.config.object_ids import TEST_DEVICE_0_ID - - -def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): - if op_code == "0": - pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000) - - -def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: - new_ssc = init_ssc - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) - # Object ID: DUMMY Device - object_id = TEST_DEVICE_0_ID - # Set On Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) - mode_data = pack_mode_data(object_id, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - # Set Normal mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")) - mode_data = pack_mode_data(object_id, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - # Set Raw Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw")) - mode_data = pack_mode_data(object_id, 3, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - # Set Off Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off")) - mode_data = pack_mode_data(object_id, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt")) - return new_ssc - diff --git a/tmtc/common_tmtc/pus_tc/service_20_parameters.py b/tmtc/common_tmtc/pus_tc/service_20_parameters.py deleted file mode 100644 index 3ee5564..0000000 --- a/tmtc/common_tmtc/pus_tc/service_20_parameters.py +++ /dev/null @@ -1,93 +0,0 @@ -import struct - -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.ecss.tc import PusTelecommand -from tmtccmd.pus_tc.definitions import TcQueueT -from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \ - pack_parameter_id -from tmtccmd.pus_tc.service_200_mode import pack_mode_data -from tmtccmd.utility.logger import get_console_logger - -from common_tmtc.config.object_ids import TEST_DEVICE_0_ID - -LOGGER = get_console_logger() - - -def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): - if op_code == "0": - pack_service20_test_into(tc_queue=tc_queue) - - -def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False): - if called_externally is False: - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20")) - object_id = TEST_DEVICE_0_ID - - # set mode normal - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode")) - mode_data = pack_mode_data(object_id, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - - load_param_0_simple_test_commands(tc_queue=tc_queue) - load_param_1_simple_test_commands(tc_queue=tc_queue) - load_param_2_simple_test_commands(tc_queue=tc_queue) - - if called_externally is False: - tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service20.txt")) - - -def load_param_0_simple_test_commands(tc_queue: TcQueueT): - object_id = TEST_DEVICE_0_ID - parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) - # test checking Load for uint32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t")) - type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1) - parameter_data = struct.pack("!I", 42) - payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) - - # test checking Dump for uint32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t")) - payload = object_id + parameter_id_0 - command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) - - -def load_param_1_simple_test_commands(tc_queue: TcQueueT): - object_id = TEST_DEVICE_0_ID - parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0) - # test checking Load for int32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t")) - type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1) - parameter_data = struct.pack("!i", -42) - payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) - - # test checking Dump for int32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t")) - payload = object_id + parameter_id_1 - command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) - - -def load_param_2_simple_test_commands(tc_queue: TcQueueT): - object_id = TEST_DEVICE_0_ID - parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0) - # test checking Load for float - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float")) - type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3) - parameter_data = struct.pack("!fff", 4.2, -4.2, 49) - payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) - - # test checking Dump for float - # Skip dump for now, still not properly implemented - # tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump float")) - # payload = object_id + parameter_id_2 - # command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) - # tc_queue.appendleft(command.pack_command_tuple()) - diff --git a/tmtc/common_tmtc/pus_tc/service_2_raw_cmd.py b/tmtc/common_tmtc/pus_tc/service_2_raw_cmd.py deleted file mode 100644 index 450bfd1..0000000 --- a/tmtc/common_tmtc/pus_tc/service_2_raw_cmd.py +++ /dev/null @@ -1,77 +0,0 @@ -# -*- coding: utf-8 -*- -""" -@file tmtcc_tc_service_2_raw_cmd.py -@brief PUS Service 2: Device Access, native low-level commanding -@author R. Mueller -@date 01.11.2019 -""" -import struct - -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.ecss.tc import PusTelecommand -from tmtccmd.pus_tc.definitions import TcQueueT -from common_tmtc.pus_tc.service_200_mode import pack_mode_data - -from common_tmtc import pus_tc as cmd_data -from common_tmtc.config.object_ids import TEST_DEVICE_0_ID - - -def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): - if op_code == "0": - pack_generic_service_2_test_into(0, tc_queue) - else: - print(f"pack_service_2_test: Operation code {op_code} unknown!") - - -def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: - new_ssc = init_ssc - object_id = TEST_DEVICE_0_ID # dummy device - # Set Raw Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode")) - mode_data = pack_mode_data(object_id, 3, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - # toggle wiretapping raw - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")) - wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) - toggle_wiretapping_on_command = PusTelecommand( - service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data - ) - tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) - new_ssc += 1 - # send raw command, wiretapping should be returned via TM[2,130] and TC[2,131] - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command")) - raw_command = cmd_data.TEST_COMMAND_0 - raw_data = object_id + raw_command - raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) - tc_queue.appendleft(raw_command.pack_command_tuple()) - new_ssc += 1 - # toggle wiretapping off - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")) - wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) - toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc, - app_data=wiretapping_toggle_data) - tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) - new_ssc += 1 - # send raw command which should be returned via TM[2,130] - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command")) - command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - - # Set mode off - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode")) - mode_data = pack_mode_data(object_id, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_2.txt")) - return new_ssc - - -# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW -def pack_wiretapping_mode(object_id, wiretapping_mode_): - wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 - wiretapping_toggle_data = object_id + wiretapping_mode - return wiretapping_toggle_data diff --git a/tmtc/common_tmtc/pus_tc/service_3_housekeeping.py b/tmtc/common_tmtc/pus_tc/service_3_housekeeping.py deleted file mode 100644 index 7c4268d..0000000 --- a/tmtc/common_tmtc/pus_tc/service_3_housekeeping.py +++ /dev/null @@ -1,154 +0,0 @@ -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.pus_tc.service_200_mode import pack_mode_data -from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command -from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ - Srv3Subservice -from tmtccmd.ecss.tc import PusTelecommand -from tmtccmd.pus_tc.definitions import TcQueueT -from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command - -from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID - - -# Set IDs -TEST_SET_ID = 0 - -# Action IDs -TEST_NOTIFICATION_ACTION_ID = 3 - -# Parameters -PARAM_ACTIVATE_CHANGING_DATASETS = 4 - - -def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): - current_ssc = 3000 - # TODO: Import this from config instead - device_idx = 0 - if device_idx == 0: - object_id = TEST_DEVICE_0_ID - else: - object_id = TEST_DEVICE_1_ID - - if op_code == "0": - # This will pack all the tests - pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id, - device_idx=device_idx) - elif op_code == "1": - # Extremely simple, generate one HK packet - pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc, - object_id=object_id) - elif op_code == "2": - # Housekeeping basic test - pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) - elif op_code == "3": - # Notification demo - pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) - - -def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray, - init_ssc: int): - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")) - current_ssc = init_ssc - - current_ssc += pack_gen_one_hk_command( - tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc - ) - current_ssc += pack_housekeeping_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc - ) - current_ssc += pack_notification_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False - ) - - -def pack_gen_one_hk_command( - tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray -) -> int: - test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) - tc_queue.appendleft( - (QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for " - f"test device {device_idx}") - ) - command = generate_one_hk_command(ssc=init_ssc, sid=test_sid) - init_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - return init_ssc - - -def pack_housekeeping_basic_test( - tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True -) -> int: - """ - This basic test will request one HK packet, then it will enable periodic packets and listen - to the periodic packets for a few seconds. After that, HK packets will be disabled again. - """ - test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) - current_ssc = init_ssc - # Enable changing datasets via parameter service (Service 20) - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")) - - if enable_normal_mode: - # Set mode normal so that sets are changed/read regularly - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) - mode_data = pack_mode_data(object_id, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets")) - command = pack_boolean_parameter_command( - object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, - parameter=True, ssc=current_ssc - ) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - - # Enable periodic reporting - tc_queue.appendleft((QueueCommands.PRINT, - "Enabling periodic thermal sensor packet generation: ")) - command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value, - ssc=current_ssc, app_data=test_sid) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.WAIT, 2.0)) - - # Disable periodic reporting - tc_queue.appendleft((QueueCommands.PRINT, - "Disabling periodic thermal sensor packet generation: ")) - command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value, - ssc=current_ssc, app_data=test_sid) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - - # Disable changing datasets via parameter service (Service 20) - tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets")) - command = pack_boolean_parameter_command( - object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, - parameter=False, ssc=current_ssc - ) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - return current_ssc - - -def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, - enable_normal_mode: bool = True) -> int: - current_ssc = init_ssc - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests")) - - if enable_normal_mode: - # Set mode normal so that sets are changed/read regularly - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) - mode_data = pack_mode_data(object_id, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification")) - command = generate_action_command( - object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc - ) - tc_queue.appendleft(command.pack_command_tuple()) - current_ssc += 1 - return current_ssc diff --git a/tmtc/common_tmtc/pus_tc/service_8_func_cmd.py b/tmtc/common_tmtc/pus_tc/service_8_func_cmd.py deleted file mode 100644 index 81a3f3c..0000000 --- a/tmtc/common_tmtc/pus_tc/service_8_func_cmd.py +++ /dev/null @@ -1,56 +0,0 @@ -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.ecss.tc import PusTelecommand -from tmtccmd.pus_tc.definitions import TcQueueT - -from common_tmtc import pus_tc as cmd_data -from common_tmtc.pus_tc.service_200_mode import pack_mode_data - -from common_tmtc.config.object_ids import TEST_DEVICE_0_ID - - -def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): - if op_code == "0": - pack_generic_service_8_test_into(tc_queue=tc_queue) - else: - print(f"pack_service_8_test: Operation code {op_code} unknown!") - - -def pack_generic_service_8_test_into(tc_queue: TcQueueT): - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8")) - object_id = TEST_DEVICE_0_ID - - # set mode on - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode")) - mode_data = pack_mode_data(object_id, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - - # set mode normal - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode")) - mode_data = pack_mode_data(object_id, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - - # Direct command which triggers completion reply - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")) - action_id = cmd_data.TEST_COMMAND_0 - direct_command = object_id + action_id - command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) - tc_queue.appendleft(command.pack_command_tuple()) - - # Direct command which triggers _tm_data reply - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply")) - action_id = cmd_data.TEST_COMMAND_1 - command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1 - command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2 - direct_command = object_id + action_id + command_param1 + command_param2 - command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) - tc_queue.appendleft(command.pack_command_tuple()) - - # Set mode off - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode")) - mode_data = pack_mode_data(object_id, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_8.txt")) diff --git a/tmtc/common_tmtc/pus_tc/tc_packing.py b/tmtc/common_tmtc/pus_tc/tc_packing.py deleted file mode 100644 index c4d1113..0000000 --- a/tmtc/common_tmtc/pus_tc/tc_packing.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -@brief This file transfers control of TC packing to the user -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" - -import os -from collections import deque -from typing import Union - -from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into -from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into -from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into -from common_tmtc.pus_tc.service_17_test import pack_service_17_commands -from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into -from tmtccmd.utility.logger import get_console_logger -from tmtccmd.pus_tc.definitions import TcQueueT -from tmtccmd.config.definitions import CoreServiceList -from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into -from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test -from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into - -LOGGER = get_console_logger() - - -def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: TcQueueT): - if service == CoreServiceList.SERVICE_2.value: - return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_3.value: - return pack_service_3_commands_into(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_5.value: - return pack_generic_service5_test_into(tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_8.value: - return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_17.value: - return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0) - if service == CoreServiceList.SERVICE_20.value: - return pack_service20_commands_into(tc_queue=tc_queue, op_code=op_code) - if service == CoreServiceList.SERVICE_200.value: - return pack_service_200_commands_into(tc_queue=tc_queue, op_code=op_code) - LOGGER.warning("Invalid Service !") - - -def create_total_tc_queue_user() -> TcQueueT: - if not os.path.exists("log"): - os.mkdir("log") - tc_queue = deque() - pack_service_2_commands_into(op_code="0", tc_queue=tc_queue) - pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue) - return tc_queue diff --git a/tmtc/common_tmtc/pus_tm/__init__.py b/tmtc/common_tmtc/pus_tm/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tmtc/common_tmtc/pus_tm/factory_hook.py b/tmtc/common_tmtc/pus_tm/factory_hook.py deleted file mode 100644 index 3bd23a3..0000000 --- a/tmtc/common_tmtc/pus_tm/factory_hook.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -@brief This file transfers control of TM parsing to the user -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" - -from tmtccmd.ecss.tm import PusTelemetry -from tmtccmd.utility.logger import get_console_logger -from tmtccmd.pus_tm.service_1_verification import Service1TM -from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM -from tmtccmd.pus_tm.service_3_housekeeping import Service3TM -from tmtccmd.pus_tm.service_5_event import Service5TM -from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM -from tmtccmd.pus_tm.service_17_test import Service17TM -from tmtccmd.pus_tm.service_20_parameters import Service20TM -from tmtccmd.pus_tm.service_200_mode import Service200TM -from tmtccmd.utility.tmtc_printer import TmTcPrinter - -from common_tmtc.config.definitions import PUS_APID - - -LOGGER = get_console_logger() - - -def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None: - if apid == PUS_APID: - pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer) - - -def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter): - service_type = raw_tm_packet[7] - tm_packet = None - if service_type == 1: - tm_packet = Service1TM(raw_tm_packet) - if service_type == 2: - tm_packet = Service2TM(raw_tm_packet) - if service_type == 3: - tm_packet = Service3TM(raw_tm_packet) - if service_type == 8: - tm_packet = Service8TM(raw_tm_packet) - if service_type == 5: - tm_packet = Service5TM(raw_tm_packet) - if service_type == 17: - tm_packet = Service17TM(raw_tm_packet) - if service_type == 20: - tm_packet = Service20TM(raw_tm_packet) - if service_type == 200: - tm_packet = Service200TM(raw_tm_packet) - if tm_packet is None: - LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory") - tm_packet = PusTelemetry(raw_tm_packet) - tmtc_printer.print_telemetry(packet=tm_packet) diff --git a/tmtc/common_tmtc/pus_tm/service_3_hk_handling.py b/tmtc/common_tmtc/pus_tm/service_3_hk_handling.py deleted file mode 100644 index d9966f7..0000000 --- a/tmtc/common_tmtc/pus_tm/service_3_hk_handling.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -@brief This file transfers control of housekeeping handling (PUS service 3) to the - developer -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -import struct -from typing import Tuple -from tmtccmd.pus_tm.service_3_housekeeping import Service3Base -from tmtccmd.utility.logger import get_console_logger - -from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID -LOGGER = get_console_logger() - - -def service_3_hk_handling( - object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base -) -> Tuple[list, list, bytearray, int]: - """ - This function is called when a Service 3 Housekeeping packet is received. - - Please note that the object IDs should be compared by value because direct comparison of - enumerations does not work in Python. For example use: - - if object_id.value == ObjectIds.TEST_OBJECT.value - - to test equality based on the object ID list. - - @param object_id: - @param set_id: - @param hk_data: - @param service3_packet: - @return: Expects a tuple, consisting of two lists, a bytearray and an integer - The first list contains the header columns, the second list the list with - the corresponding values. The bytearray is the validity buffer, which is usually appended - at the end of the housekeeping packet. The last value is the number of parameters. - """ - if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID: - return handle_test_set_deserialization(hk_data=hk_data) - else: - LOGGER.info("Service3TM: Parsing for this SID has not been implemented.") - return [], [], bytearray(), 0 - - -def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: - header_list = [] - content_list = [] - validity_buffer = bytearray() - # uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1) - if len(hk_data) < 18: - LOGGER.warning("Invalid HK data format for test set reply!") - return header_list, content_list, validity_buffer, 0 - uint8_value = struct.unpack('!B', hk_data[0:1])[0] - uint32_value = struct.unpack('!I', hk_data[1:5])[0] - float_value_1 = struct.unpack('!f', hk_data[5:9])[0] - float_value_2 = struct.unpack('!f', hk_data[9:13])[0] - float_value_3 = struct.unpack('!f', hk_data[13:17])[0] - validity_buffer.append(hk_data[17]) - header_list.append("uint8 value") - header_list.append("uint32 value") - header_list.append("float vec value 1") - header_list.append("float vec value 2") - header_list.append("float vec value 3") - - content_list.append(uint8_value) - content_list.append(uint32_value) - content_list.append(float_value_1) - content_list.append(float_value_2) - content_list.append(float_value_3) - return header_list, content_list, validity_buffer, 3 diff --git a/tmtc/common_tmtc/pus_tm/service_8_handling.py b/tmtc/common_tmtc/pus_tm/service_8_handling.py deleted file mode 100644 index 259e7e4..0000000 --- a/tmtc/common_tmtc/pus_tm/service_8_handling.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import Tuple - - -def custom_service_8_handling( - object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]: - """ - This function is called by the TMTC core if a Service 8 data reply (subservice 130) - is received. The user can return a tuple of two lists, where the first list - is a list of header strings to print and the second list is a list of values to print. - The TMTC core will take care of printing both lists and logging them. - - @param object_id: - @param action_id: - @param custom_data: - @return: - """ - header_list = [] - content_list = [] - return header_list, content_list