diff --git a/.gitignore b/.gitignore index 0fe4197..50cba5f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,12 @@ log .idea/* !.idea/runConfigurations -*.json \ No newline at end of file +*.json + +/Lib +/Scripts +/pyvenv.cfg +/bin +/lib +/lib64 +/share diff --git a/README.md b/README.md index 0b7b7ab..c52a047 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,46 @@ -### How to use this folder +# TMTC Commander EIVE -This folder contains template files to set up the TMTC commander -for a new mission or project. These files are the adaption -point to customize the TMTC commander. +# Set up virtual environment + +## Linux + +1. Create virtual environment + +```sh +python3 -m venv . +``` + +2. Activate virtual environment + +```sh +./Scripts/activate +``` + +3. Install `tmtccmd` for virtual environment. `-e` for interactive installation. + +```sh +cd tmtccmd +python3 -m pip install -e .[gui] +``` + +## Windows + +1. Create virtual environment + +```sh +py -m venv . +``` + +2. Activate virtual environment + +```sh +Scripts\activate.bat +``` + +3. Install `tmtccmd` for virtual environment. `-e` for interactive installation. + +```sh +cd tmtccmd +py -m pip install -e .[gui] +``` -To do so, simply copy all folder inside the TMTC commander root. This -step is also required because the TMTC commander core will load some modules. diff --git a/config/definitions.py b/config/definitions.py index a0b54b5..75b7b21 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -7,6 +7,9 @@ import enum +PUS_APID = 0x65 + + class CustomServiceList(enum.Enum): TEST_DEVICE = "test", P60DOCK = "p60dock" @@ -15,6 +18,8 @@ class CustomServiceList(enum.Enum): ACU = "acu" TMP1075_1 = "tmp1075_1" TMP1075_2 = "tmp1075_2" - HEATER = "heater", + HEATER = "heater" + IMTQ = "imtq" + PLOC = "ploc" PCDU = "pcdu", SA_DEPLYOMENT = "sa_depl" diff --git a/config/globals_config.py b/config/globals_config.py index c1199c6..b8274bb 100644 --- a/config/globals_config.py +++ b/config/globals_config.py @@ -9,7 +9,7 @@ import argparse # All globals can be added here and will be part of a globals dictionary. -from config.definitions import CustomServiceList +from config.definitions import CustomServiceList, PUS_APID from config.custom_mode_op import CustomModeList from tmtccmd.config.definitions import CoreComInterfaces from tmtccmd.config.globals import set_default_globals_pre_args_parsing, \ @@ -25,7 +25,8 @@ class CustomGlobalIds(enum.Enum): def set_globals_pre_args_parsing(gui: bool = False): - set_default_globals_pre_args_parsing(gui=gui, apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP.value) + set_default_globals_pre_args_parsing( + gui=gui, apid=PUS_APID, com_if_id=CoreComInterfaces.TCPIP_UDP.value) def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str): diff --git a/config/hook_implementations.py b/config/hook_implementations.py index 498f921..3b0e2ac 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -50,6 +50,19 @@ class EiveHookObject(TmTcHookBase): } service_heater_tuple = ("Heater Device", op_code_dict_srv_heater) + op_code_dict_srv_imtq = { + "0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}), + "1": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "2": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "4": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "5": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "6": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "7": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "8": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + } + service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq) + service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple @@ -58,6 +71,7 @@ class EiveHookObject(TmTcHookBase): service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple service_op_code_dict[CustomServiceList.HEATER.value] = service_heater_tuple + service_op_code_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple return service_op_code_dict def get_json_config_file_path(self) -> str: @@ -83,7 +97,8 @@ class EiveHookObject(TmTcHookBase): Union[CommunicationInterface, None]: from tmtccmd.config.com_if import create_communication_interface_default return create_communication_interface_default( - com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path() + com_if_key=com_if_key, tmtc_printer=tmtc_printer, + json_cfg_path=self.get_json_config_file_path() ) def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): @@ -113,7 +128,7 @@ class EiveHookObject(TmTcHookBase): @staticmethod def handle_service_3_housekeeping( - object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base + object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: from pus_tm.hk_handling import handle_user_hk_packet return handle_user_hk_packet( diff --git a/config/object_ids.py b/config/object_ids.py index 1ae3693..3dd4c29 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -17,6 +17,8 @@ HEATER_ID = bytes([0x54, 0x00, 0x00, 0x1]) PCDU_HANDLER_ID = bytes([0x44, 0x00, 0x10, 0x00]) SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x00, 0x10, 0x01]) SYRLINKS_HANDLER = bytes([0x44, 0x00, 0x10, 0x02]) +IMTQ_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x14]) +PLOC_ID = bytearray([0x44, 0x00, 0x00, 0x15]) def get_object_ids() -> Dict[bytes, list]: diff --git a/config/version.py b/config/version.py index a6e02b7..ef8fd1f 100644 --- a/config/version.py +++ b/config/version.py @@ -1,4 +1,4 @@ SW_NAME = "eive" VERSION_MAJOR = 1 -VERSION_MINOR = 5 -VERSION_SUBMINOR = 0 +VERSION_MINOR = 6 +VERSION_SUBMINOR = 1 diff --git a/gomspace/gomspace_pdu_definitions.py b/gomspace/gomspace_pdu_definitions.py index 009eb31..f695737 100644 --- a/gomspace/gomspace_pdu_definitions.py +++ b/gomspace/gomspace_pdu_definitions.py @@ -6,6 +6,11 @@ class PDUConfigTable: out_en_1 = TableEntry(bytearray([0x00, 0x49]), TableEntry.uint8_size) out_en_2 = TableEntry(bytearray([0x00, 0x4A]), TableEntry.uint8_size) out_en_3 = TableEntry(bytearray([0x00, 0x4B]), TableEntry.uint8_size) + out_en_4 = TableEntry(bytearray([0x00, 0x4C]), TableEntry.uint8_size) + out_en_5 = TableEntry(bytearray([0x00, 0x4D]), TableEntry.uint8_size) + out_en_6 = TableEntry(bytearray([0x00, 0x4E]), TableEntry.uint8_size) + out_en_7 = TableEntry(bytearray([0x00, 0x4F]), TableEntry.uint8_size) + out_en_8 = TableEntry(bytearray([0x00, 0x50]), TableEntry.uint8_size) # When channel consumes more than cur_lu_lim, channel is turned of immediately cur_lu_lim_0 = TableEntry(bytearray([0x00, 0xB8]), TableEntry.uint16_size) diff --git a/pus_tc/imtq.py b/pus_tc/imtq.py new file mode 100644 index 0000000..c6bee28 --- /dev/null +++ b/pus_tc/imtq.py @@ -0,0 +1,196 @@ +# -*- coding: utf-8 -*- +""" +@file imtq.py +@brief Tests for the ISIS IMTQ (Magnettorquer) device handler +@author J. Meier +@date 25.03.2021 +""" +from tmtccmd.config.definitions import QueueCommands + +from tmtccmd.pus_tc.packer import TcQueueT +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command + + +class ImtqTestProcedure: + """ + @brief Use this class to define the tests to perform for the IMTQ Handler. + @details Setting all to True will run all tests. + Setting all to False will only run the tests set to True. + """ + all = False + command_dipole = False + get_commanded_dipole = False + positive_x_test = True + negative_x_test = False + positive_y_test = False + negative_y_test = False + positive_z_test = False + negative_z_test = False + + +class ImtqSetIds: + ENG_HK_SET = 1 + CAL_MTM_SET = 2 + RAW_MTM_SET = 3 + POSITIVE_X_TEST = 4 + NEGATIVE_X_TEST = 5 + POSITIVE_Y_TEST = 6 + NEGATIVE_Y_TEST = 7 + POSITIVE_Z_TEST = 8 + NEGATIVE_Z_TEST = 9 + + +class ImtqActionIds: + start_actuation_dipole = bytearray([0x0, 0x0, 0x0, 0x02]) + get_commanded_dipole = bytearray([0x0, 0x0, 0x0, 0x03]) + perform_positive_x_test = bytearray([0x0, 0x0, 0x0, 0x07]) + perform_negative_x_test = bytearray([0x0, 0x0, 0x0, 0x08]) + perform_positive_y_test = bytearray([0x0, 0x0, 0x0, 0x09]) + perform_negative_y_test = bytearray([0x0, 0x0, 0x0, 0x0A]) + perform_positive_z_test = bytearray([0x0, 0x0, 0x0, 0x0B]) + perform_negative_z_test = bytearray([0x0, 0x0, 0x0, 0x0C]) + # Initiates the reading of the last performed self test. After sending this command the results can be downlinked + # via the housekeeping service by using the appropriate set ids listed above. + read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D]) + + +def pack_imtq_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: + tc_queue.appendleft( + (QueueCommands.PRINT, + "Testing ISIS IMTQ handler with object id: 0x" + object_id.hex()) + ) + + if op_code == "0" or op_code == "1": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive x self test")) + command = object_id + ImtqActionIds.perform_positive_x_test + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of positive x self test results")) + command = object_id + ImtqActionIds.read_self_test_results + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with positive x self test results")) + sid = make_sid(object_id, ImtqSetIds.POSITIVE_X_TEST) + command = generate_one_hk_command(sid, 24) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "2": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative x self test")) + command = object_id + ImtqActionIds.perform_negative_x_test + command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of negative x self test results")) + command = object_id + ImtqActionIds.read_self_test_results + command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with negative x self test results")) + sid = make_sid(object_id, ImtqSetIds.NEGATIVE_X_TEST) + command = generate_one_hk_command(sid, 27) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "3": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive y self test")) + command = object_id + ImtqActionIds.perform_positive_y_test + command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of positive y self test results")) + command = object_id + ImtqActionIds.read_self_test_results + command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with positive y self test results")) + sid = make_sid(object_id, ImtqSetIds.POSITIVE_Y_TEST) + command = generate_one_hk_command(sid, 30) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "4": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative y self test")) + command = object_id + ImtqActionIds.perform_negative_y_test + command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of negative y self test results")) + command = object_id + ImtqActionIds.read_self_test_results + command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with negative y self test results")) + sid = make_sid(object_id, ImtqSetIds.NEGATIVE_Y_TEST) + command = generate_one_hk_command(sid, 33) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "5": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive z self test")) + command = object_id + ImtqActionIds.perform_positive_z_test + command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of positive z self test results")) + command = object_id + ImtqActionIds.read_self_test_results + command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with positive z self test results")) + sid = make_sid(object_id, ImtqSetIds.POSITIVE_Y_TEST) + command = generate_one_hk_command(sid, 36) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "6": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative z self test")) + command = object_id + ImtqActionIds.perform_negative_z_test + command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of negative z self test results")) + command = object_id + ImtqActionIds.read_self_test_results + command = PusTelecommand(service=8, subservice=128, ssc=36, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with negative z self test results")) + sid = make_sid(object_id, ImtqSetIds.NEGATIVE_Z_TEST) + command = generate_one_hk_command(sid, 37) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "7": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Commanding dipole")) + x_dipole = 0 + y_dipole = 0 + z_dipole = 0 + duration = 0 # ms + command = pack_dipole_command(object_id, x_dipole, y_dipole, z_dipole, duration) + command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "0" or op_code == "8": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get commanded dipole")) + command = object_id + ImtqActionIds.get_commanded_dipole + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + return tc_queue + + +def pack_dipole_command(object_id: bytearray, x_dipole: int, y_dipole: int, z_dipole: int, duration: int) -> bytearray: + """ This function packs the command causing the ISIS IMTQ to generate a dipole. + @param object_id The object id of the gomspace device handler. + @param x_dipole The dipole of the x coil in 10^-4*Am^2 (max. 2000) + @param y_dipole The dipole of the y coil in 10^-4*Am^2 (max. 2000) + @param z_dipole The dipole of the z coil in 10^-4*Am^2 (max. 2000) + @param duration The duration in milliseconds the dipole will be generated by the coils. + When set to 0, the dipole will be generated until a new dipole actuation + command is sent. + """ + action_id = ImtqActionIds.start_actuation_dipole + command = bytearray() + command = object_id + action_id + command.extend(x_dipole.to_bytes(length=2, byteorder='big')) + command.extend(y_dipole.to_bytes(length=2, byteorder='big')) + command.extend(z_dipole.to_bytes(length=2, byteorder='big')) + command.extend(duration.to_bytes(length=2, byteorder='big')) + return command diff --git a/pus_tc/pdu1.py b/pus_tc/pdu1.py index 27c0dd3..0130040 100644 --- a/pus_tc/pdu1.py +++ b/pus_tc/pdu1.py @@ -24,6 +24,10 @@ class PDU1TestProcedure: reboot = False ping = False read_temperature = False + turn_channel_2_on = False # Star Tracker connected to this channel (5V) + turn_channel_2_off = False + turn_channel_3_on = False # MTQ connected to this channel (5V) + turn_channel_3_off = True def pack_pdu1_test_into( @@ -31,14 +35,6 @@ def pack_pdu1_test_into( ): tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU1")) - tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling PDU1")) - command = pack_set_param_command( - p60dock_object_id, P60DockConfigTable.out_en_1.parameter_address, - P60DockConfigTable.out_en_1.parameter_size, Channel.on - ) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - if PDU1TestProcedure.all or PDU1TestProcedure.ping: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test")) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) @@ -53,3 +49,27 @@ def pack_pdu1_test_into( ) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_on: + tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)")) + command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_2.parameter_address, + PDUConfigTable.out_en_2.parameter_size, Channel.on) + command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_off: + tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)")) + command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_2.parameter_address, + PDUConfigTable.out_en_2.parameter_size, Channel.off) + command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on: + tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)")) + command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_3.parameter_address, + PDUConfigTable.out_en_3.parameter_size, Channel.on) + command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off: + tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)")) + command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_3.parameter_address, + PDUConfigTable.out_en_3.parameter_size, Channel.off) + command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/pdu2.py b/pus_tc/pdu2.py index c502940..e6e8080 100644 --- a/pus_tc/pdu2.py +++ b/pus_tc/pdu2.py @@ -101,4 +101,5 @@ def pack_pdu2_test_into(pdu2_object_id: bytearray, p60dock_object_id: bytearray, command = pack_request_full_hk_table_command(pdu2_object_id) command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue diff --git a/pus_tc/ploc.py b/pus_tc/ploc.py new file mode 100644 index 0000000..b8d593a --- /dev/null +++ b/pus_tc/ploc.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +""" +@file ploc.py +@brief TMP1075 tests +@author J. Meier +@date 06.01.2021 +""" +import struct + +from tmtccmd.config.definitions import QueueCommands + +from tmtccmd.pus_tc.packer import TcQueueT +from tmtccmd.ecss.tc import PusTelecommand + + +class PlocTestProcedure: + """ + @brief Use this class to define the tests to perform for the PLOC. + @details Setting all to True will run all tests. + Setting all to False will only run the tests set to True. + """ + all = False + test_tc_mem_write = False + test_tc_mem_read = True + + +class PlocActionIds: + tc_mem_write = bytearray([0x0, 0x0, 0x0, 0x1]) + tc_mem_read = bytearray([0x0, 0x0, 0x0, 0x2]) + + +class PlocReplyIds: + tm_mem_read_report = 6 + + +def pack_ploc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft( + (QueueCommands.PRINT, + "Testing PLOC Handler with object id: 0x" + object_id.hex()) + ) + + if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_write: + tc_queue.appendleft((QueueCommands.PRINT, "PLOC: TC Mem Write Test")) + memory_address = int(input("PLOC Tc Mem Write: Type memory address: 0x"), 16) + memory_data = int(input("PLOC Tc Mem Write: Type memory data: 0x"), 16) + command = generate_write_mem_command(object_id, struct.pack('!I', memory_address), memory_data) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_read: + tc_queue.appendleft((QueueCommands.PRINT, "PLOC: TC Mem Read Test")) + memory_address = int(input("PLOC Tc Mem Read: Type memory address: 0x"), 16) + command = object_id + PlocActionIds.tc_mem_read + struct.pack('!I', memory_address) + command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + return tc_queue + + +def generate_write_mem_command(object_id: bytearray, memory_address: bytearray, memory_data: int) -> bytearray: + """ This function generates the command to write to a memory address within the PLOC + @param object_id The object id of the PlocHandler + @param memory_address The PLOC memory address where to write to. + @param memory_data The data to write to the memory address specified by the bytearray memory_address. + """ + command = object_id + PlocActionIds.tc_mem_write + memory_address + struct.pack('!I', memory_data) + return command diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index aaa84d2..41b572b 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -18,11 +18,13 @@ from pus_tc.p60dock import pack_p60dock_test_into from pus_tc.pdu2 import pack_pdu2_test_into from pus_tc.pdu1 import pack_pdu1_test_into from pus_tc.acu import pack_acu_test_into +from pus_tc.imtq import pack_imtq_test_into from pus_tc.tmp1075 import pack_tmp1075_test_into +from pus_tc.ploc import pack_ploc_test_into from pus_tc.heater import pack_heater_test_into from config.definitions import CustomServiceList from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ - TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID + TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_ID LOGGER = get_logger() @@ -60,6 +62,13 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu if service == CustomServiceList.HEATER.value: object_id = HEATER_ID return pack_heater_test_into(object_id=object_id, tc_queue=service_queue) + if service == CustomServiceList.IMTQ.value: + object_id = IMTQ_HANDLER_ID + return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.PLOC.value: + object_id = PLOC_ID + return pack_ploc_test_into(object_id=object_id, tc_queue=service_queue) + LOGGER.warning("Invalid Service !") diff --git a/pus_tc/tmp1075.py b/pus_tc/tmp1075.py index 311c39f..b7ae123 100644 --- a/pus_tc/tmp1075.py +++ b/pus_tc/tmp1075.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -@file tmtcc_tc_tmp1075.py +@file tmp1075.py @brief TMP1075 tests @author J. Meier @date 06.01.2021 diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index 99053c2..ab2b264 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -8,25 +8,35 @@ from tmtccmd.ecss.tm import PusTelemetry from tmtccmd.utility.logger import get_logger from tmtccmd.pus_tm.service_1_verification import Service1TM -from tmtccmd.pus_tm.service_3_base import Service3Base +from tmtccmd.pus_tm.service_3_housekeeping import Service3TM from tmtccmd.pus_tm.service_5_event import Service5TM from tmtccmd.pus_tm.service_17_test import Service17TM +from tmtccmd.utility.tmtc_printer import TmTcPrinter + +from config.definitions import PUS_APID LOGGER = get_logger() -def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry: +def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None: + if apid == PUS_APID: + pus_factory_hook(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer) + + +def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter): service_type = raw_tm_packet[7] + tm_packet = None if service_type == 1: - return Service1TM(raw_tm_packet) + tm_packet = Service1TM(raw_tm_packet) if service_type == 3: - return Service3Base(raw_tm_packet) + tm_packet = Service3TM(raw_tm_packet) if service_type == 5: - return Service5TM(raw_tm_packet) + tm_packet = Service5TM(raw_tm_packet) if service_type == 8: - service_8_tm = Service8TM(raw_tm_packet) - return Service8TM(raw_tm_packet) + tm_packet = Service8TM(raw_tm_packet) if service_type == 17: - return Service17TM(raw_tm_packet) - LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory") - return PusTelemetry(raw_tm_packet) + tm_packet = Service17TM(raw_tm_packet) + if tm_packet is None: + LOGGER.info(f'The service {service_type} is not implemented in Telemetry Factory') + tm_packet = PusTelemetry(raw_tm_packet) + tmtc_printer.print_telemetry(packet=tm_packet) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index b3d2595..f093ae7 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -4,19 +4,29 @@ @details Template configuration file. Copy this folder to the TMTC commander root and adapt it to your needs. """ +import struct from typing import Tuple -from tmtccmd.pus_tm.service_3_base import Service3Base +from tmtccmd.pus_tm.service_3_housekeeping import Service3Base from tmtccmd.utility.logger import get_logger - +from pus_tc.syrlinks_hk_handler import SetIds +from pus_tc.imtq import ImtqSetIds +from config.object_ids import SYRLINKS_HANDLER, IMTQ_HANDLER_ID LOGGER = get_logger() -def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray, +def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]: """ This function is called when a Service 3 Housekeeping packet is received. + Please note that the object IDs should be compared by value because direct comparison of + enumerations does not work in Python. For example use: + + if object_id.value == ObjectIds.TEST_OBJECT.value + + to test equality based on the object ID list. + @param object_id: @param set_id: @param hk_data: @@ -26,5 +36,122 @@ def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray, the corresponding values. The bytearray is the validity buffer, which is usually appended at the end of the housekeeping packet. The last value is the number of parameters. """ - LOGGER.info("Service3TM: Parsing for this SID has not been implemented.") - return [], [], bytearray(), 0 + if object_id == SYRLINKS_HANDLER: + if set_id == SetIds.RX_REGISTERS_DATASET: + return handle_syrlinks_rx_registers_dataset(hk_data) + elif set_id == SetIds.TX_REGISTERS_DATASET: + return handle_syrlinks_tx_registers_dataset(hk_data) + else: + LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") + return [], [], bytearray(), 0 + elif object_id == IMTQ_HANDLER_ID: + if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (set_id <= ImtqSetIds.NEGATIVE_Z_TEST): + return handle_self_test_data(hk_data) + else: + LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") + return [], [], bytearray(), 0 + else: + LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") + return [], [], bytearray(), 0 + + +def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: + hk_header = [] + hk_content = [] + validity_buffer = bytearray() + hk_header = ["RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value", "RX Demod Eb", + "RX Demod N0", "RX Datarate"] + rx_status = hk_data[0] + rx_sensitivity = struct.unpack('!I', hk_data[1:5]) + rx_frequency_shift = struct.unpack('!I', hk_data[5:9]) + rx_iq_power = struct.unpack('!H', hk_data[9:11]) + rx_agc_value = struct.unpack('!H', hk_data[11:13]) + rx_demod_eb = struct.unpack('!I', hk_data[13:17]) + rx_demod_n0 = struct.unpack('!I', hk_data[17:21]) + rx_data_rate = hk_data[21] + hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0, + rx_data_rate] + return hk_header, hk_content, validity_buffer, 8 + + +def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: + hk_header = [] + hk_content = [] + validity_buffer = bytearray() + hk_header = ["TX Status", "TX Waveform", "TX AGC value"] + tx_status = hk_data[0] + tx_waveform = hk_data[1] + tx_agc_value = struct.unpack('!H', hk_data[2:4]) + hk_content = [tx_status, tx_waveform, tx_agc_value] + return hk_header, hk_content, validity_buffer, 3 + + +def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: + hk_header = [] + hk_content = [] + validity_buffer = bytearray() + hk_header = ["Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]", "Init Cal Mag X [nT]", + "Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]", "Init Coil X Current [mA]", "Init Coil Y Current [mA]", + "Init Coil Z Current [mA]", "Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]", + "Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]", + "Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]", "Coil Y Current [mA]", + "Coil Z Current [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]", + "Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]", "Fina Cal Mag X [nT]", + "Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]", "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]", + "Fina Coil Z Current [mA]", "Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]", + "Fina Coil Z Temperature [°C]"] + # INIT step (no coil actuation) + init_err = hk_data[0] + init_raw_mag_x = struct.unpack('!f', hk_data[1:5])[0] + init_raw_mag_y = struct.unpack('!f', hk_data[5:9])[0] + init_raw_mag_z = struct.unpack('!f', hk_data[9:13])[0] + init_cal_mag_x = struct.unpack('!f', hk_data[13:17])[0] + init_cal_mag_y = struct.unpack('!f', hk_data[17:21])[0] + init_cal_mag_z = struct.unpack('!f', hk_data[21:25])[0] + init_coil_x_current = struct.unpack('!f', hk_data[25:29])[0] + init_coil_y_current = struct.unpack('!f', hk_data[29:33])[0] + init_coil_z_current = struct.unpack('!f', hk_data[33:37])[0] + init_coil_x_temperature = struct.unpack('!H', hk_data[37:39])[0] + init_coil_y_temperature = struct.unpack('!H', hk_data[39:41])[0] + init_coil_z_temperature = struct.unpack('!H', hk_data[41:43])[0] + + # Actuation step + err = hk_data[43] + raw_mag_x = struct.unpack('!f', hk_data[44:48])[0] + raw_mag_y = struct.unpack('!f', hk_data[48:52])[0] + raw_mag_z = struct.unpack('!f', hk_data[52:56])[0] + cal_mag_x = struct.unpack('!f', hk_data[56:60])[0] + cal_mag_y = struct.unpack('!f', hk_data[60:64])[0] + cal_mag_z = struct.unpack('!f', hk_data[64:68])[0] + coil_x_current = struct.unpack('!f', hk_data[68:72])[0] + coil_y_current = struct.unpack('!f', hk_data[72:76])[0] + coil_z_current = struct.unpack('!f', hk_data[76:80])[0] + coil_x_temperature = struct.unpack('!H', hk_data[80:82])[0] + coil_y_temperature = struct.unpack('!H', hk_data[82:84])[0] + coil_z_temperature = struct.unpack('!H', hk_data[84:86])[0] + + # FINA step (no coil actuation) + fina_err = hk_data[86] + fina_raw_mag_x = struct.unpack('!f', hk_data[87:91])[0] + fina_raw_mag_y = struct.unpack('!f', hk_data[91:95])[0] + fina_raw_mag_z = struct.unpack('!f', hk_data[95:99])[0] + fina_cal_mag_x = struct.unpack('!f', hk_data[99:103])[0] + fina_cal_mag_y = struct.unpack('!f', hk_data[103:107])[0] + fina_cal_mag_z = struct.unpack('!f', hk_data[107:111])[0] + fina_coil_x_current = struct.unpack('!f', hk_data[111:115])[0] + fina_coil_y_current = struct.unpack('!f', hk_data[115:119])[0] + fina_coil_z_current = struct.unpack('!f', hk_data[119:123])[0] + fina_coil_x_temperature = struct.unpack('!H', hk_data[123:125])[0] + fina_coil_y_temperature = struct.unpack('!H', hk_data[125:127])[0] + fina_coil_z_temperature = struct.unpack('!H', hk_data[127:129])[0] + + hk_content = [init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y, + init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current, + init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err, raw_mag_x, + init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z, coil_x_current, coil_y_current, + coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err, fina_raw_mag_x, + fina_raw_mag_y, fina_raw_mag_z, fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z, fina_coil_x_current, + fina_coil_y_current, fina_coil_z_current, fina_coil_x_temperature, fina_coil_y_temperature, + fina_coil_z_temperature] + + return hk_header, hk_content, validity_buffer, len(hk_header) diff --git a/pus_tm/hk_handling_hook.py b/pus_tm/hk_handling_hook.py deleted file mode 100644 index 32e6cba..0000000 --- a/pus_tm/hk_handling_hook.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -@brief This file transfers control of housekeeping handling (PUS service 3) to the - developer -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -import struct -from typing import Tuple - -from tmtccmd.pus_tm.service_3_housekeeping import Service3Base -from tmtccmd.utility.logger import get_logger -from pus_tc.syrlinks_hk_handler import SetIds -from config.object_ids import SYRLINKS_HANDLER -LOGGER = get_logger() - - -def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray, - service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]: - """ - This function is called when a Service 3 Housekeeping packet is received. - - Please note that the object IDs should be compared by value because direct comparison of - enumerations does not work in Python. For example use: - - if object_id.value == ObjectIds.TEST_OBJECT.value - - to test equality based on the object ID list. - - @param object_id: - @param set_id: - @param hk_data: - @param service3_packet: - @return: Expects a tuple, consisting of two lists, a bytearray and an integer - The first list contains the header columns, the second list the list with - the corresponding values. The bytearray is the validity buffer, which is usually appended - at the end of the housekeeping packet. The last value is the number of parameters. - """ - if object_id == SYRLINKS_HANDLER: - if set_id == SetIds.RX_REGISTERS_DATASET: - return handle_syrlinks_rx_registers_dataset(hk_data) - elif set_id == SetIds.TX_REGISTERS_DATASET: - return handle_syrlinks_tx_registers_dataset(hk_data) - else: - LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") - return [], [], bytearray(), 0 - else: - LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") - return [], [], bytearray(), 0 - - -def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: - hk_header = [] - hk_content = [] - validity_buffer = bytearray() - hk_header = ["RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value", "RX Demod Eb", - "RX Demod N0", "RX Datarate"] - rx_status = hk_data[0] - rx_sensitivity = struct.unpack('!I', hk_data[1:5]) - rx_frequency_shift = struct.unpack('!I', hk_data[5:9]) - rx_iq_power = struct.unpack('!H', hk_data[9:11]) - rx_agc_value = struct.unpack('!H', hk_data[11:13]) - rx_demod_eb = struct.unpack('!I', hk_data[13:17]) - rx_demod_n0 = struct.unpack('!I', hk_data[17:21]) - rx_data_rate = hk_data[21] - hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0, - rx_data_rate] - return hk_header, hk_content, validity_buffer, 8 - - -def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: - hk_header = [] - hk_content = [] - validity_buffer = bytearray() - hk_header = ["TX Status", "TX Waveform", "TX AGC value"] - tx_status = hk_data[0] - tx_waveform = hk_data[1] - tx_agc_value = struct.unpack('!H', hk_data[2:4]) - hk_content = [tx_status, tx_waveform, tx_agc_value] - return hk_header, hk_content, validity_buffer, 3 diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py index 41d6eb9..6b491c0 100644 --- a/pus_tm/service_8_hook.py +++ b/pus_tm/service_8_hook.py @@ -1,5 +1,8 @@ +import struct from typing import Tuple -from config.object_ids import PDU_2_HANDLER_ID +from config.object_ids import * +from pus_tc.imtq import ImtqActionIds +from pus_tc.ploc import PlocReplyIds def user_analyze_service_8_data( @@ -25,7 +28,31 @@ def user_analyze_service_8_data( data_string = data_string.rstrip(',') data_string = data_string.rstrip() content_list = [data_string] + elif object_id == IMTQ_HANDLER_ID: + return handle_imtq_replies(action_id, custom_data) + elif object_id == PLOC_ID: + return handle_ploc_replies(action_id, custom_data) else: header_list = [] content_list = [] return header_list, content_list + + +def handle_imtq_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]: + header_list = [] + content_list = [] + if action_id == struct.unpack('!I', ImtqActionIds.get_commanded_dipole)[0]: + header_list = ['Commanded X-Dipole', 'Commanded Y-Dipole', 'Commanded Z-Dipole'] + x_dipole = struct.unpack('!H', custom_data[:2]) + y_dipole = struct.unpack('!H', custom_data[2:4]) + z_dipole = struct.unpack('!H', custom_data[4:6]) + content_list = [x_dipole[0], y_dipole[0], z_dipole[0]] + + +def handle_ploc_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]: + header_list = [] + content_list = [] + if action_id == PlocReplyIds.tm_mem_read_report: + header_list = ['PLOC Memory Address', 'PLOC Mem Len', 'PLOC Read Memory Data'] + content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]] + return header_list, content_list diff --git a/tmtc_client_cli.py b/tmtc_client_cli.py index c481101..283376e 100644 --- a/tmtc_client_cli.py +++ b/tmtc_client_cli.py @@ -26,13 +26,29 @@ limitations under the License. @author R. Mueller """ -from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander +import sys + from config.hook_implementations import EiveHookObject +from config.definitions import PUS_APID +from pus_tm.factory_hook import ccsds_tm_handler +try: + from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander, add_ccsds_handler + from tmtccmd.ccsds.handler import CcsdsTmHandler +except ImportError as error: + run_tmtc_commander = None + initialize_tmtc_commander = None + print(error) + print("Python tmtccmd submodule could not be imported") + print("Install with \"cd tmtccmd && python3 -m pip install -e .\" for interactive installation") + sys.exit(0) def main(): hook_obj = EiveHookObject() initialize_tmtc_commander(hook_object=hook_obj) + ccsds_handler = CcsdsTmHandler() + ccsds_handler.add_tm_handler(apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50) + add_ccsds_handler(ccsds_handler) run_tmtc_commander(False) diff --git a/tmtccmd b/tmtccmd index 3f39a1f..b4358a1 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit 3f39a1ffa17e8c74cadc789b3f4714c4bbd5f75f +Subproject commit b4358a15fd945a9e0103a707b2a2dc56c458b24a