diff --git a/eive_tmtc/tmtc/payload/ploc_supervisor.py b/eive_tmtc/tmtc/payload/ploc_supervisor.py index 63bbcce..2cdcef5 100644 --- a/eive_tmtc/tmtc/payload/ploc_supervisor.py +++ b/eive_tmtc/tmtc/payload/ploc_supervisor.py @@ -27,7 +27,7 @@ from eive_tmtc.utility.input_helper import InputHelper _LOGGER = logging.getLogger(__name__) -latchup_id_dict = { +LATCHUP_ID_DICT = { "0": "0.85V", "1": "1.8V", "2": "MISC", @@ -41,7 +41,7 @@ HARDCODED = "0" MANUAL_INPUT = "1" HARDCODED_FILE = "/home/rmueller/EIVE/mpsoc_boot.bin" -update_file_dict = { +UPDATE_FILE_DICT = { HARDCODED: ["hardcoded", ""], MANUAL_INPUT: ["manual input", ""], "2": ["/mnt/sd0/ploc/supervisor/update.bin", "/mnt/sd0/ploc/supervisor/update.bin"], @@ -59,7 +59,7 @@ update_file_dict = { ], } -event_buffer_path_dict = { +EVENT_BUFFER_PATH_DICT = { MANUAL_INPUT: ["manual input", ""], "2": ["/mnt/sd0/ploc/supervisor", "/mnt/sd0/ploc/supervisor"], } @@ -140,22 +140,23 @@ class SetId(enum.IntEnum): UPDATE_STATUS_REPORT = 107 -class OpCodes: - OFF = ["0", "off"] - ON = ["1", "on"] - NORMAL = ["2", "nml"] - HK_TO_OBC = ["3", "hk_to_obc"] - REQUEST_HK = ["4", "req_hk"] - START_MPSOC = ["5", "start_mpsoc"] - SHUTDOWN_MPSOC = ["6", "stop_mpsoc"] - SEL_NVM = ["7", "sel_nvm"] - SET_TIME_REF = ["set_time_ref"] - FACTORY_FLASH = ["factory_flash"] - REQ_BOOT_STATUS_REPORT = ["13", "boot_report"] - START_UPDATE = ["42", "start_update"] - PERFORM_UPDATE = ["update"] - FACTORY_RESET = ["factory_reset"] - MEM_CHECK = ["mem_check"] +class OpCode: + OFF = "off" + ON = "on" + NORMAL = "nml" + HK_TO_OBC = "hk_to_obc" + REQUEST_HK_SET = "req_hk_from_dev" + REQUEST_HK_SET = "req_hk" + START_MPSOC = "start_mpsoc" + SHUTDOWN_MPSOC = "stop_mpsoc" + SEL_NVM = "sel_nvm" + SET_TIME_REF = "set_time_ref" + FACTORY_FLASH = "factory_flash" + REQ_BOOT_STATUS_REPORT = "boot_report" + START_UPDATE = "start_update" + PERFORM_UPDATE = "update" + FACTORY_RESET = "factory_reset" + MEM_CHECK = "mem_check" RESET_MPSOC = "reset_mpsoc" @@ -180,21 +181,21 @@ class Info(str, enum.Enum): @tmtc_definitions_provider def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() - oce.add(OpCodes.OFF, Info.OFF) - oce.add(OpCodes.ON, Info.ON) - oce.add(OpCodes.NORMAL, Info.NML) - oce.add(OpCodes.HK_TO_OBC, Info.HK_TO_OBC) - oce.add(OpCodes.REQUEST_HK, Info.REQUEST_HK) - oce.add(OpCodes.START_MPSOC, "PLOC Supervisor: Start MPSoC") - oce.add(OpCodes.SHUTDOWN_MPSOC, "PLOC Supervisor: Shutdown MPSoC") - oce.add(OpCodes.SEL_NVM, Info.SEL_NVM) - oce.add(OpCodes.SET_TIME_REF, Info.SET_TIME_REF) - oce.add(OpCodes.FACTORY_RESET, Info.FACTORY_RESET) - oce.add(OpCodes.RESET_MPSOC, Info.RESET_MPSOC) + oce.add(OpCode.OFF, Info.OFF) + oce.add(OpCode.ON, Info.ON) + oce.add(OpCode.NORMAL, Info.NML) + oce.add(OpCode.HK_TO_OBC, Info.HK_TO_OBC) + oce.add(OpCode.REQUEST_HK_SET, Info.REQUEST_HK) + oce.add(OpCode.START_MPSOC, "PLOC Supervisor: Start MPSoC") + oce.add(OpCode.SHUTDOWN_MPSOC, "PLOC Supervisor: Shutdown MPSoC") + oce.add(OpCode.SEL_NVM, Info.SEL_NVM) + oce.add(OpCode.SET_TIME_REF, Info.SET_TIME_REF) + oce.add(OpCode.FACTORY_RESET, Info.FACTORY_RESET) + oce.add(OpCode.RESET_MPSOC, Info.RESET_MPSOC) oce.add("8", "PLOC Supervisor: Set max restart tries") oce.add("11", "PLOC Supervisor: Set boot timeout") oce.add("12", "PLOC Supervisor: Disable Hk") - oce.add(OpCodes.REQ_BOOT_STATUS_REPORT, Info.REQ_BOOT_STATUS_REPORT) + oce.add(OpCode.REQ_BOOT_STATUS_REPORT, Info.REQ_BOOT_STATUS_REPORT) oce.add("17", "PLOC Supervisor: Enable latchup alert") oce.add("18", "PLOC Supervisor: Disable latchup alert") oce.add("20", "PLOC Supervisor: Set alert limit") @@ -208,12 +209,12 @@ def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): oce.add("35", "PLOC Supervisor: Set GPIO") oce.add("36", "PLOC Supervisor: Read GPIO") oce.add("37", "PLOC Supervisor: Restart supervisor") - oce.add(OpCodes.PERFORM_UPDATE, Info.PERFORM_UPDATE) - oce.add(OpCodes.START_UPDATE, Info.START_UPDATE) + oce.add(OpCode.PERFORM_UPDATE, Info.PERFORM_UPDATE) + oce.add(OpCode.START_UPDATE, Info.START_UPDATE) oce.add("43", "PLOC Supervisor: Terminate supervisor process") oce.add("44", "PLOC Supervisor: Start MPSoC quiet") oce.add("45", "PLOC Supervisor: Set shutdown timeout") - oce.add(OpCodes.FACTORY_FLASH, Info.FACTORY_FLASH) + oce.add(OpCode.FACTORY_FLASH, Info.FACTORY_FLASH) oce.add("47", "PLOC Supervisor: Enable auto TM") oce.add("48", "PLOC Supervisor: Disable auto TM") oce.add("51", "PLOC Supervisor: Logging request event buffers") @@ -224,7 +225,7 @@ def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): oce.add("56", "PLOC Supervisor: Reset PL") oce.add("57", "PLOC Supervisor: Enable NVMs") oce.add("58", "PLOC Supervisor: Continue update") - oce.add(OpCodes.MEM_CHECK, Info.MEM_CHECK) + oce.add(OpCode.MEM_CHECK, Info.MEM_CHECK) defs.add_service(CustomServiceList.PLOC_SUPV.value, "PLOC Supervisor", oce) @@ -233,39 +234,40 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 q = p.queue_helper op_code = p.op_code object_id = get_object_ids().get(PLOC_SUPV_ID) + assert object_id is not None q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") obyt = object_id.as_bytes prefix = "PLOC Supervisor" - if op_code in OpCodes.OFF: + if op_code == OpCode.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCodes.ON: + if op_code == OpCode.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") command = pack_mode_data(object_id.as_bytes, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCodes.NORMAL: + if op_code == OpCode.NORMAL: q.add_log_cmd(f"{prefix}: {Info.NML}") command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCodes.HK_TO_OBC: + if op_code == OpCode.HK_TO_OBC: q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}") command = obyt + struct.pack("!I", SupvActionId.HK_REPORT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.REQUEST_HK: + if op_code == OpCode.REQUEST_HK_SET: q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK}") sid = make_sid(object_id.as_bytes, SetId.HK_REPORT) cmd = generate_one_hk_command(sid) q.add_pus_tc(cmd) - elif op_code in OpCodes.START_MPSOC: + elif op_code == OpCode.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.SHUTDOWN_MPSOC: + if op_code == OpCode.SHUTDOWN_MPSOC: q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.SEL_NVM: + if op_code == OpCode.SEL_NVM: q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image") mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): ")) bp0 = int(input("BP0 (0 or 1): ")) @@ -273,7 +275,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 bp2 = int(input("BP2 (0 or 1): ")) command = pack_sel_boot_image_cmd(object_id.as_bytes, mem, bp0, bp1, bp2) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.FACTORY_RESET: + if op_code == OpCode.FACTORY_RESET: q.add_log_cmd(f"{prefix}: {Info.FACTORY_RESET}") while True: print("Please select the key for a factory reset operation") @@ -299,11 +301,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 + struct.pack("!B", restart_tries) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == OpCodes.RESET_MPSOC: + if op_code == OpCode.RESET_MPSOC: q.add_log_cmd(Info.RESET_MPSOC) command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.SET_TIME_REF: + if op_code == OpCode.SET_TIME_REF: q.add_log_cmd("PLOC Supervisor: Set time reference") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SET_TIME_REF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) @@ -320,7 +322,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 q.add_log_cmd("PLOC Supervisor: Disable HK") command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_HK) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.REQ_BOOT_STATUS_REPORT: + if op_code in OpCode.REQ_BOOT_STATUS_REPORT: q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.GET_BOOT_STATUS_REPORT @@ -388,11 +390,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 "!I", SupvActionId.RESTART_SUPERVISOR ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.START_UPDATE: + if op_code in OpCode.START_UPDATE: q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update") command = pack_update_command(object_id.as_bytes, True) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.PERFORM_UPDATE: + if op_code in OpCode.PERFORM_UPDATE: q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update") command = pack_update_command(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) @@ -410,7 +412,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 q.add_log_cmd("PLOC Supervisor: Set shutdown timeout") command = pack_set_shutdown_timeout_command(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.FACTORY_FLASH: + if op_code in OpCode.FACTORY_FLASH: q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}") command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) @@ -467,7 +469,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 q.add_log_cmd("PLOC Supervisor: Continue update") command = object_id.as_bytes + struct.pack("!I", SupvActionId.CONTINUE_UPDATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.MEM_CHECK: + if op_code == OpCode.MEM_CHECK: custom_data = bytearray() update_file = get_update_file() memory_id = int(input("Specify memory ID: ")) @@ -550,9 +552,9 @@ def get_latchup_id() -> int: description_string = "Description".ljust(description_column_width) print(f"{key_string} | {description_string}") print(separator_string) - for key in latchup_id_dict: + for key in LATCHUP_ID_DICT: key_string = key.ljust(key_column_width) - description_string = latchup_id_dict[key].ljust(description_column_width) + description_string = LATCHUP_ID_DICT[key].ljust(description_column_width) print(f"{key_string} | {description_string}") return int(input("Specify latchup ID: ")) @@ -706,25 +708,25 @@ def pack_logging_set_topic(object_id: bytes) -> bytearray: def get_update_file() -> str: _LOGGER.info("Specify update file ") - input_helper = InputHelper(update_file_dict) + input_helper = InputHelper(UPDATE_FILE_DICT) key = input_helper.get_key() if key == HARDCODED: file = HARDCODED_FILE elif key == MANUAL_INPUT: file = input("Ploc Supervisor: Specify absolute name of update file: ") else: - file = update_file_dict[key][1] + file = UPDATE_FILE_DICT[key][1] return file def get_event_buffer_path() -> str: _LOGGER.info("Specify path where to store event buffer file ") - input_helper = InputHelper(event_buffer_path_dict) + input_helper = InputHelper(EVENT_BUFFER_PATH_DICT) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc Supervisor: Specify path: ") else: - file = event_buffer_path_dict[key][1] + file = EVENT_BUFFER_PATH_DICT[key][1] return file