From 02b78cc48bbbb4887ded265723d819a57c7aa7f5 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 24 Aug 2022 16:18:20 +0200 Subject: [PATCH] clean up a bit --- tmtc/ploc_supervisor.py | 125 +++++++++++++++------------------------- tmtcc.py | 5 +- 2 files changed, 49 insertions(+), 81 deletions(-) diff --git a/tmtc/ploc_supervisor.py b/tmtc/ploc_supervisor.py index ab03376..fbcd666 100644 --- a/tmtc/ploc_supervisor.py +++ b/tmtc/ploc_supervisor.py @@ -120,14 +120,15 @@ class OpCodes: OFF = ["0", "off"] ON = ["1", "on"] NORMAL = ["2", "nml"] - HK_TO_OBC = ["3", "hk-to-obc"] - REQUEST_HK = ["4", "req-hk"] - START_MPSOC = ["5", "mpsoc-start"] - STOP_MPSOC = ["6", "mpsoc-stop"] - REQ_BOOT_STATUS_REPORT = ["13", "boot-report"] - START_UPDATE = ["42", "start-update"] + HK_TO_OBC = ["3", "hk_to_obc"] + REQUEST_HK = ["4", "req_hk"] + START_MPSOC = ["5", "start_mpsoc"] + SHUTDOWN_MPSOC = ["6", "stop_mpsoc"] + SEL_NVM = ["7", "sel_nvm"] + REQ_BOOT_STATUS_REPORT = ["13", "boot_report"] + START_UPDATE = ["42", "start_update"] PERFORM_UPDATE = ["update"] - MEM_CHECK = ["mem-check"] + MEM_CHECK = ["mem_check"] class Info(str, enum.Enum): @@ -137,8 +138,11 @@ class Info(str, enum.Enum): NML = "Switch Normal" HK_TO_OBC = "Request HK from PLOC SUPV" REQUEST_HK = "Request HK set from PLOC Handler" + PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes" + START_UPDATE = "Start new MPSoC SW update" REQ_BOOT_STATUS_REPORT = "Request boot status report and HK" MEM_CHECK = "Memory Check" + SEL_NVM = "Select NVM" @tmtc_definitions_provider @@ -151,8 +155,8 @@ def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): oce.add(OpCodes.HK_TO_OBC, Info.HK_TO_OBC) oce.add(OpCodes.REQUEST_HK, Info.REQUEST_HK) oce.add(OpCodes.START_MPSOC, "PLOC Supervisor: Start MPSoC") - oce.add(OpCodes.STOP_MPSOC, "PLOC Supervisor: Shutdown MPSoC") - oce.add("7", "PLOC Supervisor: Select MPSoC boot image") + oce.add(OpCodes.SHUTDOWN_MPSOC, "PLOC Supervisor: Shutdown MPSoC") + oce.add(OpCodes.SEL_NVM, Info.SEL_NVM) oce.add("8", "PLOC Supervisor: Set max restart tries") oce.add("9", "PLOC Supervisor: Reset MPSoC") oce.add("10", "PLOC Supervisor: Set time reference") @@ -175,7 +179,8 @@ def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): oce.add("38", "PLOC Supervisor: Factory reset clear all") oce.add("39", "PLOC Supervisor: Factory reset clear mirror entries") oce.add("40", "PLOC Supervisor: Factory reset clear circular entries") - oce.add(OpCodes.PERFORM_UPDATE, "PLOC Supervisor: Perform update") + oce.add(OpCodes.PERFORM_UPDATE, Info.PERFORM_UPDATE) + oce.add(OpCodes.START_UPDATE, Info.START_UPDATE) oce.add("43", "PLOC Supervisor: Terminate supervisor process") oce.add("44", "PLOC Supervisor: Start MPSoC quiet") oce.add("45", "PLOC Supervisor: Set shutdown timeout") @@ -227,11 +232,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionIds.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.STOP_MPSOC: + if op_code in OpCodes.SHUTDOWN_MPSOC: q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionIds.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "7": + if op_code in OpCodes.SEL_NVM: q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image") mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): ")) bp0 = int(input("BP0 (0 or 1): ")) @@ -270,7 +275,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): command = object_id.as_bytes + struct.pack("!I", SupvActionIds.DISABLE_HK) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.REQ_BOOT_STATUS_REPORT: - q.add_log_cmd(Info.REQ_BOOT_STATUS_REPORT) + q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}") command = object_id.as_bytes + struct.pack( "!I", SupvActionIds.GET_BOOT_STATUS_REPORT ) @@ -355,9 +360,13 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): "!I", SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) + if op_code in OpCodes.START_UPDATE: + q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update") + command = pack_update_command(object_id.as_bytes, True) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.PERFORM_UPDATE: - q.add_log_cmd("PLOC Supervisor: Perform update") - command = pack_update_command(object_id.as_bytes) + q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update") + command = pack_update_command(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "43": q.add_log_cmd("PLOC Supervisor: Terminate supervisor process") @@ -488,37 +497,6 @@ def pack_update_available_cmd(object_id: bytes) -> bytearray: return bytearray(command) -def pack_watchdogs_enable_cmd(object_id: bytes) -> bytearray: - """ - @brief This function packs the command to enable or disable watchdogs on the PLOC. - @param object_id The object id of the PLOC supervisor handler. - @note Enable = 1, Disable = 0 - """ - watchdog_ps = 1 - watchdog_pl = 1 - watchdog_int = 0 - command = bytearray() - command = object_id + struct.pack("!I", SupvActionIds.WATCHDOGS_ENABLE) - command = command + struct.pack("!B", watchdog_ps) - command = command + struct.pack("!B", watchdog_pl) - command = command + struct.pack("!B", watchdog_int) - return bytearray(command) - - -def pack_watchdog_config_timeout_cmd(object_id: bytearray) -> bytearray: - """ - @brief This function packs the command set the timeout of one of the three watchdogs of the PLOC. - @param object_id The object id of the PLOC supervisor handler. - """ - watchdog = int(input("Specify watchdog (0 - PS, 1 - PL, 2 - INT):")) - timeout = int(input("Specify timeout (1000 ms - 360000 ms):")) - command = bytearray() - command = object_id + struct.pack("!I", SupvActionIds.WATCHDOGS_CONFIG_TIMEOUT) - command = command + struct.pack("!B", watchdog) - command = command + struct.pack("!I", timeout) - return command - - def pack_lachtup_alert_cmd(object_id: bytes, state: bool) -> bytearray: """ @brief This function packs the command to enable or disable a certain latchup alerts. @@ -535,20 +513,6 @@ def pack_lachtup_alert_cmd(object_id: bytes, state: bool) -> bytearray: return bytearray(command) -def pack_auto_calibrate_alert_cmd(object_id: bytearray) -> bytearray: - """ - @brief This function packs the command to auto calibrate a latchup alert. - @param object_id The object id of the PLOC supervisor handler. - """ - latchup_id = get_latchup_id() - mg = int(input("Specify MG: ")) - command = bytearray() - command = object_id + struct.pack("!I", SupvActionIds.AUTO_CALIBRATE_ALERT) - command = command + struct.pack("!B", latchup_id) - command = command + struct.pack("!I", mg) - return command - - def get_latchup_id() -> int: key_column_width = 10 description_column_width = 50 @@ -629,26 +593,31 @@ def pack_mram_wipe_cmd(object_id: bytes) -> bytearray: return bytearray(command) -def pack_update_command(object_id: bytes) -> bytearray: +def pack_update_command(object_id: bytes, new_update: bool) -> bytearray: command = bytearray() memory_id = int(input("Specify memory ID: ")) start_address = int(input("Specify start address: 0x"), 16) update_file = get_update_file() - init_bytes_written = input("Specify bytes to start from [0 default]: ") - if init_bytes_written == "": + if new_update: init_bytes_written = 0 - init_bytes_written = int(init_bytes_written) - init_seq_count = input("Specify initial sequence count [1 default]: ") - if init_seq_count == "": - init_seq_count = 0 - del_mem = input("Delete memory? [y/n, y default]: ") - if del_mem.lower() in ["y", "1"]: - del_mem = 1 - elif del_mem.lower() in ["n", "0"]: - del_mem = 0 + init_seq_count = 1 + del_mem = True else: - raise ValueError("Invalid input, use y or n") - init_seq_count = int(init_seq_count) + init_bytes_written = input("Specify bytes to start from [0 default]: ") + if init_bytes_written == "": + init_bytes_written = 0 + init_bytes_written = int(init_bytes_written) + init_seq_count = input("Specify initial sequence count [1 default]: ") + if init_seq_count == "": + init_seq_count = 1 + init_seq_count = int(init_seq_count) + del_mem = input("Delete memory? [y/n, y default]: ") + if del_mem.lower() in ["y", "1"]: + del_mem = 1 + elif del_mem.lower() in ["n", "0"]: + del_mem = 0 + else: + raise ValueError("Invalid input, use y or n") command += object_id command += struct.pack("!I", SupvActionIds.PERFORM_UPDATE) command += bytearray(update_file, "utf-8") @@ -747,8 +716,8 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter): bp_1_state, bp_2_state, boot_state, - boot_cycles - ) = struct.unpack(fmt_str, hk_data[0: 0 + inc_len]) + boot_cycles, + ) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) current_idx += inc_len pw.dlog( f"SoC state (0:off, 1:booting, 2:update, 3:operating, 4:shutdown, 5:reset): {soc_state}" @@ -756,7 +725,9 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter): pw.dlog(f"Power Cycles {power_cycles}") pw.dlog(f"Boot after {boot_after_ms} ms | Boot timeout {boot_timeout_ms} ms") pw.dlog(f"Active NVM: {active_nvm}") - pw.dlog(f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}") + pw.dlog( + f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}" + ) pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}") pw.printer.print_validity_buffer(hk_data[current_idx:], 10) else: diff --git a/tmtcc.py b/tmtcc.py index 356f3ca..e74171f 100755 --- a/tmtcc.py +++ b/tmtcc.py @@ -48,10 +48,7 @@ from tmtccmd.tc import ( DefaultPusQueueHelper, ) from tmtccmd.config import default_json_path, SetupWrapper -from tmtccmd.config.args import ( - SetupParams, - ArgParserWrapper -) +from tmtccmd.config.args import SetupParams, ArgParserWrapper from config import __version__ from config.definitions import PUS_APID from config.hook import EiveHookObject