# -*- coding: utf-8 -*- """ @file ploc_mpsoc.py @brief Tests for commanding the MPSoC of the PLOC. The MPSoC is programmed by the ILH. @author J. Meier @date 06.03.2021 """ import dataclasses import logging import struct import enum from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID from eive_tmtc.pus_tm.defs import PrintWrapper from tmtccmd.config.tmtc import ( CmdTreeNode, ) from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tmtc import DefaultPusQueueHelper from eive_tmtc.utility.input_helper import InputHelper from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode, create_mode_command from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd, create_scalar_u8_parameter from tmtccmd.pus.s8_fsfw_action import create_action_cmd _LOGGER = logging.getLogger(__name__) MANUAL_INPUT = "1" CRIT_CMD_APID_DICT = {"1": ("flash_mkfs", 0x12A)} OBC_WRITE_FILE_DICT = { MANUAL_INPUT: ("manual input", ""), "2": ("/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"), } OBC_READ_FILE_DICT = { MANUAL_INPUT: ("manual input", ""), "2": ("/mnt/sd0/ploc/mpsoc/flash_read.bin", "/mnt/sd0/ploc/mpsoc/flash_read.bin"), } MPSOC_WRITE_FILE_DICT = { MANUAL_INPUT: ("manual input", ""), "2": ("0:/", "0:/"), } MPSOC_READ_FILE_DICT = { MANUAL_INPUT: ("manual input", ""), "2": ("0:/PICA", "0:/PICA"), } SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"] SEQ_FILE_DICT = { MANUAL_INPUT: ("manual input", ""), "2": (f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"), "3": (f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"), "4": (f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"), } CARRIAGE_RETURN = 0xD class SetId(enum.IntEnum): HK_ID = 0 class ActionId(enum.IntEnum): TC_MEM_WRITE = 1 TC_MEM_READ = 2 TM_MEM_READ_RPT = 6 TC_FLASH_WRITE_FULL_FILE = 9 TC_FLASH_DELETE = 10 TC_REPLAY_START = 11 TC_REPLAY_STOP = 12 TC_REPLAY_WRITE_SEQUENCE = 13 TC_DOWNLINK_PWR_ON = 14 TC_DOWNLINK_PWR_OFF = 15 OBSW_RESET_SEQ_COUNT = 50 TC_MODE_REPLAY = 16 TC_CAM_CMD_SEND = 17 TC_MODE_IDLE = 18 TM_CAM_CMD_RPT = 19 SET_UART_TX_TRISTATE = 20 RELEASE_UART_TX = 21 TC_CAM_TAKE_PIC = 22 TC_SIMPLEX_STREAM_FILE = 23 TC_DOWNLINK_DATA_MODULATE = 24 TC_MODE_SNAPSHOT = 25 TC_FLASH_DIR_GET_CONTENT = 28 TM_FLASH_DIRECTORY_CONTENT = 29 TC_FLASH_READ_FULL_FILE = 30 TC_SIMPLEX_STORE_FILE = 31 TC_VERIFY_BOOT = 32 TC_ENABLE_TC_EXECUTION = 33 TC_FLASH_MKFS = 34 class Submode(enum.IntEnum): IDLE_OR_NONE = 0 REPLAY = 1 SNAPSHOT = 2 class ParamId(enum.IntEnum): PLOC_SUPV_SKIP_CMD_TO_ON = 1 class OpCode: ON = "on" OFF = "off" NORMAL = "normal" VERIFY_BOOT = "verify_boot" REPLAY_WRITE_SEQ = "replay_write" DOWNLINK_PWR_ON = "downlink_pwr_on" MEM_WRITE = "memory_write" MEM_READ = "memory_read" DOWNLINK_PWR_OFF = "downlink_pwr_off" FLASH_WRITE_FILE = "flash_write_file" FLASH_READ_FILE = "flash_read_file" FLASH_DELETE_FILE = "flash_delete_file" FLASH_GET_DIR_CONTENT = "flash_get_dir_content" REPLAY_STOP = "replay_stop" REPLAY_START = "replay_start" CAM_CMD_SEND = "cam_cmd_send" CAM_TAKE_PIC = "cam_take_pic" SIMPLEX_STREAM_FILE = "simplex_stream_file" SIMPLEX_STORE_FILE = "simplex_store_file" DOWNLINK_DATA_MODULATE = "downlink_data_modulate" ENABLE_PLOC_SUPV_COMMANDING_TO_ON = "enable_ploc_supv_cmd_to_on" DISABLE_PLOC_SUPV_COMMANDING_TO_ON = "disable_ploc_supv_cmd_to_on" MODE_IDLE = "mode_idle" MODE_REPLAY = "mode_replay" MODE_SNAPSHOT = "mode_snapshot" ENABLE_TC_EXECUTION = "enable_tc_exec" FLASH_MKFS = "flash_mkfs" class Info: ON = "On" OFF = "Off" NORMAL = "Normal" VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address" MODE_REPLAY = "Switch to REPLAY mode" REPLAY_STOP = "Stop Replay" MODE_IDLE = "Switch to IDLE mode" REPLAY_WRITE_SEQ = "Replay write sequence" DOWNLINK_PWR_ON = "Downlink Power On" DOWNLINK_PWR_OFF = "Downlink Power Off" MEM_WRITE = "Write to Memory" MEM_READ = "Read from Memory" REPLAY_START = "Replay Start" CAM_TAKE_PIC = "Cam Take Picture" CAM_CMD_SEND = "Send Camera Command" SIMPLEX_STREAM_FILE = "Simplex Stream File with E-Band" SIMPLEX_STORE_FILE = "Simplex Store File on MPSoC" FLASH_READ_FILE = "Copy file from MPSoC to OBC" FLASH_WRITE_FILE = "Copy file from OBC to MPSoC" FLASH_DELETE_FILE = "Delete file on MPSoC" FLASH_GET_DIR_CONTENT = "Get flash directory content on MPSoC" DOWNLINK_DATA_MODULATE = "Downlink data modulate" MODE_SNAPSHOT = "Mode Snapshot" ENABLE_PLOC_SUPV_COMMANDING_TO_ON = "Enable PLOC SUPV commanding when switching ON" DISABLE_PLOC_SUPV_COMMANDING_TO_ON = ( "Disable PLOC SUPV commanding when switching ON" ) ENABLE_TC_EXECUTION = "Enable execution of critical commands" FLASH_MKFS = "Flash MKFS command" class MemAddresses(enum.IntEnum): DEADBEEF = 0x40000004 def create_ploc_mpsoc_node() -> CmdTreeNode: op_code_strs = [ getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") ] info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] combined_dict = dict(zip(op_code_strs, info_strs)) node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC", hide_children_for_print=True) for op_code, info in combined_dict.items(): node.add_child(CmdTreeNode(op_code, info)) return node def pack_ploc_mpsoc_commands( q: DefaultPusQueueHelper, cmd_str: str ): # noqa C901: Complexity okay here. object_id = get_object_ids().get(PLOC_MPSOC_ID) assert object_id is not None prefix = "PLOC MPSoC" q.add_log_cmd( f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" ) obyt = object_id.as_bytes if cmd_str == OpCode.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(obyt, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if cmd_str == OpCode.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") data = pack_mode_data(obyt, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if cmd_str == OpCode.NORMAL: q.add_log_cmd(f"{prefix}: {Info.NORMAL}") data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if cmd_str == OpCode.MEM_WRITE: q.add_log_cmd("PLOC MPSoC: TC mem write test") memory_address = int( input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 ) memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16) # TODO: implement variable length mem write command mem_len = 1 # 1 32-bit word data = generate_write_mem_command( object_id.as_bytes, memory_address, memory_data, mem_len ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.FLASH_WRITE_FILE: q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}") data = prepare_flash_write_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.FLASH_READ_FILE: q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}") data = prepare_flash_read_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.FLASH_DELETE_FILE: q.add_log_cmd("PLOC MPSoC: Flash delete") data = prepare_flash_delete_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str in OpCode.REPLAY_START: q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}") data = prepare_replay_start_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.REPLAY_STOP: q.add_log_cmd("PLOC MPSoC: Replay stop") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_REPLAY_STOP) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.DOWNLINK_PWR_ON: q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}") data = prepare_downlink_pwr_on_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.DOWNLINK_PWR_OFF: q.add_log_cmd("PLOC MPSoC: Downlink pwr off") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.FLASH_GET_DIR_CONTENT: q.add_log_cmd(f"{prefix}: {Info.FLASH_GET_DIR_CONTENT}") dir_name = input("Please specify MPSoC directory name to get information for: ") dir_name = bytearray(dir_name.encode("utf-8")) dir_name.append(0) q.add_pus_tc( create_action_cmd( object_id=object_id.as_bytes, action_id=ActionId.TC_FLASH_DIR_GET_CONTENT, user_data=dir_name, ) ) if cmd_str == OpCode.REPLAY_WRITE_SEQ: q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}") data = prepare_replay_write_sequence_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == "12": q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count") data = object_id.as_bytes + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.VERIFY_BOOT: q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}") app_data = object_id.as_bytes + struct.pack("!I", ActionId.TC_VERIFY_BOOT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=app_data)) if cmd_str == OpCode.CAM_CMD_SEND: q.add_log_cmd(Info.CAM_CMD_SEND) cam_cmd = input("Specify cam command string: ") data = ( object_id.as_bytes + struct.pack("!I", ActionId.TC_CAM_CMD_SEND) + bytearray(cam_cmd, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.ENABLE_TC_EXECUTION: q.add_log_cmd(Info.ENABLE_TC_EXECUTION) while True: for key, val in CRIT_CMD_APID_DICT.items(): print(f"{key}: {val[0]} with APID {val[1]}") key = input("Please specify the command to enable by key: ") if key not in CRIT_CMD_APID_DICT: print("invalid key") continue apid = CRIT_CMD_APID_DICT[key][1] break app_data = struct.pack("!H", apid) q.add_pus_tc( create_action_cmd(PLOC_MPSOC_ID, ActionId.TC_ENABLE_TC_EXECUTION, app_data) ) if cmd_str == OpCode.FLASH_MKFS: q.add_log_cmd(Info.FLASH_MKFS) while True: flash_select = int(input("Please select the flash ID (0 or 1): ")) if flash_select != 0 and flash_select != 1: _LOGGER.warn("invalid flash select") continue break q.add_pus_tc( create_action_cmd( PLOC_MPSOC_ID, ActionId.TC_FLASH_MKFS, bytes([flash_select]) ) ) if cmd_str == OpCode.SIMPLEX_STORE_FILE: q.add_log_cmd(Info.SIMPLEX_STORE_FILE) q.add_pus_tc( create_action_cmd( PLOC_MPSOC_ID, ActionId.TC_SIMPLEX_STORE_FILE, user_data=prepare_simplex_store_file_user_data(), ) ) if cmd_str == "17": q.add_log_cmd("PLOC MPSoC: Set UART TX tristate") data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == "18": q.add_log_cmd("PLOC MPSoC: Release UART TX") data = object_id.as_bytes + struct.pack("!I", ActionId.RELEASE_UART_TX) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.CAM_TAKE_PIC: q.add_log_cmd("PLOC MPSoC: Cam take picture") data = prepare_cam_take_pic_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.SIMPLEX_STREAM_FILE: q.add_log_cmd(Info.SIMPLEX_STREAM_FILE) data = prepare_simplex_stream_file_user_data() q.add_pus_tc( create_action_cmd( PLOC_MPSOC_ID, ActionId.TC_SIMPLEX_STREAM_FILE, user_data=data ) ) if cmd_str == OpCode.DOWNLINK_DATA_MODULATE: q.add_log_cmd("PLOC MPSoC: Downlink data modulate") data = prepare_downlink_data_modulate_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if cmd_str == OpCode.MODE_SNAPSHOT: q.add_log_cmd(Info.MODE_SNAPSHOT) mode = prompt_mode_for_submode_change() q.add_pus_tc(create_mode_command(PLOC_MPSOC_ID, mode, Submode.SNAPSHOT)) if cmd_str == OpCode.MODE_IDLE: q.add_log_cmd(Info.MODE_IDLE) mode = prompt_mode_for_submode_change() q.add_pus_tc(create_mode_command(PLOC_MPSOC_ID, mode, Submode.IDLE_OR_NONE)) if cmd_str == OpCode.MODE_REPLAY: q.add_log_cmd(Info.MODE_REPLAY) mode = prompt_mode_for_submode_change() q.add_pus_tc(create_mode_command(PLOC_MPSOC_ID, mode, Submode.REPLAY)) if cmd_str == OpCode.ENABLE_PLOC_SUPV_COMMANDING_TO_ON: q.add_log_cmd(Info.ENABLE_PLOC_SUPV_COMMANDING_TO_ON) q.add_pus_tc( create_load_param_cmd( create_scalar_u8_parameter( object_id.as_bytes, 0, ParamId.PLOC_SUPV_SKIP_CMD_TO_ON, 0 ) ) ) if cmd_str == OpCode.DISABLE_PLOC_SUPV_COMMANDING_TO_ON: q.add_log_cmd(Info.DISABLE_PLOC_SUPV_COMMANDING_TO_ON) q.add_pus_tc( create_load_param_cmd( create_scalar_u8_parameter( object_id.as_bytes, 0, ParamId.PLOC_SUPV_SKIP_CMD_TO_ON, 1 ) ) ) def prompt_mode_for_submode_change() -> int: while True: mode = input("Please specify primary mode [0: ON, 1: NORMAL]: ") if mode == "0": mode = Mode.ON elif mode == "1": mode = Mode.NORMAL else: print("Invalid mode") continue break return mode def generate_write_mem_command( object_id: bytes, memory_address: int, memory_data: int, mem_len: int ) -> bytearray: """This function generates the command to write to a memory address within the PLOC. :param object_id: The object id of the PlocHandler :param memory_address: The PLOC memory address where to write to. :param memory_data: The data to write to the memory address specified by the bytearray memory_address. :param mem_len: """ command = ( object_id + struct.pack("!I", ActionId.TC_MEM_WRITE) + struct.pack("!I", memory_address) + struct.pack("!H", mem_len) + struct.pack("!I", memory_data) ) return bytearray(command) def prepare_mem_read_command(object_id: bytes) -> bytearray: memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) command = ( object_id + struct.pack("!I", ActionId.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack("!H", num_words) ) return bytearray(command) def prepare_flash_base_cmd( obc_filename: str, mpsoc_filename: str, action_id: int, object_id: bytes ) -> bytearray: command = bytearray(object_id) command.extend(struct.pack("!I", action_id)) command.extend(obc_filename.encode("utf-8")) command.append(0) command.extend(mpsoc_filename.encode("utf-8")) command.append(0) return command def prepare_flash_write_cmd(object_id: bytes) -> bytearray: obc_file = get_obc_file(OBC_WRITE_FILE_DICT) mpsoc_file = get_mpsoc_file(MPSOC_WRITE_FILE_DICT) return prepare_flash_base_cmd( obc_file, mpsoc_file, ActionId.TC_FLASH_WRITE_FULL_FILE, object_id ) def prepare_flash_read_cmd(object_id: bytes) -> bytearray: mpsoc_file = get_mpsoc_file(MPSOC_READ_FILE_DICT) obc_file = get_obc_file(OBC_READ_FILE_DICT) cmd = prepare_flash_base_cmd( obc_file, mpsoc_file, ActionId.TC_FLASH_READ_FULL_FILE, object_id ) file_size = get_mpsoc_file_size() cmd.extend(struct.pack("!I", file_size)) return cmd def prepare_flash_delete_cmd(object_id: bytes) -> bytearray: file = get_mpsoc_file(MPSOC_READ_FILE_DICT) command = ( object_id + struct.pack("!I", ActionId.TC_FLASH_DELETE) + file.encode("utf-8") ) return bytearray(command) def prepare_replay_start_cmd(object_id: bytes) -> bytearray: replay = int(input("Specify replay mode (0 - once, 1 - repeated): ")) command = ( object_id + struct.pack("!I", ActionId.TC_REPLAY_START) + struct.pack("!B", replay) ) return bytearray(command) def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray: mode = int(input("Specify JESD mode (0 - 5): ")) lane_rate = int(input("Specify lane rate (0 - 9): ")) command = ( object_id + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_ON) + struct.pack("!B", mode) + struct.pack("!B", lane_rate) ) return bytearray(command) def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray: use_decoding = int(input("Use decoding (set to 1): ")) file = get_sequence_file() command = ( object_id + struct.pack("!I", ActionId.TC_REPLAY_WRITE_SEQUENCE) + struct.pack("!B", use_decoding) + bytearray(file, "utf-8") # + bytes([0]) ) return bytearray(command) def prepare_cam_take_pic_cmd(object_id: bytes) -> bytes: filename = input("Specify target filename: ") selection = input("Use default parameter? (y/n): ") if selection.lower() in ["y", "1", "yes"]: encoder_setting_y = 7 quantization_y = 0 encoder_setting_cb = 7 quantization_cb = 0 encoder_setting_cr = 7 quantization_cr = 0 bypass_compressor = 0 else: encoder_setting_y = int(input("Specify encoderSetting_Y: ")) quantization_y = int(input("Specify quantization_Y: ")) encoder_setting_cb = int(input("Specify encoderSetting_Cb: ")) quantization_cb = int(input("Specify quantization_Cb: ")) encoder_setting_cr = int(input("Specify encoderSetting_Cr: ")) quantization_cr = int(input("Specify quantization_Cr: ")) bypass_compressor = int(input("Specify bypassCompressor: ")) command = ( object_id + struct.pack("!I", ActionId.TC_CAM_TAKE_PIC) + filename.encode() + bytes([0]) + struct.pack("!B", encoder_setting_y) + struct.pack("!Q", quantization_y) + struct.pack("!B", encoder_setting_cb) + struct.pack("!Q", quantization_cb) + struct.pack("!B", encoder_setting_cr) + struct.pack("!Q", quantization_cr) + struct.pack("!B", bypass_compressor) ) return command def prepare_simplex_stream_file_user_data() -> bytes: filename = input("Specify filename: ") command = filename.encode() + bytes([0]) return command def prepare_simplex_store_file_user_data() -> bytes: num_of_chunks = int(input("Please specify the number of chunks: ")) assert num_of_chunks >= 0 filename = input("Specify filename: ") command = ( struct.pack("!I", num_of_chunks) + bytearray(filename, "utf-8") + bytes([0]) ) return command def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray: format = int(input("Specify format: ")) src_mem_addr = int(input("Specify srcMemAddr: ")) src_mem_len = int(input("Specify srcMemLen: ")) dest_mem_addr = int(input("Specify destMemAddr: ")) command = ( object_id + struct.pack("!I", ActionId.TC_DOWNLINK_DATA_MODULATE) + struct.pack("!B", format) + struct.pack("!I", src_mem_addr) + struct.pack("!H", src_mem_len) + struct.pack("!I", dest_mem_addr) ) return bytearray(command) def get_obc_file(input_dict: dict) -> str: _LOGGER.info("Specify OBC filename") input_helper = InputHelper(input_dict) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name of flash file: ") else: file = input_dict[key][1] return file def get_mpsoc_file(input_dict: dict) -> str: _LOGGER.info("Specify MPSoC filename") input_helper = InputHelper(input_dict) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name file: ") else: file = input_dict[key][1] return file def get_mpsoc_file_size() -> int: file_size = int(input("Specify MPSoC file size: ")) if file_size <= 0: raise ValueError("Invalid file size") return file_size def get_sequence_file() -> str: _LOGGER.info("Specify sequence file") input_helper = InputHelper(SEQ_FILE_DICT) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name file: ") else: file = SEQ_FILE_DICT[key][1] return file def handle_ploc_mpsoc_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int): if set_id == SetId.HK_ID: fmt_str = "!IBBBBBBB" current_idx = 0 inc_len = struct.calcsize(fmt_str) ( status, mode, downlink_pwr_on, downlink_reply_active, downlink_jesd_sync_status, downlink_dac_status, cam_status, cam_sdi_status, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len pw.ilog(_LOGGER, "Received MPSoC HK") pw.dlog(f"Status: {status}") pw.dlog(f"Mode: {mode}") pw.dlog(f"Downlink Power On: {downlink_pwr_on}") pw.dlog(f"Downlink Reply Active: {downlink_reply_active}") pw.dlog(f"Downlink JESD Sync Status: {downlink_jesd_sync_status}") pw.dlog(f"Downlink DAC Status: {downlink_dac_status}") pw.dlog(f"CAM Status: {cam_status}") pw.dlog(f"CAM SDI Status: {cam_sdi_status}") fmt_str = "!fffffffff" inc_len = struct.calcsize(fmt_str) ( cam_fpga_temp, cam_soc_temp, sysmon_temp, sysmon_vcc_int, sysmon_vcc_aux, sysmon_vcc_bram, sysmon_vcc_paux, sysmon_vcc_pint, sysmon_vcc_pdro, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len pw.dlog(f"CAM FPGA Temperature: {cam_fpga_temp}") pw.dlog(f"CAM SoC Temperature: {cam_soc_temp}") pw.dlog(f"System Monitor Temperature: {sysmon_temp}") pw.dlog( f"SYSMON VCC INT {sysmon_vcc_int:.3f} | SYSMON VCC AUX" f" {sysmon_vcc_aux:.3f} | SYSMON VCC BRAM {sysmon_vcc_bram:.3f}" ) pw.dlog( f"SYSMON VCC PAUX {sysmon_vcc_paux:.3f} | SYSMON VCC PINT" f" {sysmon_vcc_pint:.3f} | SYSMON VCC PDRO {sysmon_vcc_pdro:.3f}" ) fmt_str = "!fffffffffffff" inc_len = struct.calcsize(fmt_str) ( sysmon_mb_12v, sysmon_mb_3v3, sysmon_mb_1v8, sysmon_vcc_12v, sysmon_vcc_5v, sysmon_vcc_3v3, sysmon_vcc_3v3va, sysmon_vcc_2v5ddr, sysmon_vcc_1v2ddr, sysmon_vcc_0v9, sysmon_vcc_0v6vtt, sysmon_safe_cotr_cur, sysmon_nvm4_xo_cur, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len pw.dlog( f"SYSMON MB 12V {sysmon_mb_12v:.3f} | SYSMON MB 3V3 {sysmon_mb_3v3:.3f} | " f"SYSMON MBA 1V8 {sysmon_mb_1v8:.3f}" ) pw.dlog( f"SYSMON VCC 12V {sysmon_vcc_12v:.3f} | SYSMON VCC 5V {sysmon_vcc_5v:.3f} |" f" SYSMON VCC 3V3 {sysmon_vcc_3v3:.3f} | SYSMON VCC 3V3VA" f" {sysmon_vcc_3v3va}" ) pw.dlog( f"SYSMON VCC 2V5DDR {sysmon_vcc_2v5ddr:.3f} | " f"SYSMON VCC 1V2DDR {sysmon_vcc_1v2ddr:.3f} | " f"SYSMON VCC 0V9 {sysmon_vcc_0v9:.3f} | " f"SYSMON VCC 0V6VTT {sysmon_vcc_0v6vtt}" ) pw.dlog( f"SYSMON SAFE COTS CURR: {sysmon_safe_cotr_cur} | " f"SYSMON NVM4XO CURR {sysmon_nvm4_xo_cur}" ) fmt_str = "!HHBB" inc_len = struct.calcsize(fmt_str) ( sem_uncorrectable_errs, sem_correctable_errs, sem_status, reboot_mpsoc_required, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) pw.dlog(f"SEM IP Uncorrectable Errors: {sem_uncorrectable_errs}") pw.dlog(f"SEM IP Correctable Errors: {sem_correctable_errs}") pw.dlog(f"SEM IP Status: {sem_status}") pw.dlog(f"Reboot MPSoC required: {reboot_mpsoc_required}") else: _LOGGER.warning(f"Unknown set ID {set_id} for MPSoC HK") pass @dataclasses.dataclass class DirElement: name: str attr: int size: int def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytearray): if action_id == ActionId.TM_MEM_READ_RPT: header_list = [ "PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data", ] content_list = [ "0x" + custom_data[:4].hex(), struct.unpack("!H", custom_data[4:6])[0], "0x" + custom_data[6:10].hex(), ] pw.dlog(f"{header_list}") pw.dlog(f"{content_list}") elif action_id == ActionId.TM_CAM_CMD_RPT: header_list = ["Camera reply string", "ACK"] content_list = [ custom_data[: len(custom_data) - 1].decode("utf-8"), hex(custom_data[-1]), ] pw.dlog(f"{header_list}") pw.dlog(f"{content_list}") elif action_id == ActionId.TM_FLASH_DIRECTORY_CONTENT: if len(custom_data) < 16: _LOGGER.warning( "PLOC MPSoC flash directory data shorter than minimum 16 bytes" ) current_idx = 0 end_of_str = custom_data[current_idx : current_idx + 12].index(b"\x00") dir_name_short = custom_data[current_idx : current_idx + end_of_str].decode() current_idx += 12 num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[ 0 ] current_idx += 4 elem_names = [] elem_attrs = [] elem_sizes = [] expected_size = 16 + num_elements * 17 if len(custom_data) < expected_size: _LOGGER.warning( f"PLOC MPSoC flash directory data shorter than expected {expected_size}" ) pw.dlog( f"Received PLOC MPSoC flash directory content for path {dir_name_short} " f"with {num_elements} elements" ) # It is as weird as it looks.. for _ in range(num_elements): end_of_str = custom_data[current_idx : current_idx + 12].index(b"\x00") elem_name = custom_data[current_idx : current_idx + end_of_str].decode() current_idx += 12 elem_names.append(elem_name) for _ in range(num_elements): elem_attrs.append(custom_data[current_idx]) current_idx += 1 for _ in range(num_elements): elem_sizes.append( struct.unpack("!I", custom_data[current_idx : current_idx + 4])[0] ) current_idx += 4 for i in range(num_elements): pw.dlog(f"{DirElement(elem_names[i], elem_attrs[i], elem_sizes[i])}")