From 0c334f615f24f3b5338045f767028c469316e222 Mon Sep 17 00:00:00 2001 From: "Jakob.Meier" Date: Tue, 29 Dec 2020 11:29:03 +0100 Subject: [PATCH] integrated pcu, acu and p60 dock tests --- .../tmtcclient_ACU_Test_UDP.xml | 24 +++ .../tmtcclient_P60_Dock_Test_UDP.xml | 24 +++ .../tmtcclient_PDU1_Test_UDP.xml | 24 +++ .../tmtcclient_PDU2_Test_UDP.xml | 24 +++ config/tmtcc_definitions.py | 4 + config/tmtcc_globals.py | 8 + config/tmtcc_object_ids.py | 8 + gomspace/gomspace_common.py | 121 +++++++++++++++ gomspace/gomspace_pdu_definitions.py | 16 ++ pus_tc/tmtcc_tc_acu.py | 121 +++++++++++++++ pus_tc/tmtcc_tc_p60dock.py | 141 ++++++++++++++++++ pus_tc/tmtcc_tc_packer_hook.py | 18 +++ pus_tc/tmtcc_tc_pdu1.py | 48 ++++++ pus_tc/tmtcc_tc_pdu2.py | 98 ++++++++++++ 14 files changed, 679 insertions(+) create mode 100644 .idea/runConfigurations/tmtcclient_ACU_Test_UDP.xml create mode 100644 .idea/runConfigurations/tmtcclient_P60_Dock_Test_UDP.xml create mode 100644 .idea/runConfigurations/tmtcclient_PDU1_Test_UDP.xml create mode 100644 .idea/runConfigurations/tmtcclient_PDU2_Test_UDP.xml create mode 100644 gomspace/gomspace_common.py create mode 100644 gomspace/gomspace_pdu_definitions.py create mode 100644 pus_tc/tmtcc_tc_acu.py create mode 100644 pus_tc/tmtcc_tc_p60dock.py create mode 100644 pus_tc/tmtcc_tc_pdu1.py create mode 100644 pus_tc/tmtcc_tc_pdu2.py diff --git a/.idea/runConfigurations/tmtcclient_ACU_Test_UDP.xml b/.idea/runConfigurations/tmtcclient_ACU_Test_UDP.xml new file mode 100644 index 0000000..f936f0f --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_ACU_Test_UDP.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/tmtcclient_P60_Dock_Test_UDP.xml b/.idea/runConfigurations/tmtcclient_P60_Dock_Test_UDP.xml new file mode 100644 index 0000000..38bc875 --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_P60_Dock_Test_UDP.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/tmtcclient_PDU1_Test_UDP.xml b/.idea/runConfigurations/tmtcclient_PDU1_Test_UDP.xml new file mode 100644 index 0000000..eb1e7fc --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_PDU1_Test_UDP.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/tmtcclient_PDU2_Test_UDP.xml b/.idea/runConfigurations/tmtcclient_PDU2_Test_UDP.xml new file mode 100644 index 0000000..a69a933 --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_PDU2_Test_UDP.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/config/tmtcc_definitions.py b/config/tmtcc_definitions.py index cfa0610..d5a4f24 100644 --- a/config/tmtcc_definitions.py +++ b/config/tmtcc_definitions.py @@ -28,6 +28,10 @@ class ServiceList(enum.Enum): SERVICE_20 = auto() SERVICE_23 = auto() SERVICE_200 = auto() + P60DOCK = auto() + PDU1 = auto() + PDU2 = auto() + ACU = auto() class SerialConfig(enum.Enum): diff --git a/config/tmtcc_globals.py b/config/tmtcc_globals.py index af522ab..6595b01 100644 --- a/config/tmtcc_globals.py +++ b/config/tmtcc_globals.py @@ -146,6 +146,14 @@ def add_globals_post_args_parsing(args: argparse.Namespace): service = ServiceList.SERVICE_20 elif service == "23": service = ServiceList.SERVICE_23 + elif service == "p60dock": + service = ServiceList.P60DOCK + elif service == "pdu1": + service = ServiceList.PDU1 + elif service == "pdu2": + service = ServiceList.PDU2 + elif service == "acu": + service = ServiceList.ACU else: logger.warning("Service not known! Setting standard service 17") service = ServiceList.SERVICE_17 diff --git a/config/tmtcc_object_ids.py b/config/tmtcc_object_ids.py index d2cc0d8..434d5e4 100644 --- a/config/tmtcc_object_ids.py +++ b/config/tmtcc_object_ids.py @@ -12,6 +12,10 @@ class ObjectIds(enum.Enum): from enum import auto PUS_SERVICE_17 = auto() TEST_DEVICE = auto() + P60DOCK_HANDLER_ID = auto() + PDU1_HANDLER_ID = auto() + PDU2_HANDLER_ID = auto() + ACU_HANDLER_ID = auto() def set_object_ids(object_id_dict: Dict[ObjectIds, bytearray]): @@ -19,5 +23,9 @@ def set_object_ids(object_id_dict: Dict[ObjectIds, bytearray]): object_id_dict.update( {o_ids.PUS_SERVICE_17: bytearray([0x53, 0x00, 0x00, 0x17]), o_ids.TEST_DEVICE: bytearray([0x44, 0x00, 0xAF, 0xFE]), + o_ids.P60DOCK_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x1]), + o_ids.PDU1_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x2]), + o_ids.PDU2_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x3]), + o_ids.ACU_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x4]), } ) diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py new file mode 100644 index 0000000..7115c46 --- /dev/null +++ b/gomspace/gomspace_common.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +""" +@file gomspace_common.py +@brief PDU2 tests +@details All functions and classes common for all gomspace devices are defined in this file. +@author J. Meier +@date 17.12.2020 +""" + + +class GomspaceDeviceActions: + PING = bytearray([0x0, 0x0, 0x0, 0x1]) + REBOOT = bytearray([0x0, 0x0, 0x0, 0x4]) + PARAM_GET = bytearray([0x0, 0x0, 0x0, 0x00]) + PARAM_SET = bytearray([0x0, 0x0, 0x0, 0xFF]) + WDT_RESET = bytearray([0x0, 0x0, 0x0, 0x9]) + + +class TableIds: + config = 1 + hk = 4 + + +class TableEntry: + uint8_size = 1 + uint16_size = 2 + uint32_size = 4 + + def __init__(self, parameter_address: bytearray, parameter_size): + self.parameter_address = parameter_address + self.parameter_size = parameter_size + + +class Channel: + on = 1 + off = 0 + + +def pack_get_param_command(object_id: bytearray, table_id: int, memory_address: bytearray, + parameter_size: int) -> bytearray: + """ Function to generate a command to retrieve parameters like the temperature from a gomspace device. + @param object_id: The object id of the gomspace device handler. + @param table_id: The table id of the gomspace device + @param memory_address: Address offset within table of the value to read. + @param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2 + @return: The command as bytearray. + """ + action_id = GomspaceDeviceActions.PARAM_GET + command = bytearray() + command = command + object_id + action_id + command.append(table_id) + command = command + memory_address + command.append(parameter_size) + return command + + +def pack_set_param_command(object_id: bytearray, memory_address: bytearray, parameter_size: int, + parameter: int) -> bytearray: + """ Function to generate a command to set a parameter + @param object_id: The object id of the gomspace device handler. + @param memory_address: Address offset within table of the value to set. + @param parameter: The parameter value to set. + @param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t in the device tables. + @return: The command as bytearray. + """ + action_id = GomspaceDeviceActions.PARAM_SET + command = bytearray() + command = command + object_id + action_id + command = command + memory_address + command.append(parameter_size) + if parameter_size == 1: + command.append(parameter) + elif parameter_size == 2: + byte_one = 0xFF00 & parameter >> 8 + byte_two = 0xFF & parameter + command.append(byte_one) + command.append(byte_two) + elif parameter_size == 4: + byte_one = 0xFF000000 & parameter >> 24 + byte_two = 0xFF0000 & parameter >> 16 + byte_three = 0xFF00 & parameter >> 8 + byte_four = 0xFF & parameter + command.append(byte_one) + command.append(byte_two) + command.append(byte_three) + command.append(byte_four) + return command + + +def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray: + """" Function to generate the command to ping a gomspace device + @param object_id Object Id of the gomspace device handler. + @param data Bytearray containing the bytes to send to the gomspace device. For now the on board software + supports only the handling of up to 33 bytes. + @note The ping request sends the specified data to a gompsace device. These + data are simply copied by the device and then sent back. + """ + action_id = GomspaceDeviceActions.PING + command = bytearray() + command = object_id + action_id + data + return command + + +def pack_gnd_wdt_reset_command(object_id: bytearray) -> bytearray: + """" Function to generate the command to reset the watchdog of a gomspace device. + @param object_id Object Id of the gomspace device handler. + """ + action_id = GomspaceDeviceActions.WDT_RESET + command = bytearray() + command = object_id + action_id + return command + + +def pack_reboot_command(object_id: bytearray) -> bytearray: + """ Function to generate the command which triggers a reboot of a gomspace device + @param object_id The object id of the gomspace device handler. + """ + action_id = GomspaceDeviceActions.REBOOT + command = bytearray() + command = object_id + action_id + return command diff --git a/gomspace/gomspace_pdu_definitions.py b/gomspace/gomspace_pdu_definitions.py new file mode 100644 index 0000000..8d8b408 --- /dev/null +++ b/gomspace/gomspace_pdu_definitions.py @@ -0,0 +1,16 @@ +from gomspace.gomspace_common import TableEntry + + +class PDUConfigTable: + out_en_0 = TableEntry(bytearray([0x00, 0x48]), TableEntry.uint8_size) + out_en_1 = TableEntry(bytearray([0x00, 0x49]), TableEntry.uint8_size) + out_en_2 = TableEntry(bytearray([0x00, 0x4A]), TableEntry.uint8_size) + out_en_3 = TableEntry(bytearray([0x00, 0x4B]), TableEntry.uint8_size) + # When channel consumes more than cur_lu_lim, channel is turned of immediately + cur_lu_lim_0 = TableEntry(bytearray([0x00, 0xB8]), TableEntry.uint16_size) + + +class PDUHkTable: + temperature = TableEntry(bytearray([0x00, 0x28]), TableEntry.uint16_size) + # Ground WDT value (remaining seconds until reboot) + wdt_gnd_left = TableEntry(bytearray([0x00, 0x80]), TableEntry.uint32_size) \ No newline at end of file diff --git a/pus_tc/tmtcc_tc_acu.py b/pus_tc/tmtcc_tc_acu.py new file mode 100644 index 0000000..cf38750 --- /dev/null +++ b/pus_tc/tmtcc_tc_acu.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_acu.py +@brief ACU tests +@author J. Meier +@date 21.12.2020 +""" + +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from gomspace.gomspace_common import * +from pus_tc.tmtcc_tc_p60dock import P60DockConfigTable + + +class ACUTestProcedure: + """ + @brief Use this class to define the tests to perform for the ACU. + @details Setting all to True will run all tests. + Setting all to False will only run the tests set to True. + """ + all = False + reboot = False + read_gnd_wdt = False + gnd_wdt_reset = False + ping = False + read_temperature1 = False + read_temperature2 = False + read_temperature3 = True + read_mppt_mode = True + read_vboost = True + read_vbat_max_hi = True + read_vbat_max_lo = True + read_ov_mode = True + + +class ACUConfigTable: + mppt_mode = TableEntry(bytearray([0x00, 0x00]), TableEntry.uint8_size) + mppt_d_mode = TableEntry(bytearray([0x00, 0x01]), TableEntry.uint8_size) + vboost = TableEntry(bytearray([0x00, 0x02]), TableEntry.uint16_size) + vbat_max_hi = TableEntry(bytearray([0x00, 0x10]), TableEntry.uint16_size) + vbat_max_lo = TableEntry(bytearray([0x00, 0x12]), TableEntry.uint16_size) + ov_mode = TableEntry(bytearray([0x00, 0x1A]), TableEntry.uint8_size) + + +class ACUHkTable: + temperature1 = TableEntry(bytearray([0x00, 0x1C]), TableEntry.uint16_size) + temperature2 = TableEntry(bytearray([0x00, 0x1D]), TableEntry.uint16_size) + temperature3 = TableEntry(bytearray([0x00, 0x1E]), TableEntry.uint16_size) + # Ground WDT value (remaining seconds until reboot) + wdt_gnd_left = TableEntry(bytearray([0x00, 0x74]), TableEntry.uint32_size) + + +def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing ACU")) + + tc_queue.appendleft(("print", "P60 Dock: Enabling ACU connected to X1 slot (channel 0)")) + command = pack_set_param_command(g.P60DOCK_HANDLER_ID, P60DockConfigTable.out_en_0.parameter_address, + P60DockConfigTable.out_en_0.parameter_size, Channel.on) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if ACUTestProcedure.all or ACUTestProcedure.reboot: + tc_queue.appendleft(("print", "ACU: Reboot")) + command = pack_reboot_command(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt: + tc_queue.appendleft(("print", "ACU: Reading ground watchdog timer value")) + command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address, + ACUHkTable.wdt_gnd_left.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset: + tc_queue.appendleft(("print", "ACU: Testing ground watchdog reset")) + command = pack_gnd_wdt_reset_command(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.ping: + tc_queue.appendleft(("print", "ACU: Ping Test")) + ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + command = pack_ping_command(object_id, ping_data) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.read_temperature3: + tc_queue.appendleft(("print", "ACU: Reading temperature 3")) + command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.temperature3.parameter_address, + ACUHkTable.temperature3.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.read_vboost: + tc_queue.appendleft(("print", "ACU: Reading vboost value")) + command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vboost.parameter_address, + ACUConfigTable.vboost.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi: + tc_queue.appendleft(("print", "ACU: Reading vbat_max_hi")) + command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address, + ACUConfigTable.vbat_max_hi.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo: + tc_queue.appendleft(("print", "ACU: Reading vbat_max_lo")) + command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address, + ACUConfigTable.vbat_max_lo.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode: + tc_queue.appendleft(("print", "ACU: Reading ov_mode")) + command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.ov_mode.parameter_address, + ACUConfigTable.ov_mode.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft(("print", "P60 Dock: Turning off ACU")) + command = pack_set_param_command(g.P60DOCK_HANDLER_ID, P60DockConfigTable.out_en_0.parameter_address, + P60DockConfigTable.out_en_0.parameter_size, Channel.off) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + return tc_queue diff --git a/pus_tc/tmtcc_tc_p60dock.py b/pus_tc/tmtcc_tc_p60dock.py new file mode 100644 index 0000000..3b6293e --- /dev/null +++ b/pus_tc/tmtcc_tc_p60dock.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_p60dock.py +@brief P60 Dock tests +@author J. Meier +@date 13.12.2020 +""" + +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from gomspace.gomspace_common import * + + +class P60DockTestProcedure: + """ + @brief Use this class to define the tests to perform for the P60Dock. + @details Setting all to True will run all tests. + Setting all to False will only run the tests set to True. + """ + all = True + reboot = False + read_gnd_wdt = False + gnd_wdt_reset = False + ping = False + channel_3_off = False # pdu2 + read_temperature1 = False + read_channel_3_state = False # pdu2 + read_cur_lu_lim_0 = False + channel_3_on = False # pdu2 + invalid_table_id_test = False # Test to check if software properly handles invalid table ids + invalid_address_test = False # Test to check if software properly handles invalid addresses + invalid_parameter_size_test = True + + +class P60DockConfigTable: + out_en_0 = TableEntry(bytearray([0x00, 0x68]), TableEntry.uint8_size) # ACU + out_en_1 = TableEntry(bytearray([0x00, 0x69]), TableEntry.uint8_size) # PDU1 + out_en_2 = TableEntry(bytearray([0x00, 0x6A]), TableEntry.uint8_size) + out_en_3 = TableEntry(bytearray([0x00, 0x6B]), TableEntry.uint8_size) # PDU2 + # When channel consumes more than cur_lu_lim, channel is turned of immediately + cur_lu_lim_0 = TableEntry(bytearray([0x00, 0xF8]), TableEntry.uint16_size) + + +class P60DockHkTable: + temperature1 = TableEntry(bytearray([0x00, 0x44]), TableEntry.uint16_size) + temperature2 = TableEntry(bytearray([0x00, 0x46]), TableEntry.uint16_size) + # Ground WDT value (remaining seconds until reboot) + wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) + + +def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + if P60DockTestProcedure.all or P60DockTestProcedure.reboot: + tc_queue.appendleft(("print", "P60 Dock: Reboot")) + command = pack_reboot_command(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.read_gnd_wdt: + tc_queue.appendleft(("print", "P60 Dock: Reading ground watchdog timer value")) + command = pack_get_param_command(object_id, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address, + P60DockHkTable.wdt_gnd_left.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.gnd_wdt_reset: + tc_queue.appendleft(("print", "P60 Dock: Testing ground watchdog reset")) + command = pack_gnd_wdt_reset_command(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.ping: + tc_queue.appendleft(("print", "P60 Dock: Ping")) + ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + command = pack_ping_command(object_id, ping_data) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_off: + tc_queue.appendleft(("print", "P60 Dock: Testing setting output channel 3 off")) + parameter = 0 # set channel off + command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, + P60DockConfigTable.out_en_3.parameter_size, parameter) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.read_temperature1: + tc_queue.appendleft(("print", "P60 Dock: Testing temperature reading")) + command = pack_get_param_command(object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, + P60DockHkTable.temperature1.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on: + tc_queue.appendleft(("print", "P60 Dock: Testing Output Channel 3 state (PDU2)")) + command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.out_en_3.parameter_address, + P60DockConfigTable.out_en_3.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.read_cur_lu_lim_0: + tc_queue.appendleft(("print", "P60 Dock: Reading current limit value of output channel 0")) + command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address, + P60DockConfigTable.cur_lu_lim_0.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on: + tc_queue.appendleft(("print", "P60 Dock: Testing setting output channel 3 on")) + parameter = 1 # set channel on + command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, + P60DockConfigTable.out_en_3.parameter_size, parameter) + command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.invalid_table_id_test: + tc_queue.appendleft(("print", "P60 Dock: Testing invalid table id handling")) + table_id_invalid = 5 + command = pack_get_param_command(object_id, table_id_invalid, P60DockHkTable.temperature1.parameter_address, + P60DockHkTable.temperature1.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.invalid_address_test: + tc_queue.appendleft(("print", "P60 Dock: Testing invalid address handling in get param command")) + invalid_address = bytearray([0x01, 0xF4]) + command = pack_get_param_command(object_id, TableIds.hk, invalid_address, + P60DockHkTable.temperature1.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "P60 Dock: Testing invalid address handling in set param command")) + invalid_address = bytearray([0x01, 0xF4]) + parameter_size = 2 + parameter = 1 + command = pack_set_param_command(object_id, invalid_address, parameter_size, parameter) + command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if P60DockTestProcedure.all or P60DockTestProcedure.invalid_parameter_size_test: + tc_queue.appendleft(("print", "P60 Dock: Testing handling of invalid parameter sizes in get-param command")) + invalid_size = 5 + command = pack_get_param_command(object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, + invalid_size) + command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "P60 Dock: Testing handling of invalid parameter size in set-param command")) + parameter = 1 + command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, invalid_size, + parameter) + command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + return tc_queue diff --git a/pus_tc/tmtcc_tc_packer_hook.py b/pus_tc/tmtcc_tc_packer_hook.py index 1206045..5500572 100644 --- a/pus_tc/tmtcc_tc_packer_hook.py +++ b/pus_tc/tmtcc_tc_packer_hook.py @@ -13,6 +13,12 @@ from tmtc_core.utility.tmtcc_logger import get_logger from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT from tmtc_core.pus_tc.tmtcc_tc_service5_event import pack_service5_test_into from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command +from pus_tc.tmtcc_tc_p60dock import pack_p60dock_test_into +from pus_tc.tmtcc_tc_pdu2 import pack_pdu2_test_into +from pus_tc.tmtcc_tc_pdu1 import pack_pdu1_test_into +from pus_tc.tmtcc_tc_acu import pack_acu_test_into +from tmtc_core.core.tmtcc_object_id_manager import get_object_id +from config.tmtcc_object_ids import ObjectIds LOGGER = get_logger() @@ -22,6 +28,18 @@ def pack_service_queue_user(service: Union[int, str], op_code: int, service_queu return pack_service5_test_into(service_queue) if service == ServiceList.SERVICE_17: return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple()) + if service == ServiceList.P60DOCK: + object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID) + return pack_p60dock_test_into(object_id, service_queue) + if service == ServiceList.PDU1: + object_id = get_object_id(ObjectIds.PDU1_HANDLER_ID) + return pack_pdu1_test_into(object_id, service_queue) + if service == ServiceList.PDU2: + object_id = get_object_id(ObjectIds.PDU2_HANDLER_ID) + return pack_pdu2_test_into(object_id, service_queue) + if service == ServiceList.ACU: + object_id = get_object_id(ObjectIds.ACU_HANDLER_ID) + return pack_acu_test_into(object_id, service_queue) LOGGER.warning("Invalid Service !") diff --git a/pus_tc/tmtcc_tc_pdu1.py b/pus_tc/tmtcc_tc_pdu1.py new file mode 100644 index 0000000..47ee8d7 --- /dev/null +++ b/pus_tc/tmtcc_tc_pdu1.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_pdu1.py +@brief PDU2 tests +@author J. Meier +@date 17.12.2020 +""" + +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from gomspace.gomspace_common import * +from pus_tc.tmtcc_tc_p60dock import P60DockConfigTable +from gomspace.gomspace_pdu_definitions import * + + +class PDU1TestProcedure: + """ + @brief Use this class to define the tests to perform for the PDU2. + @details Setting all to True will run all tests. + Setting all to False will only run the tests set to True. + """ + all = True + reboot = False + ping = False + read_temperature = False + + +def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing PDU1")) + + tc_queue.appendleft(("print", "P60 Dock: Enabling PDU1")) + command = pack_set_param_command(g.P60DOCK_HANDLER_ID, P60DockConfigTable.out_en_1.parameter_address, + P60DockConfigTable.out_en_1.parameter_size, Channel.on) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if PDU1TestProcedure.all or PDU1TestProcedure.ping: + tc_queue.appendleft(("print", "PDU1: Ping Test")) + ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + command = pack_ping_command(object_id, ping_data) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature: + tc_queue.appendleft(("print", "PDU1: Testing temperature reading")) + command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.temperature.parameter_address, + PDUHkTable.temperature.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/tmtcc_tc_pdu2.py b/pus_tc/tmtcc_tc_pdu2.py new file mode 100644 index 0000000..2243bf4 --- /dev/null +++ b/pus_tc/tmtcc_tc_pdu2.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_pdu2.py +@brief PDU2 tests +@author J. Meier +@date 17.12.2020 +""" + +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from gomspace.gomspace_common import * +from gomspace.gomspace_pdu_definitions import * +from pus_tc.tmtcc_tc_p60dock import P60DockConfigTable + + +class PDU2TestProcedure: + """ + @brief Use this class to define the tests to perform for the PDU2. + @details Setting all to True will run all tests. + Setting all to False will only run the tests set to True. + """ + all = True + reboot = False + read_gnd_wdt = False + gnd_wdt_reset = False + ping = False + channel_2_off = False # TCS Heater + read_temperature = False + read_channel_2_state = False # TCS Heater + read_cur_lu_lim_0 = False # OBC + channel_2_on = False # TCS Heater + invalid_table_id_test = False # Test to check if software properly handles invalid table ids + invalid_address_test = False # Test to check if software properly handles invalid addresses + invalid_parameter_size_test = True + + +def pack_pdu2_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing PDU2")) + + tc_queue.appendleft(("print", "P60 Dock: Enabling PDU2")) + command = pack_set_param_command(g.P60DOCK_HANDLER_ID, P60DockConfigTable.out_en_3.parameter_address, + P60DockConfigTable.out_en_3.parameter_size, Channel.on) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if PDU2TestProcedure.all or PDU2TestProcedure.reboot: + tc_queue.appendleft(("print", "PDU2: Reboot")) + command = pack_reboot_command(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.read_gnd_wdt: + tc_queue.appendleft(("print", "PDU2: Reading ground watchdog timer value")) + command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address, + PDUHkTable.wdt_gnd_left.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.gnd_wdt_reset: + tc_queue.appendleft(("print", "PDU2: Testing ground watchdog reset")) + command = pack_gnd_wdt_reset_command(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.ping: + tc_queue.appendleft(("print", "PDU2: Ping Test")) + ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + command = pack_ping_command(object_id, ping_data) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_on: + tc_queue.appendleft(("print", "PDU2: Testing setting output channel 2 on (TCS Heater)")) + command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, + PDUConfigTable.out_en_2.parameter_size, Channel.on) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature1: + tc_queue.appendleft(("print", "PDU2: Testing temperature reading")) + command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.temperature.parameter_address, + PDUHkTable.temperature.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.read_channel_2_state: + tc_queue.appendleft(("print", "PDU2: Reading output channel 2 state (TCS Heater)")) + command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address, + PDUConfigTable.out_en_2.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.read_cur_lu_lim_0: + tc_queue.appendleft(("print", "PDU2: Reading current limit value of output channel 0 (OBC)")) + command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address, + PDUConfigTable.cur_lu_lim_0.parameter_size) + command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_off: + tc_queue.appendleft(("print", "PDU2: Testing setting output channel 2 off")) + command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, + PDUConfigTable.out_en_2.parameter_size, Channel.off) + command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue