diff --git a/config/definitions.py b/config/definitions.py index 771eb06..e9e11fa 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -29,3 +29,4 @@ class CustomServiceList(enum.Enum): REACTION_WHEEL_4 = "reaction_wheel_4" RAD_SENSOR = "rad_sensor" PLOC_SUPV = "ploc_supv" + PLOC_UPDATER = "ploc_updater" diff --git a/config/hook_implementations.py b/config/hook_implementations.py index c5bad08..49a6726 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -117,9 +117,24 @@ class EiveHookObject(TmTcHookBase): "32": ("PLOC Supervisor: MRAM Dump", {OpCodeDictKeys.TIMEOUT: 2.0}), "33": ("PLOC Supervisor: Print CPU stats", {OpCodeDictKeys.TIMEOUT: 2.0}), "34": ("PLOC Supervisor: Set debug verbosity", {OpCodeDictKeys.TIMEOUT: 2.0}), + "35": ("PLOC Supervisor: Set GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), + "36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), + "37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}), + "38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}), + "39": ("PLOC Supervisor: Factory reset clear mirror entries", {OpCodeDictKeys.TIMEOUT: 2.0}), + "40": ("PLOC Supervisor: Factory reset clear circular entries", {OpCodeDictKeys.TIMEOUT: 2.0}), + "41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) + op_code_dict_srv_ploc_updater = { + "0": ("Ploc Updater: Update partion A on NVM0", {OpCodeDictKeys.TIMEOUT: 2.0}), + "1": ("Ploc Updater: Update partion B on NVM0", {OpCodeDictKeys.TIMEOUT: 2.0}), + "2": ("Ploc Updater: Update partion A on NVM1", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("Ploc Updater: Update partion B on NVM1", {OpCodeDictKeys.TIMEOUT: 2.0}), + } + service_ploc_updater_tuple = ("Ploc Updater", op_code_dict_srv_ploc_updater) + service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple @@ -134,6 +149,7 @@ class EiveHookObject(TmTcHookBase): service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple + service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple return service_op_code_dict def get_json_config_file_path(self) -> str: diff --git a/config/object_ids.py b/config/object_ids.py index 5d6b3e7..b105b89 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -26,6 +26,7 @@ RW4_ID = bytes([0x44, 0x12, 0x00, 0x4]) START_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1]) RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5]) PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16]) +PLOC_UPDATER_ID = bytes([0x44, 0x33, 0x00, 0x00]) def get_object_ids() -> Dict[bytes, list]: diff --git a/pus_tc/ploc_supervisor.py b/pus_tc/ploc_supervisor.py index 65aec36..eb8a9b0 100644 --- a/pus_tc/ploc_supervisor.py +++ b/pus_tc/ploc_supervisor.py @@ -12,6 +12,9 @@ from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.packer import TcQueueT from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.utility.logger import get_console_logger + +LOGGER = get_console_logger() latchup_id_dict = { "0": "0.85V", @@ -60,9 +63,11 @@ class SupvActionIds: SET_GPIO = 34 READ_GPIO = 35 RESTART_SUPERVISOR = 36 - FACTORY_RESET = 37 + FACTORY_RESET_CLEAR_ALL = 37 REQUEST_LOGGING_DATA = 38 UPDATE_IMAGE_DATA = 39 + FACTORY_RESET_CLEAR_MIRROR = 40 + FACTORY_RESET_CLEAR_CIRCULAR = 41 class SupvHkIds: @@ -242,6 +247,41 @@ def pack_ploc_supv_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: command = pack_set_debug_verbosity_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "35": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set GPIO command")) + command = pack_set_gpio_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "36": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Read GPIO command")) + command = pack_read_gpio_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=51, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "37": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Restart supervisor")) + command = command = object_id + struct.pack('!I', SupvActionIds.RESTART_SUPERVISOR) + command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "38": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear all")) + command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_ALL) + command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "39": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear mirror entries")) + command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_MIRROR) + command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "40": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear circular entries")) + command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR) + command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "41": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: CAN loopback test")) + command = command = object_id + struct.pack('!I', SupvActionIds.CAN_LOOPBACK_TEST) + command = PusTelecommand(service=8, subservice=128, ssc=56, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) return tc_queue @@ -488,17 +528,56 @@ def pack_print_cpu_stats_cmd(object_id: bytearray) -> bytearray: def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray: command = bytearray() - print("Specify debug verbosity") - verbosity_options_dict = { - 0: "None", - 1: "Error", - 2: "Warn", - 3: "Info", - } - print("{:<6} | {}".format('Key', 'Description')) - for entry in verbosity_options_dict.items(): - print("{:<6} | {}".format(entry[0], entry[1])) - verbosity = int(input("Specify verbosity key: ")) + verbosity = get_debug_verbosity() command = object_id + struct.pack('!I', SupvActionIds.SET_DBG_VERBOSITY) command = command + struct.pack('!B', verbosity) - return command \ No newline at end of file + return command + + +def get_debug_verbosity() -> int: + tries = 0 + while tries < 3: + try: + print("Debug verbosity options") + verbosity_options_dict = { + 0: "None", + 1: "Error", + 2: "Warn", + 3: "Info", + } + print("{:<6} | {}".format('Key', 'Description')) + for entry in verbosity_options_dict.items(): + print("{:<6} | {}".format(entry[0], entry[1])) + verbosity = int(input("Specify verbosity key: ")) + if verbosity > len(verbosity_options_dict) - 1: + raise ValueError + return verbosity + except ValueError: + LOGGER.warning("Invalid verbosity key specified") + tries = tries + 1 + LOGGER.error("get_debug_verbosity: Exceeded max tries to input verbosity key") + quit() + + +def pack_set_gpio_cmd(object_id: bytearray) -> bytearray: + port = int(input("Specify port: ")) + pin = int(input("Specify pin: ")) + val = int(input("Specify val: ")) + command = bytearray() + command = object_id + struct.pack('!I', SupvActionIds.SET_GPIO) + command = command + struct.pack('!B', port) + command = command + struct.pack('!B', pin) + command = command + struct.pack('!B', val) + return command + + +def pack_read_gpio_cmd(object_id: bytearray) -> bytearray: + port = int(input("Specify port: ")) + pin = int(input("Specify pin: ")) + command = bytearray() + command = object_id + struct.pack('!I', SupvActionIds.READ_GPIO) + command = command + struct.pack('!B', port) + command = command + struct.pack('!B', pin) + return command + + diff --git a/pus_tc/ploc_upater.py b/pus_tc/ploc_upater.py new file mode 100644 index 0000000..2d92cfe --- /dev/null +++ b/pus_tc/ploc_upater.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +""" +@file ploc_supervisor.py +@brief Tests for commanding the supervisor of the PLOC. + The supervisor is programmed by Thales. +@author J. Meier +@date 10.07.2021 +""" +import struct + +from tmtccmd.config.definitions import QueueCommands + +from tmtccmd.tc.packer import TcQueueT +from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.utility.logger import get_console_logger + +LOGGER = get_console_logger() + +latchup_id_dict = { + "0": "0.85V", + "1": "1.8V", + "2": "MISC", + "3": "3.3V", + "4": "NVM_4XO", + "5": "MISSION", + "6": "SAFECOTS" +} + + +class UpdaterActionIds: + UPDATE_NVM0_A = 0 + UPDATE_NVM0_B = 1 + UPDATE_NVM1_A = 2 + UPDATE_NVM1_B = 3 + + +class ImagePathDefs: + imageNvm0A = "/mnt/sd0/ploc/updateNvm0A.bin" + imageNvm0B = "/mnt/sd0/ploc/updateNvm0B.bin" + imageNvm1A = "/mnt/sd0/ploc/updateNvm1A.bin" + imageNvm1B = "/mnt/sd0/ploc/updateNvm1B.bin" + + +def pack_ploc_updater_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: + tc_queue.appendleft( + (QueueCommands.PRINT, + "Testing PLOC updater with object id: 0x" + object_id.hex()) + ) + + if op_code == "0": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition A on NVM0")) + command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM0_A) + \ + bytearray(ImagePathDefs.imageNvm0A, 'utf-8') + command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "1": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition B on NVM0")) + command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM0_B) + \ + bytearray(ImagePathDefs.imageNvm0B, 'utf-8') + command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "2": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition A on NVM1")) + command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM1_A) + \ + bytearray(ImagePathDefs.imageNvm1A, 'utf-8') + command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "3": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Update partition B on NVM1")) + command = object_id + struct.pack('!I', UpdaterActionIds.UPDATE_NVM1_B) + \ + bytearray(ImagePathDefs.imageNvm1B, 'utf-8') + command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 80d893f..dddf999 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -26,10 +26,11 @@ from pus_tc.ploc_supervisor import pack_ploc_supv_test_into from pus_tc.heater import pack_heater_test_into from pus_tc.reaction_wheels import pack_single_rw_test_into from pus_tc.rad_sensor import pack_rad_sensor_test_into +from pus_tc.ploc_upater import pack_ploc_updater_test_into from config.definitions import CustomServiceList from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ - RAD_SENSOR_ID, PLOC_SUPV_ID + RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID LOGGER = get_console_logger() @@ -93,6 +94,9 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu if service == CustomServiceList.PLOC_SUPV.value: object_id = PLOC_SUPV_ID return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.PLOC_UPDATER.value: + object_id = PLOC_UPDATER_ID + return pack_ploc_updater_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !")