commit 1da4e9caba796fbc5bff1834a6672a515f494f5e Author: Robin Mueller Date: Wed Jul 14 00:28:33 2021 +0200 init commit diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config/__pycache__/hook_base.cpython-38.pyc b/config/__pycache__/hook_base.cpython-38.pyc new file mode 100644 index 0000000..823af7f Binary files /dev/null and b/config/__pycache__/hook_base.cpython-38.pyc differ diff --git a/config/custom_definitions.py b/config/custom_definitions.py new file mode 100644 index 0000000..b34597e --- /dev/null +++ b/config/custom_definitions.py @@ -0,0 +1,24 @@ +""" +@brief This file transfers control of the custom definitions like modes to the user. +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" + +import enum +from enum import auto + + +class CustomServiceList(enum.IntEnum): + pass + + +class SerialConfig(enum.Enum): + SERIAL_PORT = auto() + SERIAL_BAUD_RATE = auto() + SERIAL_TIMEOUT = auto() + SERIAL_COMM_TYPE = auto() + + +class EthernetConfig(enum.Enum): + SEND_ADDRESS = auto() + RECV_ADDRESS = auto() diff --git a/config/custom_mode_op.py b/config/custom_mode_op.py new file mode 100644 index 0000000..5977c3c --- /dev/null +++ b/config/custom_mode_op.py @@ -0,0 +1,19 @@ +""" +@brief This file transfers control of custom mode handling to the user. +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" +import sys + +from tmtccmd.core.backend import TmTcHandler +from tmtccmd.utility.logger import get_logger + +LOGGER = get_logger() + + +def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int): + """ + Custom modes can be implemented here + """ + LOGGER.error(f"Unknown mode {mode}, Configuration error !") + sys.exit() diff --git a/config/definitions.py b/config/definitions.py new file mode 100644 index 0000000..c517c70 --- /dev/null +++ b/config/definitions.py @@ -0,0 +1 @@ +PUS_APID = 0xef \ No newline at end of file diff --git a/config/hook_implementation.py b/config/hook_implementation.py new file mode 100644 index 0000000..768ea8f --- /dev/null +++ b/config/hook_implementation.py @@ -0,0 +1,77 @@ +import argparse +from typing import Dict, Tuple, Optional + +from tmtccmd.com_if.com_interface_base import CommunicationInterface +from tmtccmd.config.definitions import ServiceOpCodeDictT +from tmtccmd.config.hook import TmTcHookBase +from tmtccmd.core.backend import TmTcHandler +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.pus_tm.service_3_base import Service3Base +from tmtccmd.ecss.conf import PusVersion +from tmtccmd.utility.tmtc_printer import TmTcPrinter + +from common_tmtc.config.definitions import PUS_APID + + +class FsfwHookBase(TmTcHookBase): + + def get_version(self) -> str: + from common_tmtc.config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION + return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}" + + def get_json_config_file_path(self) -> str: + return "config/tmtc_config.json" + + def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: + from tmtccmd.config.globals import get_default_service_op_code_dict + return get_default_service_op_code_dict() + + def add_globals_pre_args_parsing(self, gui: bool = False): + from tmtccmd.config.globals import set_default_globals_pre_args_parsing + set_default_globals_pre_args_parsing( + gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, apid=PUS_APID + ) + + def add_globals_post_args_parsing(self, args: argparse.Namespace): + from tmtccmd.config.globals import set_default_globals_post_args_parsing + set_default_globals_post_args_parsing( + args=args, json_cfg_path=self.get_json_config_file_path() + ) + + def assign_communication_interface( + self, com_if_key: str, tmtc_printer: TmTcPrinter + ) -> Optional[CommunicationInterface]: + from tmtccmd.config.com_if import create_communication_interface_default + return create_communication_interface_default( + com_if_key=com_if_key, tmtc_printer=tmtc_printer, + json_cfg_path=self.get_json_config_file_path() + ) + + def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): + print("No custom mode operation implemented") + + def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): + from common_tmtc.pus_tc.tc_packing import pack_service_queue_user + pack_service_queue_user(service=service, op_code=op_code, tc_queue=service_queue) + + def get_object_ids(self) -> Dict[bytes, list]: + from common_tmtc.config.object_ids import get_object_ids + return get_object_ids() + + @staticmethod + def handle_service_8_telemetry( + object_id: int, action_id: int, custom_data: bytearray + ) -> Tuple[list, list]: + from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling + return custom_service_8_handling( + object_id=object_id, action_id=action_id, custom_data=custom_data + ) + + @staticmethod + def handle_service_3_housekeeping( + object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base + ) -> Tuple[list, list, bytearray, int]: + from common_tmtc.pus_tm import service_3_hk_handling + return service_3_hk_handling( + object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet + ) diff --git a/config/object_ids.py b/config/object_ids.py new file mode 100644 index 0000000..cbea7e3 --- /dev/null +++ b/config/object_ids.py @@ -0,0 +1,19 @@ +""" +@brief This file transfers control of the object IDs to the user. +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" +from typing import Dict + +PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17]) +TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE]) +TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) + + +def get_object_ids() -> Dict[bytes, list]: + object_id_dict = { + PUS_SERVICE_17_ID: ["PUS Service 17"], + TEST_DEVICE_0_ID: ["Test Device 0"], + TEST_DEVICE_1_ID: ["Test Device 1"] + } + return object_id_dict diff --git a/config/tmtc_config.json b/config/tmtc_config.json new file mode 100644 index 0000000..6c77d3c --- /dev/null +++ b/config/tmtc_config.json @@ -0,0 +1,6 @@ +{ + "COM_IF_KEY": "udp", + "TCPIP_UDP_RECV_MAX_SIZE": 1500, + "TCPIP_UDP_DEST_IP_ADDRESS": "127.0.0.1", + "TCPIP_UDP_DEST_PORT": 7301 +} \ No newline at end of file diff --git a/config/version.py b/config/version.py new file mode 100644 index 0000000..2c9e6a8 --- /dev/null +++ b/config/version.py @@ -0,0 +1,4 @@ +SW_NAME = "fsfw-tmtc" +SW_VERSION = 1 +SW_SUBVERSION = 2 +SW_SUBSUBVERSION = 0 diff --git a/pus_tc/__init__.py b/pus_tc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pus_tc/command_data.py b/pus_tc/command_data.py new file mode 100644 index 0000000..3142021 --- /dev/null +++ b/pus_tc/command_data.py @@ -0,0 +1,11 @@ +import struct + + +# Commands +TEST_COMMAND_0 = struct.pack("!I", 1) +TEST_COMMAND_1 = struct.pack("!I", 2) + +TEST_COMMAND_1_PARAM_1 = bytearray([0xBA, 0xB0]) +TEST_COMMAND_1_PARAM_2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) + +DUMMY_COMMAND_3 = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) diff --git a/pus_tc/service_17_test.py b/pus_tc/service_17_test.py new file mode 100644 index 0000000..0a290fa --- /dev/null +++ b/pus_tc/service_17_test.py @@ -0,0 +1,9 @@ +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command, pack_generic_service17_test + + +def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT): + if op_code == "0": + tc_queue.appendleft(pack_service17_ping_command(ssc=init_ssc).pack_command_tuple()) + else: + pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc) diff --git a/pus_tc/service_200_mode.py b/pus_tc/service_200_mode.py new file mode 100644 index 0000000..eac75a1 --- /dev/null +++ b/pus_tc/service_200_mode.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_service_200_mode.py +@brief PUS Service 200: PUS custom service 200: Mode commanding +@author R. Mueller +@date 02.05.2020 +""" +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.packer import TcQueueT +from tmtccmd.pus_tc.service_200_mode import pack_mode_data + +from common_tmtc.config.object_ids import TEST_DEVICE_0_ID + + +def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000) + + +def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: + new_ssc = init_ssc + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) + # Object ID: DUMMY Device + object_id = TEST_DEVICE_0_ID + # Set On Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # Set Normal mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # Set Raw Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw")) + mode_data = pack_mode_data(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # Set Off Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt")) + return new_ssc + diff --git a/pus_tc/service_20_parameters.py b/pus_tc/service_20_parameters.py new file mode 100644 index 0000000..3ee5564 --- /dev/null +++ b/pus_tc/service_20_parameters.py @@ -0,0 +1,93 @@ +import struct + +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \ + pack_parameter_id +from tmtccmd.pus_tc.service_200_mode import pack_mode_data +from tmtccmd.utility.logger import get_console_logger + +from common_tmtc.config.object_ids import TEST_DEVICE_0_ID + +LOGGER = get_console_logger() + + +def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_service20_test_into(tc_queue=tc_queue) + + +def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False): + if called_externally is False: + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20")) + object_id = TEST_DEVICE_0_ID + + # set mode normal + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + load_param_0_simple_test_commands(tc_queue=tc_queue) + load_param_1_simple_test_commands(tc_queue=tc_queue) + load_param_2_simple_test_commands(tc_queue=tc_queue) + + if called_externally is False: + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service20.txt")) + + +def load_param_0_simple_test_commands(tc_queue: TcQueueT): + object_id = TEST_DEVICE_0_ID + parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) + # test checking Load for uint32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t")) + type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1) + parameter_data = struct.pack("!I", 42) + payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + # test checking Dump for uint32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t")) + payload = object_id + parameter_id_0 + command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + +def load_param_1_simple_test_commands(tc_queue: TcQueueT): + object_id = TEST_DEVICE_0_ID + parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0) + # test checking Load for int32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t")) + type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1) + parameter_data = struct.pack("!i", -42) + payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + # test checking Dump for int32_t + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t")) + payload = object_id + parameter_id_1 + command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + +def load_param_2_simple_test_commands(tc_queue: TcQueueT): + object_id = TEST_DEVICE_0_ID + parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0) + # test checking Load for float + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float")) + type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3) + parameter_data = struct.pack("!fff", 4.2, -4.2, 49) + payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + # test checking Dump for float + # Skip dump for now, still not properly implemented + # tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump float")) + # payload = object_id + parameter_id_2 + # command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) + # tc_queue.appendleft(command.pack_command_tuple()) + diff --git a/pus_tc/service_2_raw_cmd.py b/pus_tc/service_2_raw_cmd.py new file mode 100644 index 0000000..450bfd1 --- /dev/null +++ b/pus_tc/service_2_raw_cmd.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_service_2_raw_cmd.py +@brief PUS Service 2: Device Access, native low-level commanding +@author R. Mueller +@date 01.11.2019 +""" +import struct + +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT +from common_tmtc.pus_tc.service_200_mode import pack_mode_data + +from common_tmtc import pus_tc as cmd_data +from common_tmtc.config.object_ids import TEST_DEVICE_0_ID + + +def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_generic_service_2_test_into(0, tc_queue) + else: + print(f"pack_service_2_test: Operation code {op_code} unknown!") + + +def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: + new_ssc = init_ssc + object_id = TEST_DEVICE_0_ID # dummy device + # Set Raw Mode + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode")) + mode_data = pack_mode_data(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # toggle wiretapping raw + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) + toggle_wiretapping_on_command = PusTelecommand( + service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data + ) + tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) + new_ssc += 1 + # send raw command, wiretapping should be returned via TM[2,130] and TC[2,131] + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command")) + raw_command = cmd_data.TEST_COMMAND_0 + raw_data = object_id + raw_command + raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) + tc_queue.appendleft(raw_command.pack_command_tuple()) + new_ssc += 1 + # toggle wiretapping off + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) + toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc, + app_data=wiretapping_toggle_data) + tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) + new_ssc += 1 + # send raw command which should be returned via TM[2,130] + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command")) + command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + + # Set mode off + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_2.txt")) + return new_ssc + + +# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW +def pack_wiretapping_mode(object_id, wiretapping_mode_): + wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretapping_toggle_data = object_id + wiretapping_mode + return wiretapping_toggle_data diff --git a/pus_tc/service_3_housekeeping.py b/pus_tc/service_3_housekeeping.py new file mode 100644 index 0000000..7c4268d --- /dev/null +++ b/pus_tc/service_3_housekeeping.py @@ -0,0 +1,154 @@ +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.pus_tc.service_200_mode import pack_mode_data +from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command +from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ + Srv3Subservice +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command + +from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID + + +# Set IDs +TEST_SET_ID = 0 + +# Action IDs +TEST_NOTIFICATION_ACTION_ID = 3 + +# Parameters +PARAM_ACTIVATE_CHANGING_DATASETS = 4 + + +def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): + current_ssc = 3000 + # TODO: Import this from config instead + device_idx = 0 + if device_idx == 0: + object_id = TEST_DEVICE_0_ID + else: + object_id = TEST_DEVICE_1_ID + + if op_code == "0": + # This will pack all the tests + pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id, + device_idx=device_idx) + elif op_code == "1": + # Extremely simple, generate one HK packet + pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc, + object_id=object_id) + elif op_code == "2": + # Housekeeping basic test + pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) + elif op_code == "3": + # Notification demo + pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) + + +def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray, + init_ssc: int): + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")) + current_ssc = init_ssc + + current_ssc += pack_gen_one_hk_command( + tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc + ) + current_ssc += pack_housekeeping_basic_test( + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc + ) + current_ssc += pack_notification_basic_test( + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False + ) + + +def pack_gen_one_hk_command( + tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray +) -> int: + test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) + tc_queue.appendleft( + (QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for " + f"test device {device_idx}") + ) + command = generate_one_hk_command(ssc=init_ssc, sid=test_sid) + init_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + return init_ssc + + +def pack_housekeeping_basic_test( + tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True +) -> int: + """ + This basic test will request one HK packet, then it will enable periodic packets and listen + to the periodic packets for a few seconds. After that, HK packets will be disabled again. + """ + test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) + current_ssc = init_ssc + # Enable changing datasets via parameter service (Service 20) + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")) + + if enable_normal_mode: + # Set mode normal so that sets are changed/read regularly + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets")) + command = pack_boolean_parameter_command( + object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, + parameter=True, ssc=current_ssc + ) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + # Enable periodic reporting + tc_queue.appendleft((QueueCommands.PRINT, + "Enabling periodic thermal sensor packet generation: ")) + command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value, + ssc=current_ssc, app_data=test_sid) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.WAIT, 2.0)) + + # Disable periodic reporting + tc_queue.appendleft((QueueCommands.PRINT, + "Disabling periodic thermal sensor packet generation: ")) + command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value, + ssc=current_ssc, app_data=test_sid) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + # Disable changing datasets via parameter service (Service 20) + tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets")) + command = pack_boolean_parameter_command( + object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, + parameter=False, ssc=current_ssc + ) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + return current_ssc + + +def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, + enable_normal_mode: bool = True) -> int: + current_ssc = init_ssc + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests")) + + if enable_normal_mode: + # Set mode normal so that sets are changed/read regularly + tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) + current_ssc += 1 + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification")) + command = generate_action_command( + object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc + ) + tc_queue.appendleft(command.pack_command_tuple()) + current_ssc += 1 + return current_ssc diff --git a/pus_tc/service_8_func_cmd.py b/pus_tc/service_8_func_cmd.py new file mode 100644 index 0000000..81a3f3c --- /dev/null +++ b/pus_tc/service_8_func_cmd.py @@ -0,0 +1,56 @@ +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.definitions import TcQueueT + +from common_tmtc import pus_tc as cmd_data +from common_tmtc.pus_tc.service_200_mode import pack_mode_data + +from common_tmtc.config.object_ids import TEST_DEVICE_0_ID + + +def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): + if op_code == "0": + pack_generic_service_8_test_into(tc_queue=tc_queue) + else: + print(f"pack_service_8_test: Operation code {op_code} unknown!") + + +def pack_generic_service_8_test_into(tc_queue: TcQueueT): + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8")) + object_id = TEST_DEVICE_0_ID + + # set mode on + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode normal + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers completion reply + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")) + action_id = cmd_data.TEST_COMMAND_0 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers _tm_data reply + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply")) + action_id = cmd_data.TEST_COMMAND_1 + command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1 + command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2 + direct_command = object_id + action_id + command_param1 + command_param2 + command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Set mode off + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_8.txt")) diff --git a/pus_tc/tc_packing.py b/pus_tc/tc_packing.py new file mode 100644 index 0000000..c4d1113 --- /dev/null +++ b/pus_tc/tc_packing.py @@ -0,0 +1,50 @@ +""" +@brief This file transfers control of TC packing to the user +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" + +import os +from collections import deque +from typing import Union + +from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into +from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into +from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into +from common_tmtc.pus_tc.service_17_test import pack_service_17_commands +from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into +from tmtccmd.utility.logger import get_console_logger +from tmtccmd.pus_tc.definitions import TcQueueT +from tmtccmd.config.definitions import CoreServiceList +from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into +from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test +from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into + +LOGGER = get_console_logger() + + +def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: TcQueueT): + if service == CoreServiceList.SERVICE_2.value: + return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue) + if service == CoreServiceList.SERVICE_3.value: + return pack_service_3_commands_into(op_code=op_code, tc_queue=tc_queue) + if service == CoreServiceList.SERVICE_5.value: + return pack_generic_service5_test_into(tc_queue=tc_queue) + if service == CoreServiceList.SERVICE_8.value: + return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue) + if service == CoreServiceList.SERVICE_17.value: + return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0) + if service == CoreServiceList.SERVICE_20.value: + return pack_service20_commands_into(tc_queue=tc_queue, op_code=op_code) + if service == CoreServiceList.SERVICE_200.value: + return pack_service_200_commands_into(tc_queue=tc_queue, op_code=op_code) + LOGGER.warning("Invalid Service !") + + +def create_total_tc_queue_user() -> TcQueueT: + if not os.path.exists("log"): + os.mkdir("log") + tc_queue = deque() + pack_service_2_commands_into(op_code="0", tc_queue=tc_queue) + pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue) + return tc_queue diff --git a/pus_tm/__init__.py b/pus_tm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py new file mode 100644 index 0000000..3bd23a3 --- /dev/null +++ b/pus_tm/factory_hook.py @@ -0,0 +1,52 @@ +""" +@brief This file transfers control of TM parsing to the user +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" + +from tmtccmd.ecss.tm import PusTelemetry +from tmtccmd.utility.logger import get_console_logger +from tmtccmd.pus_tm.service_1_verification import Service1TM +from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM +from tmtccmd.pus_tm.service_3_housekeeping import Service3TM +from tmtccmd.pus_tm.service_5_event import Service5TM +from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM +from tmtccmd.pus_tm.service_17_test import Service17TM +from tmtccmd.pus_tm.service_20_parameters import Service20TM +from tmtccmd.pus_tm.service_200_mode import Service200TM +from tmtccmd.utility.tmtc_printer import TmTcPrinter + +from common_tmtc.config.definitions import PUS_APID + + +LOGGER = get_console_logger() + + +def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None: + if apid == PUS_APID: + pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer) + + +def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter): + service_type = raw_tm_packet[7] + tm_packet = None + if service_type == 1: + tm_packet = Service1TM(raw_tm_packet) + if service_type == 2: + tm_packet = Service2TM(raw_tm_packet) + if service_type == 3: + tm_packet = Service3TM(raw_tm_packet) + if service_type == 8: + tm_packet = Service8TM(raw_tm_packet) + if service_type == 5: + tm_packet = Service5TM(raw_tm_packet) + if service_type == 17: + tm_packet = Service17TM(raw_tm_packet) + if service_type == 20: + tm_packet = Service20TM(raw_tm_packet) + if service_type == 200: + tm_packet = Service200TM(raw_tm_packet) + if tm_packet is None: + LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory") + tm_packet = PusTelemetry(raw_tm_packet) + tmtc_printer.print_telemetry(packet=tm_packet) diff --git a/pus_tm/service_3_hk_handling.py b/pus_tm/service_3_hk_handling.py new file mode 100644 index 0000000..d9966f7 --- /dev/null +++ b/pus_tm/service_3_hk_handling.py @@ -0,0 +1,70 @@ +""" +@brief This file transfers control of housekeeping handling (PUS service 3) to the + developer +@details Template configuration file. Copy this folder to the TMTC commander root and adapt + it to your needs. +""" +import struct +from typing import Tuple +from tmtccmd.pus_tm.service_3_housekeeping import Service3Base +from tmtccmd.utility.logger import get_console_logger + +from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID +LOGGER = get_console_logger() + + +def service_3_hk_handling( + object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base +) -> Tuple[list, list, bytearray, int]: + """ + This function is called when a Service 3 Housekeeping packet is received. + + Please note that the object IDs should be compared by value because direct comparison of + enumerations does not work in Python. For example use: + + if object_id.value == ObjectIds.TEST_OBJECT.value + + to test equality based on the object ID list. + + @param object_id: + @param set_id: + @param hk_data: + @param service3_packet: + @return: Expects a tuple, consisting of two lists, a bytearray and an integer + The first list contains the header columns, the second list the list with + the corresponding values. The bytearray is the validity buffer, which is usually appended + at the end of the housekeeping packet. The last value is the number of parameters. + """ + if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID: + return handle_test_set_deserialization(hk_data=hk_data) + else: + LOGGER.info("Service3TM: Parsing for this SID has not been implemented.") + return [], [], bytearray(), 0 + + +def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: + header_list = [] + content_list = [] + validity_buffer = bytearray() + # uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1) + if len(hk_data) < 18: + LOGGER.warning("Invalid HK data format for test set reply!") + return header_list, content_list, validity_buffer, 0 + uint8_value = struct.unpack('!B', hk_data[0:1])[0] + uint32_value = struct.unpack('!I', hk_data[1:5])[0] + float_value_1 = struct.unpack('!f', hk_data[5:9])[0] + float_value_2 = struct.unpack('!f', hk_data[9:13])[0] + float_value_3 = struct.unpack('!f', hk_data[13:17])[0] + validity_buffer.append(hk_data[17]) + header_list.append("uint8 value") + header_list.append("uint32 value") + header_list.append("float vec value 1") + header_list.append("float vec value 2") + header_list.append("float vec value 3") + + content_list.append(uint8_value) + content_list.append(uint32_value) + content_list.append(float_value_1) + content_list.append(float_value_2) + content_list.append(float_value_3) + return header_list, content_list, validity_buffer, 3 diff --git a/pus_tm/service_8_handling.py b/pus_tm/service_8_handling.py new file mode 100644 index 0000000..259e7e4 --- /dev/null +++ b/pus_tm/service_8_handling.py @@ -0,0 +1,19 @@ +from typing import Tuple + + +def custom_service_8_handling( + object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]: + """ + This function is called by the TMTC core if a Service 8 data reply (subservice 130) + is received. The user can return a tuple of two lists, where the first list + is a list of header strings to print and the second list is a list of values to print. + The TMTC core will take care of printing both lists and logging them. + + @param object_id: + @param action_id: + @param custom_data: + @return: + """ + header_list = [] + content_list = [] + return header_list, content_list