# -*- coding: utf-8 -*- """ @file ploc_supervisor.py @brief Tests for commanding the supervisor of the PLOC. The supervisor is programmed by Thales. @author J. Meier @date 10.07.2021 """ from datetime import datetime import enum import logging import struct from eive_tmtc.config.object_ids import PLOC_SUPV_ID, get_object_ids from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss.tc import PusTelecommand from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.config.tmtc import CmdTreeNode from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from eive_tmtc.utility.input_helper import InputHelper _LOGGER = logging.getLogger(__name__) LATCHUP_ID_DICT = { "0": "0.85V", "1": "1.8V", "2": "MISC", "3": "3.3V", "4": "NVM_4XO", "5": "MISSION", "6": "SAFECOTS", } HARDCODED = "0" MANUAL_INPUT = "1" HARDCODED_FILE = "/home/rmueller/EIVE/mpsoc_boot.bin" UPDATE_FILE_DICT = { HARDCODED: ("hardcoded", ""), MANUAL_INPUT: ("manual input", ""), "2": ("/mnt/sd0/ploc/mpsoc/image.bin", "/mnt/sd0/ploc/mpsoc/image.bin"), } EVENT_BUFFER_PATH_DICT = { MANUAL_INPUT: ("manual input", ""), "2": ("/mnt/sd0/ploc/supervisor", "/mnt/sd0/ploc/supervisor"), } FACTORY_RESET_OPS = { 0x00: "CLEAR_MRAM_EVENT_BUF", 0x01: "CLEAR_MRAM_ADC_BUF", 0x02: "FACTORY_DEFAULT_MRAM_SYS_CFG", 0x03: "FACTORY_DEFAULT_MRAM_DBG_CFG", 0x04: "FACTORY_DEFAULT_BOOTMAN_CFG", 0x05: "FACTORY_DEFAULT_DATA_LOGGER", 0x06: "DATA_LOGGER_OP_DATA_TO_ZERO", 0x07: "FACTORY_DEFAULT_MRAM_LATCHUP_MON", 0x08: "FACTORY_DEFAULT_ADC_MON_CFG", 0x09: "FACTORY_DEFAULT_WATCHDOG_MON_CFG", 0x0A: "FACTORY_DEFAULT_HK_CFG", 0x0B: "FACTORY_DEFAULT_MEM_MAN_CFG", 0x10: "REDWIRE_TASK_1", 0x11: "REDWIRE_TASK_2", 0x12: "REDWIRE_TASK_3", } class SupvActionId(enum.IntEnum): REQUEST_HK_REPORT = 1 START_MPSOC = 3 SHUTWOWN_MPSOC = 4 SEL_MPSOC_BOOT_IMAGE = 5 SET_BOOT_TIMEOUT = 6 SET_MAX_RESTART_TRIES = 7 RESET_MPSOC = 8 SET_TIME_REF = 9 DISABLE_HK = 10 GET_BOOT_STATUS_REPORT = 11 UPDATE_AVAILABLE = 12 ENABLE_LATCHUP_ALERT = 15 DISABLE_LATCHUP_ALERT = 16 SET_ALERT_LIMIT = 18 SET_ADC_SWEEP_PERIOD = 20 SET_ADC_ENABLED_CHANNELS = 21 SET_ADC_WINDOW_AND_STRIDE = 22 SET_ADC_THRESHOLD = 23 GET_LATCHUP_STATUS_REPORT = 24 COPY_ADC_DATA_TO_MRAM = 25 SELECT_NVM = 27 RUN_AUTO_EM_TESTS = 28 WIPE_MRAM = 29 DUMP_MRAM = 30 SET_GPIO = 34 READ_GPIO = 35 RESTART_SUPERVISOR = 36 REQUEST_LOGGING_COUNTERS = 38 FACTORY_RESET = 39 START_MPSOC_QUIET = 45 SET_SHUTDOWN_TIMEOUT = 46 FACTORY_FLASH = 47 PERFORM_UPDATE = 48 TERMINATE_SUPV_HELPER = 49 ENABLE_AUTO_TM = 50 DISABLE_AUTO_TM = 51 LOGGING_REQUEST_EVENT_BUFFERS = 54 LOGGING_CLEAR_COUNTERS = 55 LOGGING_SET_TOPIC = 56 REQUEST_ADC_REPORT = 57 RESET_PL = 58 ENABLE_NVMS = 59 CONTINUE_UPDATE = 60 MEM_CHECK = 61 class SetId(enum.IntEnum): HK_REPORT = 102 BOOT_STATUS_REPORT = 103 LATCHUP_REPORT = 104 COUNTERS_REPORT = 105 ADC_REPORT = 106 UPDATE_STATUS_REPORT = 107 class OpCode: OFF = "off" ON = "on" NORMAL = "nml" HK_TO_OBC = "hk_to_obc" REQUEST_GENERIC_HK_SET = "req_generic_hk" START_MPSOC = "start_mpsoc" SHUTDOWN_MPSOC = "stop_mpsoc" SEL_NVM = "sel_nvm" SET_TIME_REF = "set_time_ref" FACTORY_FLASH = "factory_flash" START_UPDATE = "start_update" PERFORM_UPDATE = "update" FACTORY_RESET = "factory_reset" MEM_CHECK = "mem_check" RESET_MPSOC = "reset_mpsoc" SET_GPIO = "set_gpio" READ_GPIO = "read_gpio" READ_STATUS_REPORT = "read_status_report" class Info(str, enum.Enum): OFF = "Switch Off" ON = "Switch On" NORMAL = "Switch Normal" HK_TO_OBC = "Request HK from PLOC SUPV" REQUEST_GENERIC_HK_SET = "Request prompted HK set from PLOC Handler" START_MPSOC = "Start MPSoC" SHUTDOWN_MPSOC = "Shutdown MPSoC" SET_TIME_REF = "Set time reference" FACTORY_FLASH = "Factory Flash Mode" PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes" START_UPDATE = "Start new MPSoC SW update" FACTORY_RESET = "Factory Reset of loggers" MEM_CHECK = "Memory Check" SEL_NVM = "Select NVM" RESET_MPSOC = "Reset MPSoC" SET_GPIO = "Set GPIO" READ_GPIO = "Read GPIO" READ_STATUS_REPORT = "Read HK status report" def create_ploc_supv_node() -> CmdTreeNode: op_code_strs = [ getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") ] info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] combined_dict = dict(zip(op_code_strs, info_strs)) node = CmdTreeNode("ploc_supv", "PLOC Supervisor", hide_children_for_print=True) for op_code, info in combined_dict.items(): node.add_child(CmdTreeNode(op_code, info)) return node def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 object_id = get_object_ids().get(PLOC_SUPV_ID) assert object_id is not None q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") obyt = object_id.as_bytes prefix = "PLOC Supervisor" if cmd_str == OpCode.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if cmd_str == OpCode.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") command = pack_mode_data(object_id.as_bytes, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if cmd_str == OpCode.NORMAL: q.add_log_cmd(f"{prefix}: {Info.NORMAL}") command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if cmd_str == OpCode.HK_TO_OBC: q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}") command = obyt + struct.pack("!I", SupvActionId.REQUEST_HK_REPORT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.SHUTDOWN_MPSOC: q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.READ_STATUS_REPORT: q.add_log_cmd(f"{prefix}: {Info.READ_STATUS_REPORT}") set_id = prompt_set_id() action_cmd = None # First read the set from the device. if set_id == SetId.HK_REPORT: action_cmd = create_action_cmd(PLOC_SUPV_ID, SupvActionId.REQUEST_HK_REPORT) if set_id == SetId.ADC_REPORT: action_cmd = create_action_cmd( PLOC_SUPV_ID, SupvActionId.REQUEST_ADC_REPORT ) if set_id == SetId.COUNTERS_REPORT: action_cmd = create_action_cmd( PLOC_SUPV_ID, SupvActionId.REQUEST_LOGGING_COUNTERS ) if set_id == SetId.LATCHUP_REPORT: action_cmd = create_action_cmd( PLOC_SUPV_ID, SupvActionId.GET_LATCHUP_STATUS_REPORT ) if set_id == SetId.BOOT_STATUS_REPORT: action_cmd = create_action_cmd( PLOC_SUPV_ID, SupvActionId.GET_BOOT_STATUS_REPORT ) if action_cmd is None: _LOGGER.warning(f"invalid set ID {set_id!r} for PLOC SUPV") return # Now dump the HK set. sid = make_sid(object_id.as_bytes, set_id) req_hk = generate_one_hk_command(sid) q.add_pus_tc(action_cmd) q.add_wait_seconds(2.0) q.add_pus_tc(req_hk) assert action_cmd is not None elif cmd_str == OpCode.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.SHUTDOWN_MPSOC: q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.SEL_NVM: q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image") mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): ")) bp0 = int(input("BP0 (0 or 1): ")) bp1 = int(input("BP1 (0 or 1): ")) bp2 = int(input("BP2 (0 or 1): ")) command = pack_sel_boot_image_cmd(object_id.as_bytes, mem, bp0, bp1, bp2) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.FACTORY_RESET: q.add_log_cmd(f"{prefix}: {Info.FACTORY_RESET}") while True: print("Please select the key for a factory reset operation") for key, val in FACTORY_RESET_OPS.items(): print(f"{key}: {val}") key = int(input("Key Select: ")) if key not in FACTORY_RESET_OPS: print("Key invalid!") break q.add_pus_tc( create_action_cmd( object_id=PLOC_SUPV_ID, action_id=SupvActionId.FACTORY_RESET, user_data=bytes([key]), ) ) if cmd_str == "8": q.add_log_cmd("PLOC Supervisor: Set max restart tries") restart_tries = int(input("Specify maximum restart tries: ")) command = ( object_id.as_bytes + struct.pack("!I", SupvActionId.SET_MAX_RESTART_TRIES) + struct.pack("!B", restart_tries) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.RESET_MPSOC: q.add_log_cmd(Info.RESET_MPSOC) command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.SET_TIME_REF: q.add_log_cmd("PLOC Supervisor: Set time reference") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SET_TIME_REF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "11": q.add_log_cmd("PLOC Supervisor: Set boot timeout") boot_timeout = int(input("Specify boot timeout [ms]: ")) command = ( object_id.as_bytes + struct.pack("!I", SupvActionId.SET_BOOT_TIMEOUT) + struct.pack("!I", boot_timeout) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "23": q.add_log_cmd("PLOC Supervisor: Set ADC enabled channels") command = pack_set_adc_enabled_channels_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "24": q.add_log_cmd("PLOC Supervisor: Set ADC window and stride") command = pack_set_adc_window_and_stride_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "25": q.add_log_cmd("PLOC Supervisor: Set ADC threshold") command = pack_set_adc_threshold_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.SET_GPIO: q.add_log_cmd("PLOC Supervisor: Set GPIO command") command = pack_set_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.READ_GPIO: q.add_log_cmd("PLOC Supervisor: Read GPIO command") command = pack_read_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "37": q.add_log_cmd("PLOC Supervisor: Restart supervisor") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.RESTART_SUPERVISOR ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str in OpCode.START_UPDATE: q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update") command = pack_update_command(object_id.as_bytes, True) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str in OpCode.PERFORM_UPDATE: q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update") command = pack_update_command(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "43": q.add_log_cmd("PLOC Supervisor: Terminate supervisor process") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.TERMINATE_SUPV_HELPER ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "44": q.add_log_cmd("PLOC Supervisor: Start MPSoC quiet") command = object_id.as_bytes + struct.pack("!I", SupvActionId.START_MPSOC_QUIET) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "45": q.add_log_cmd("PLOC Supervisor: Set shutdown timeout") command = pack_set_shutdown_timeout_command(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str in OpCode.FACTORY_FLASH: q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}") command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "56": q.add_log_cmd("PLOC Supervisor: Reset PL") command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_PL) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "57": q.add_log_cmd("PLOC Supervisor: Enable NVMs") nvm01 = int(input("Enable (1) or disable(0) NVM 0 and 1: ")) nvm3 = int(input("Enable (1) or disable(0) NVM 3: ")) command = ( object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_NVMS) + struct.pack("B", nvm01) + struct.pack("B", nvm3) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "58": q.add_log_cmd("PLOC Supervisor: Continue update") command = object_id.as_bytes + struct.pack("!I", SupvActionId.CONTINUE_UPDATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == OpCode.MEM_CHECK: custom_data = bytearray() update_file = get_update_file() memory_id = int(input("Specify memory ID: ")) start_address = int(input("Specify start address: 0x"), 16) custom_data.extend(update_file.encode("utf-8")) custom_data.append(0) custom_data.extend(struct.pack("!B", memory_id)) custom_data.extend(struct.pack("!I", start_address)) q.add_log_cmd( f"{prefix}: {Info.MEM_CHECK} for file {update_file} at memory ID" f" {memory_id} at start address {start_address}" ) command = create_action_cmd( object_id.as_bytes, SupvActionId.MEM_CHECK, custom_data ) q.add_pus_tc(command) def prompt_set_id() -> SetId: for set_id in SetId: print(f"{set_id}: {set_id.name}") while True: set_id = int(input("Please select the set ID to request: ")) try: set_id_typed = SetId(set_id) except ValueError: _LOGGER.warning("invalid set ID, try again") continue break return set_id_typed def pack_sel_boot_image_cmd( object_id: bytes, mem: int, bp0: int, bp1: int, bp2: int ) -> bytearray: """This function can be used to generate the command to select the image from which the MPSoC will boot. @param object_id The object id of the PLOC supervisor handler. @param mem The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1) @param bp0 Partition pin 0 @param bp1 Partition pin 1 @param bp2 Partition pin 2 """ command = object_id + struct.pack("!I", SupvActionId.SEL_MPSOC_BOOT_IMAGE) command = command + struct.pack("!B", mem) command = command + struct.pack("!B", bp0) command = command + struct.pack("!B", bp1) command = command + struct.pack("!B", bp2) return bytearray(command) def pack_update_available_cmd(object_id: bytes) -> bytearray: """ @brief This function packs the udpate availabe command. @param object_id The object id of the PLOC supervisor handler. """ image_select = 1 image_partition = 0 image_size = 222 image_crc = 0x0 number_of_packets = 150 command = object_id + struct.pack("!I", SupvActionId.UPDATE_AVAILABLE) command = command + struct.pack("!B", image_select) command = command + struct.pack("!B", image_partition) command = command + struct.pack("!I", image_size) command = command + struct.pack("!I", image_crc) command = command + struct.pack("!I", number_of_packets) return bytearray(command) def pack_lachtup_alert_cmd(object_id: bytes, state: bool) -> bytearray: """ @brief This function packs the command to enable or disable a certain latchup alerts. @param object_id The object id of the PLOC supervisor handler. @param state True - enable latchup alert, False - disable latchup alert """ latchup_id = get_latchup_id() command = bytearray() if state: command = object_id + struct.pack("!I", SupvActionId.ENABLE_LATCHUP_ALERT) else: command = object_id + struct.pack("!I", SupvActionId.DISABLE_LATCHUP_ALERT) command = command + struct.pack("!B", latchup_id) return bytearray(command) def get_latchup_id() -> int: key_column_width = 10 description_column_width = 50 separator_width = key_column_width + description_column_width + 3 separator_string = separator_width * "-" key_string = "Latchup ID".ljust(key_column_width) description_string = "Description".ljust(description_column_width) print(f"{key_string} | {description_string}") print(separator_string) for key in LATCHUP_ID_DICT: key_string = key.ljust(key_column_width) description_string = LATCHUP_ID_DICT[key].ljust(description_column_width) print(f"{key_string} | {description_string}") return int(input("Specify latchup ID: ")) def pack_set_alert_limit_cmd(object_id: bytes) -> bytearray: """ @brief This function packs the command to set the limit of a latchup alert. @param object_id The object id of the PLOC supervisor handler. """ latchup_id = get_latchup_id() dutycycle = int(input("Specify dutycycle: ")) command = bytearray() command = object_id + struct.pack("!I", SupvActionId.SET_ALERT_LIMIT) command = command + struct.pack("!B", latchup_id) command = command + struct.pack("!I", dutycycle) return bytearray(command) def pack_set_adc_enabled_channels_cmd(object_id: bytes) -> bytearray: """ @brief This function packs the command to enable or disable channels of the ADC. @param object_id The object id of the PLOC supervisor handler. """ ch = int(input("Specify ch: 0x"), 16) cmd = object_id + struct.pack("!I", SupvActionId.SET_ADC_ENABLED_CHANNELS) cmd = cmd + struct.pack("!H", ch) return bytearray(cmd) def pack_set_adc_window_and_stride_cmd(object_id: bytes) -> bytearray: window_size = int(input("Specify window size: ")) striding_step_size = int(input("Specify striding step size: ")) command = object_id + struct.pack("!I", SupvActionId.SET_ADC_WINDOW_AND_STRIDE) command = command + struct.pack("!H", window_size) command = command + struct.pack("!H", striding_step_size) return bytearray(command) def pack_set_adc_threshold_cmd(object_id: bytes) -> bytearray: threshold = int(input("Specify threshold: ")) command = object_id + struct.pack("!I", SupvActionId.SET_ADC_THRESHOLD) command = command + struct.pack("!I", threshold) return bytearray(command) def pack_select_nvm_cmd(object_id: bytes) -> bytearray: mem = int(input("Specify NVM (0 - NVM0, 1 - MVM1): ")) command = object_id + struct.pack("!I", SupvActionId.SELECT_NVM) command = command + struct.pack("!B", mem) return bytearray(command) def pack_auto_em_tests_cmd(object_id: bytes) -> bytearray: test = int(input("Specify test (1 - complete, 2 - short): ")) command = object_id + struct.pack("!I", SupvActionId.RUN_AUTO_EM_TESTS) command = command + struct.pack("!B", test) return bytearray(command) def pack_mram_wipe_cmd(object_id: bytes) -> bytearray: start = int(input("Start address: 0x"), 16) stop = int(input("Stop address: 0x"), 16) command = object_id + struct.pack("!I", SupvActionId.WIPE_MRAM) command = command + struct.pack("!I", start) command = command + struct.pack("!I", stop) return bytearray(command) def pack_update_command(object_id: bytes, new_update: bool) -> bytearray: command = bytearray() memory_id = int(input("Specify memory ID: ")) start_address = int(input("Specify start address: 0x"), 16) update_file = get_update_file() if new_update: init_bytes_written = 0 init_seq_count = 1 del_mem = True else: init_bytes_written = input("Specify bytes to start from [0 default]: ") if init_bytes_written == "": init_bytes_written = 0 init_bytes_written = int(init_bytes_written) init_seq_count = input("Specify initial sequence count [1 default]: ") if init_seq_count == "": init_seq_count = 1 init_seq_count = int(init_seq_count) del_mem = input("Delete memory? [y/n, y default]: ") if del_mem.lower() in ["y", "1"]: del_mem = 1 elif del_mem.lower() in ["n", "0"]: del_mem = 0 else: raise ValueError("Invalid input, use y or n") command += object_id command += struct.pack("!I", SupvActionId.PERFORM_UPDATE) command += bytearray(update_file, "utf-8") # Adding null terminator command += struct.pack("!B", 0) command += struct.pack("!B", memory_id) command += struct.pack("!I", start_address) command.extend(struct.pack("!I", init_bytes_written)) command.extend(struct.pack("!H", init_seq_count)) command.append(del_mem) return bytearray(command) def pack_set_shutdown_timeout_command(object_id: bytes) -> bytearray: command = bytearray() command += object_id command += struct.pack("!I", SupvActionId.SET_SHUTDOWN_TIMEOUT) timeout = int(input("Specify shutdown timeout (ms): ")) command += struct.pack("!I", timeout) return command def pack_logging_buffer_request(object_id: bytes) -> bytearray: command = bytearray() command += object_id command += struct.pack("!I", SupvActionId.LOGGING_REQUEST_EVENT_BUFFERS) path = get_event_buffer_path() command += bytearray(path, "utf-8") return command def pack_set_gpio_cmd(object_id: bytes) -> bytearray: port = int(input("Specify port: 0x"), 16) pin = int(input("Specify pin: 0x"), 16) val = int(input("Specify val: 0x"), 16) command = bytearray(object_id + struct.pack("!I", SupvActionId.SET_GPIO)) command.append(port) command.append(pin) command.append(val) return bytearray(command) def pack_read_gpio_cmd(object_id: bytes) -> bytearray: port = int(input("Specify port: 0x"), 16) pin = int(input("Specify pin: 0x"), 16) command = object_id + struct.pack("!I", SupvActionId.READ_GPIO) command = command + struct.pack("!B", port) command = command + struct.pack("!B", pin) return bytearray(command) def pack_logging_set_topic(object_id: bytes) -> bytearray: command = object_id + struct.pack("!I", SupvActionId.LOGGING_SET_TOPIC) tpc = int(input("Specify logging topic: ")) command += struct.pack("!B", tpc) return bytearray(command) def get_update_file() -> str: _LOGGER.info("Specify update file ") input_helper = InputHelper(UPDATE_FILE_DICT) key = input_helper.get_key() if key == HARDCODED: file = HARDCODED_FILE elif key == MANUAL_INPUT: file = input("Ploc Supervisor: Specify absolute name of update file: ") else: file = UPDATE_FILE_DICT[key][1] return file def get_event_buffer_path() -> str: _LOGGER.info("Specify path where to store event buffer file ") input_helper = InputHelper(EVENT_BUFFER_PATH_DICT) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc Supervisor: Specify path: ") else: file = EVENT_BUFFER_PATH_DICT[key][1] return file class SocState(enum.IntEnum): OFF = 0 BOOTING = 1 UPDATE = 2 OPERATIONAL = 3 RESET = 4 FAULTY = 5 def handle_supv_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper): if set_id == SetId.HK_REPORT: handle_hk_report(hk_data, pw) elif set_id == SetId.BOOT_STATUS_REPORT: handle_boot_report(hk_data, pw) elif set_id == SetId.ADC_REPORT: handle_adc_report(hk_data) elif set_id == SetId.COUNTERS_REPORT: handle_counters_report(hk_data) elif set_id == SetId.LATCHUP_REPORT: handle_latchup_status_report(hk_data) else: pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}") pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]") def handle_hk_report(hk_data: bytes, pw: PrintWrapper): fmt_str = "!IIIQIIIIIBBBB" inc_len = struct.calcsize(fmt_str) ( temp_ps, temp_pl, temp_sup, uptime, cpu_load, avail_heap, num_tcs, num_tms, soc_state, nvm_0_1_state, nvm_3_state, mission_io_state, fmc_state, ) = struct.unpack(fmt_str, hk_data[:inc_len]) pw.dlog(f"Temp PS {temp_ps} C | Temp PL {temp_pl} C | Temp SUP {temp_sup} C") pw.dlog(f"Uptime {uptime} | CPU Load {cpu_load} | Avail Heap {avail_heap}") pw.dlog(f"Number TCs {num_tcs} | Number TMs {num_tms}") try: pw.dlog(f"SOC state {SocState(soc_state)}") except ValueError: pw.dlog(f"Invalid SOC state {soc_state}") pw.dlog(f"NVM 01 State {nvm_0_1_state}") pw.dlog(f"NVM 3 State {nvm_3_state}") pw.dlog(f"Mission IO state {mission_io_state}") pw.dlog(f"FMC state {fmc_state}") pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[inc_len:], 13)) def handle_boot_report(hk_data: bytes, pw: PrintWrapper): current_idx = 0 fmt_str = "!BBIIBBBBBB" inc_len = struct.calcsize(fmt_str) ( soc_state, power_cycles, boot_after_ms, boot_timeout_ms, active_nvm, bp_0_state, bp_1_state, bp_2_state, boot_state, boot_cycles, ) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) current_idx += inc_len pw.dlog( "SoC state (0:off, 1:booting, 2:update, 3:operating, 4:shutdown, 5:reset):" f" {soc_state}" ) pw.dlog(f"Power Cycles {power_cycles}") pw.dlog(f"Boot after {boot_after_ms} ms | Boot timeout {boot_timeout_ms} ms") pw.dlog(f"Active NVM: {active_nvm}") pw.dlog(f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}") pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 10) def handle_adc_report(hk_data: bytes): if len(hk_data) < 64: _LOGGER.warning("ADC report smaller than 64 bytes") current_idx = 0 adc_raw = [] adc_eng = [] for _ in range(16): adc_raw.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]) current_idx += 2 for _ in range(16): adc_eng.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]) current_idx += 2 print(f"{'Index'.ljust(10)} | {'ADC RAW'.ljust(10)} | {'ADC ENG'.ljust(10)}") for i in range(16): print(f"{i: >10} | {adc_raw[i]: >10} | {adc_eng[i]: >10}") def handle_counters_report(hk_data: bytes): current_idx = 0 signature = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] current_idx += 4 latchup_counters = [] for _ in range(7): latchup_counters.append( struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] ) current_idx += 4 fmt_str = "!IIIIIIIIIIIIIIIIIIII" inc_len = struct.calcsize(fmt_str) ( adc_deviation_triggers_cnt, tc_received_cnt, tm_available_cnt, supervisor_boots, mpsoc_boots, mpsoc_boot_failed_attempts, mpsoc_powerup, mpsoc_updates, mpsoc_heartbeat_resets, cpu_wdt_resets, ps_heartbeats_lost, pl_heartbeats_lost, eb_task_lost, bm_task_lost, lm_task_lost, am_task_lost, tctmm_task_lost, mm_task_lost, hk_task_lost, dl_task_lost, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len _redwire_tasks_lost = [] for _ in range(3): _redwire_tasks_lost.append( struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] ) print(f"Signature: {signature}") print(f"Latchup Counters: {latchup_counters}") print(f"ADC Deviation Triggers Count: {adc_deviation_triggers_cnt}") print(f"TCs received: {tc_received_cnt} | TMs Available: {tm_available_cnt}") print(f"Supervisor Boots: {supervisor_boots} | MPSoC boots: {mpsoc_boots}") print(f"MPSoC boot failed attempts: {mpsoc_boot_failed_attempts}") print(f"MPSoC powerup: {mpsoc_powerup}") print(f"MPSoC updates: {mpsoc_updates}") print(f"MPSoC heartbeat resets: {mpsoc_heartbeat_resets}") print(f"CPU WDT resets: {cpu_wdt_resets}") print(f"PS heartbeats lost: {ps_heartbeats_lost}") print(f"PL heartbeats lost: {pl_heartbeats_lost}") print(f"EB task lost: {eb_task_lost}") print(f"BM task lost: {bm_task_lost}") print(f"LM task lost: {lm_task_lost}") print(f"AM task lost: {am_task_lost}") print(f"TCTMM task lost: {tctmm_task_lost}") print(f"MM task lost: {mm_task_lost}") print(f"HK task lost: {hk_task_lost}") print(f"DL task lost: {dl_task_lost}") def handle_latchup_status_report(hk_data: bytes): # 1 byte ID, 7 times 2 bytes of counts, and 8 bytes of time and 1 byte sync status. if len(hk_data) < 24: raise ValueError("Latchup status report smaller than expected") current_idx = 0 id = hk_data[current_idx] current_idx += 1 counts_of_alerts = [] for _ in range(7): counts_of_alerts.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 print(f"ID: {id}") print(f"Counts of alerts: {counts_of_alerts}") time_ms = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] current_idx += 2 time_seconds = hk_data[current_idx] current_idx += 1 time_minutes = hk_data[current_idx] current_idx += 1 time_hour = hk_data[current_idx] current_idx += 1 time_day = hk_data[current_idx] current_idx += 1 time_month = hk_data[current_idx] current_idx += 1 # Is stored as years since 1900. time_year = 1900 + hk_data[current_idx] current_idx += 1 is_synced = hk_data[current_idx] print(f"Time Sync Status: {is_synced}") try: dt = datetime( year=time_year, month=time_month, day=time_day, hour=time_hour, minute=time_minutes, second=time_seconds, microsecond=time_ms * 1000, ) print(f"Time Now: {dt}") except ValueError: print( f"Time: {time_day}.{time_month}.{time_year}T" f"{time_hour}:{time_minutes}:{time_seconds}.{time_ms}" )