diff --git a/config/hook_implementations.py b/config/hook_implementations.py index c5bad08..57005f4 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -117,6 +117,12 @@ class EiveHookObject(TmTcHookBase): "32": ("PLOC Supervisor: MRAM Dump", {OpCodeDictKeys.TIMEOUT: 2.0}), "33": ("PLOC Supervisor: Print CPU stats", {OpCodeDictKeys.TIMEOUT: 2.0}), "34": ("PLOC Supervisor: Set debug verbosity", {OpCodeDictKeys.TIMEOUT: 2.0}), + "35": ("PLOC Supervisor: Set GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), + "36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), + "37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}), + "38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}), + "39": ("PLOC Supervisor: Factory reset clear mirror entries", {OpCodeDictKeys.TIMEOUT: 2.0}), + "40": ("PLOC Supervisor: Factory reset clear circular entries", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) diff --git a/pus_tc/ploc_supervisor.py b/pus_tc/ploc_supervisor.py index 65aec36..b9a11b4 100644 --- a/pus_tc/ploc_supervisor.py +++ b/pus_tc/ploc_supervisor.py @@ -12,6 +12,9 @@ from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.packer import TcQueueT from tmtccmd.ecss.tc import PusTelecommand +from tmtccmd.utility.logger import get_console_logger + +LOGGER = get_console_logger() latchup_id_dict = { "0": "0.85V", @@ -60,9 +63,11 @@ class SupvActionIds: SET_GPIO = 34 READ_GPIO = 35 RESTART_SUPERVISOR = 36 - FACTORY_RESET = 37 + FACTORY_RESET_CLEAR_ALL = 37 REQUEST_LOGGING_DATA = 38 UPDATE_IMAGE_DATA = 39 + FACTORY_RESET_CLEAR_MIRROR = 40 + FACTORY_RESET_CLEAR_CIRCULAR = 41 class SupvHkIds: @@ -242,6 +247,36 @@ def pack_ploc_supv_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: command = pack_set_debug_verbosity_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=49, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "35": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set GPIO command")) + command = pack_set_gpio_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=50, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "36": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Read GPIO command")) + command = pack_read_gpio_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=51, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "37": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Restart supervisor")) + command = command = object_id + struct.pack('!I', SupvActionIds.RESTART_SUPERVISOR) + command = PusTelecommand(service=8, subservice=128, ssc=52, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "38": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear all")) + command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_ALL) + command = PusTelecommand(service=8, subservice=128, ssc=53, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "39": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear mirror entries")) + command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_MIRROR) + command = PusTelecommand(service=8, subservice=128, ssc=54, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "40": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory reset clear circular entries")) + command = command = object_id + struct.pack('!I', SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR) + command = PusTelecommand(service=8, subservice=128, ssc=55, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) return tc_queue @@ -488,17 +523,56 @@ def pack_print_cpu_stats_cmd(object_id: bytearray) -> bytearray: def pack_set_debug_verbosity_cmd(object_id: bytearray) -> bytearray: command = bytearray() - print("Specify debug verbosity") - verbosity_options_dict = { - 0: "None", - 1: "Error", - 2: "Warn", - 3: "Info", - } - print("{:<6} | {}".format('Key', 'Description')) - for entry in verbosity_options_dict.items(): - print("{:<6} | {}".format(entry[0], entry[1])) - verbosity = int(input("Specify verbosity key: ")) + verbosity = get_debug_verbosity() command = object_id + struct.pack('!I', SupvActionIds.SET_DBG_VERBOSITY) command = command + struct.pack('!B', verbosity) - return command \ No newline at end of file + return command + + +def get_debug_verbosity() -> int: + tries = 0 + while tries < 3: + try: + print("Debug verbosity options") + verbosity_options_dict = { + 0: "None", + 1: "Error", + 2: "Warn", + 3: "Info", + } + print("{:<6} | {}".format('Key', 'Description')) + for entry in verbosity_options_dict.items(): + print("{:<6} | {}".format(entry[0], entry[1])) + verbosity = int(input("Specify verbosity key: ")) + if verbosity > len(verbosity_options_dict) - 1: + raise ValueError + return verbosity + except ValueError: + LOGGER.warning("Invalid verbosity key specified") + tries = tries + 1 + LOGGER.error("get_debug_verbosity: Exceeded max tries to input verbosity key") + quit() + + +def pack_set_gpio_cmd(object_id: bytearray) -> bytearray: + port = int(input("Specify port: ")) + pin = int(input("Specify pin: ")) + val = int(input("Specify val: ")) + command = bytearray() + command = object_id + struct.pack('!I', SupvActionIds.SET_GPIO) + command = command + struct.pack('!B', port) + command = command + struct.pack('!B', pin) + command = command + struct.pack('!B', val) + return command + + +def pack_read_gpio_cmd(object_id: bytearray) -> bytearray: + port = int(input("Specify port: ")) + pin = int(input("Specify pin: ")) + command = bytearray() + command = object_id + struct.pack('!I', SupvActionIds.READ_GPIO) + command = command + struct.pack('!B', port) + command = command + struct.pack('!B', pin) + return command + +