# -*- coding: utf-8 -*-
"""
@file   ploc_mpsoc.py
@brief  Tests for commanding the MPSoC of the PLOC.
        The MPSoC is programmed by the ILH.
@author J. Meier
@date   06.03.2021
"""
import logging
import struct
import enum

from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID
from tmtccmd.config.tmtc import (
    tmtc_definitions_provider,
    OpCodeEntry,
    TmtcDefinitionWrapper,
)
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from eive_tmtc.utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode

_LOGGER = logging.getLogger(__name__)

# Dictionary key which selects free-text entry instead of a predefined value.
MANUAL_INPUT = "1"

# Selection dictionaries: key -> (text shown to the operator, value used in the command).
FLASH_WRITE_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"),
}

MPSOC_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("0:/flash", "0:/flash"),
}

SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"]
SEQ_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": (f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"),
    "3": (f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"),
    "4": (f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"),
}

CARRIAGE_RETURN = 0xD


class ActionId(enum.IntEnum):
    """Action IDs of the MPSoC handler.

    In every action command built in this module the ID is packed as a
    big-endian unsigned 32-bit value directly after the object ID.
    """

    TC_MEM_WRITE = 1
    TC_MEM_READ = 2
    TC_FLASH_WRITE_FULL_FILE = 9
    TC_FLASH_DELETE = 10
    TC_REPLAY_START = 11
    TC_REPLAY_STOP = 12
    TC_REPLAY_WRITE_SEQUENCE = 13
    TC_DOWNLINK_PWR_ON = 14
    TC_DOWNLINK_PWR_OFF = 15
    OBSW_RESET_SEQ_COUNT = 50
    TC_MODE_REPLAY = 16
    TC_CAM_CMD_SEND = 17
    TC_MODE_IDLE = 18
    SET_UART_TX_TRISTATE = 20
    RELEASE_UART_TX = 21
    TC_CAM_TAKE_PIC = 22
    TC_SIMPLEX_SEND_FILE = 23
    TC_DOWNLINK_DATA_MODULATE = 24
    TC_MODE_SNAPSHOT = 25
    TC_FLASH_DIR_GET_CONTENT = 28
    TC_FLASH_READ_FULL_FILE = 30


class OpCode:
    """Op code strings which select a command in :func:`pack_ploc_mpsoc_commands`."""

    ON = "on"
    OFF = "off"
    NORMAL = "normal"
    VERIFY_BOOT = "verify_boot"
    MODE_REPLAY = "mode_replay"
    MODE_IDLE = "mode_idle"
    REPLAY_WRITE_SEQ = "replay_write"
    DOWNLINK_PWR_ON = "downlink_pwr_on"
    MEM_WRITE = "memory_write"
    MEM_READ = "memory_read"
    DOWNLINK_PWR_OFF = "downlink_pwr_off"
    FLASH_WRITE_FILE = "flash_write_file"
    FLASH_READ_FILE = "flash_read_file"
    FLASH_DELETE_FILE = "flash_delete_file"
    # NOTE(review): defined, but neither registered nor handled below.
    FLASH_GET_DIR_CONTENT = "flash_get_dir_content"
    REPLAY_STOP = "replay_stop"
    REPLAY_START = "replay_start"
    CAM_TAKE_PIC = "cam_take_pic"
    SIMPLEX_SEND_FILE = "simplex_send_file"
    DOWNLINK_DATA_MODULATE = "downlink_data_modulate"
    MODE_SNAPSHOT = "mode_snapshot"


class Info:
    """Human-readable descriptions belonging to the op codes in :class:`OpCode`."""

    ON = "On"
    OFF = "Off"
    NORMAL = "Normal"
    VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address"
    MODE_REPLAY = "Switch to REPLAY mode"
    REPLAY_STOP = "Stop Replay"
    MODE_IDLE = "Switch to IDLE mode"
    REPLAY_WRITE_SEQ = "Replay write sequence"
    DOWNLINK_PWR_ON = "Downlink Power On"
    DOWNLINK_PWR_OFF = "Downlink Power Off"
    REPLAY_START = "Replay Start"
    CAM_TAKE_PIC = "Cam Take Picture"
    SIMPLEX_SEND_FILE = "Simplex Send File"
    FLASH_READ_FILE = "Copy file from MPSoC to OBC"
    FLASH_WRITE_FILE = "Copy file from OBC to MPSoC"
    FLASH_DELETE_FILE = "Delete file on MPSoC"
    FLASH_GET_DIR_CONTENT = "Get flash directory content on MPSoC"
    DOWNLINK_DATA_MODULATE = "Downlink data modulate"
    MODE_SNAPSHOT = "Mode Snapshot"


class MemAddresses(enum.IntEnum):
    """Memory addresses used by this module (read by the verify boot command)."""

    DEADBEEF = 0x40000004


class PlocReplyIds(enum.IntEnum):
    """Reply IDs; not referenced by the command packers in this module."""

    TM_MEM_READ_RPT = 6
    TM_CAM_CMD_RPT = 19


@tmtc_definitions_provider
def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
    """Register all PLOC MPSoC op codes and their descriptions as a service.

    :param defs: Definition wrapper the "Ploc MPSoC" service is added to.
    """
    oce = OpCodeEntry()
    oce.add(OpCode.OFF, Info.OFF)
    oce.add(OpCode.ON, Info.ON)
    oce.add(OpCode.NORMAL, Info.NORMAL)
    oce.add(OpCode.MEM_WRITE, "Ploc MPSoC: Memory write")
    oce.add(OpCode.MEM_READ, "Ploc MPSoC: Memory read")
    oce.add(OpCode.FLASH_WRITE_FILE, Info.FLASH_WRITE_FILE)
    oce.add(OpCode.FLASH_READ_FILE, Info.FLASH_READ_FILE)
    oce.add(OpCode.FLASH_DELETE_FILE, Info.FLASH_DELETE_FILE)
    oce.add(OpCode.REPLAY_START, Info.REPLAY_START)
    oce.add(OpCode.REPLAY_STOP, Info.REPLAY_STOP)
    oce.add(OpCode.DOWNLINK_PWR_ON, Info.DOWNLINK_PWR_ON)
    oce.add(OpCode.DOWNLINK_PWR_OFF, Info.DOWNLINK_PWR_OFF)
    oce.add(OpCode.REPLAY_WRITE_SEQ, Info.REPLAY_WRITE_SEQ)
    oce.add("12", "Ploc MPSoC: OBSW reset sequence count")
    oce.add(OpCode.VERIFY_BOOT, Info.VERIFY_BOOT)
    oce.add(OpCode.MODE_REPLAY, Info.MODE_REPLAY)
    oce.add(OpCode.MODE_IDLE, Info.MODE_IDLE)
    oce.add("16", "Ploc MPSoC: Tc cam command send")
    oce.add("17", "Ploc MPSoC: Set UART TX tristate")
    oce.add("18", "Ploc MPSoC: Relesase UART TX")
    oce.add(OpCode.CAM_TAKE_PIC, Info.CAM_TAKE_PIC)
    oce.add(OpCode.SIMPLEX_SEND_FILE, Info.SIMPLEX_SEND_FILE)
    oce.add(OpCode.DOWNLINK_DATA_MODULATE, Info.DOWNLINK_DATA_MODULATE)
    oce.add(OpCode.MODE_SNAPSHOT, Info.MODE_SNAPSHOT)
    defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce)


@service_provider(CustomServiceList.PLOC_MPSOC)
def pack_ploc_mpsoc_commands(p: ServiceProviderParams):
    """Queue the telecommand selected by ``p.op_code`` for the PLOC MPSoC.

    Mode commands (off/on/normal) are sent with PUS service 200, subservice 1.
    All other commands are sent with PUS service 8, subservice 128, with
    application data starting with the object ID followed by the action ID.
    Several commands prompt the operator for parameters on the console.
    An op code which matches no branch queues only the initial log entry.

    :param p: Service provider parameters; ``op_code`` selects the command and
        ``queue_helper`` receives the log entries and telecommands.
    """
    object_id = get_object_ids().get(PLOC_MPSOC_ID)
    q = p.queue_helper
    prefix = "PLOC MPSoC"
    op_code = p.op_code
    q.add_log_cmd(
        f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
    )
    obyt = object_id.as_bytes
    # Op codes are unique strings, so at most one branch can match.
    if op_code == OpCode.OFF:
        q.add_log_cmd(f"{prefix}: {Info.OFF}")
        command = pack_mode_data(obyt, Mode.OFF, 0)
        q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
    elif op_code == OpCode.ON:
        q.add_log_cmd(f"{prefix}: {Info.ON}")
        data = pack_mode_data(obyt, Mode.ON, 0)
        q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
    elif op_code == OpCode.NORMAL:
        q.add_log_cmd(f"{prefix}: {Info.NORMAL}")
        data = pack_mode_data(obyt, Mode.NORMAL, 0)
        q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
    elif op_code == OpCode.MEM_WRITE:
        q.add_log_cmd("PLOC MPSoC: TC mem write test")
        memory_address = int(
            input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
        )
        memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
        # TODO: implement variable length mem write command
        mem_len = 1  # 1 32-bit word
        data = generate_write_mem_command(obyt, memory_address, memory_data, mem_len)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code in (OpCode.MEM_READ, "4"):
        # The registered op code is OpCode.MEM_READ. The bare "4" was the only
        # value checked here before and is kept as a legacy alias.
        q.add_log_cmd("PLOC MPSoC: TC mem read test")
        data = prepare_mem_read_command(object_id=obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.FLASH_WRITE_FILE:
        q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}")
        data = prepare_flash_write_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.FLASH_READ_FILE:
        q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}")
        data = prepare_flash_read_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.FLASH_DELETE_FILE:
        q.add_log_cmd("PLOC MPSoC: Flash delete")
        data = prepare_flash_delete_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.REPLAY_START:
        # Must be an equality test: "in" on a string is a substring check and
        # would also match "", "replay", "start", ...
        q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}")
        data = prepare_replay_start_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.REPLAY_STOP:
        q.add_log_cmd("PLOC MPSoC: Replay stop")
        data = obyt + struct.pack("!I", ActionId.TC_REPLAY_STOP)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.DOWNLINK_PWR_ON:
        q.add_log_cmd(f"{prefix}: {Info.DOWNLINK_PWR_ON}")
        data = prepare_downlink_pwr_on_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.DOWNLINK_PWR_OFF:
        q.add_log_cmd("PLOC MPSoC: Downlink pwr off")
        data = obyt + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.REPLAY_WRITE_SEQ:
        q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}")
        data = prepare_replay_write_sequence_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == "12":
        q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count")
        data = obyt + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.VERIFY_BOOT:
        # Memory read of a single word at the DEADBEEF address.
        num_words = 1
        q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}")
        data = (
            obyt
            + struct.pack("!I", ActionId.TC_MEM_READ)
            + struct.pack("!I", MemAddresses.DEADBEEF)
            + struct.pack("!H", num_words)
        )
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.MODE_REPLAY:
        q.add_log_cmd("PLOC MPSoC: Tc mode replay")
        data = obyt + struct.pack("!I", ActionId.TC_MODE_REPLAY)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.MODE_IDLE:
        q.add_log_cmd("PLOC MPSoC: Tc mode idle")
        data = obyt + struct.pack("!I", ActionId.TC_MODE_IDLE)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == "16":
        q.add_log_cmd("PLOC MPSoC: Tc cam command send")
        cam_cmd = input("Specify cam command string: ")
        data = (
            obyt
            + struct.pack("!I", ActionId.TC_CAM_CMD_SEND)
            + bytearray(cam_cmd, "utf-8")
        )
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == "17":
        q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
        data = obyt + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == "18":
        q.add_log_cmd("PLOC MPSoC: Release UART TX")
        data = obyt + struct.pack("!I", ActionId.RELEASE_UART_TX)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.CAM_TAKE_PIC:
        q.add_log_cmd("PLOC MPSoC: Cam take picture")
        data = prepare_cam_take_pic_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.SIMPLEX_SEND_FILE:
        q.add_log_cmd("PLOC MPSoC: Simplex send file")
        data = prepare_simplex_send_file_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.DOWNLINK_DATA_MODULATE:
        q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
        data = prepare_downlink_data_modulate_cmd(obyt)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
    elif op_code == OpCode.MODE_SNAPSHOT:
        q.add_log_cmd("PLOC MPSoC: Mode snapshot")
        data = obyt + struct.pack("!I", ActionId.TC_MODE_SNAPSHOT)
        q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))


def generate_write_mem_command(
    object_id: bytes, memory_address: int, memory_data: int, mem_len: int
) -> bytearray:
    """This function generates the command to write to a memory address within the PLOC.

    Layout: object ID, action ID (u32), address (u32), length (u16), data (u32),
    all big-endian.

    :param object_id: The object id of the PlocHandler
    :param memory_address: The PLOC memory address where to write to.
    :param memory_data: The data to write to the memory address specified by
        memory_address.
    :param mem_len: Length field of the command; the only caller in this module
        passes 1, commented there as one 32-bit word.
    :return: The packed command.
    """
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_MEM_WRITE)
        + struct.pack("!I", memory_address)
        + struct.pack("!H", mem_len)
        + struct.pack("!I", memory_data)
    )
    return bytearray(command)


def prepare_mem_read_command(object_id: bytes) -> bytearray:
    """Prompt for address and word count and build the memory read command.

    Layout: object ID, action ID (u32), address (u32), number of words (u16),
    all big-endian. The address is entered as hexadecimal, the word count as
    decimal.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
    num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_MEM_READ)
        + struct.pack("!I", memory_address)
        + struct.pack("!H", num_words)
    )
    return bytearray(command)


def prepare_flash_base_cmd(action_id: int, object_id: bytes) -> bytearray:
    """Build the common part of the flash file transfer commands.

    Prompts for the OBC file and the MPSoC file. Layout: object ID, action ID
    (big-endian u32), OBC file name, 0 byte, MPSoC file name, 0 byte.

    :param action_id: Action ID placed after the object ID.
    :param object_id: Object ID the command is addressed to.
    :return: The packed command, which callers may extend further.
    """
    obc_file = get_obc_file()
    mpsoc_file = get_mpsoc_file()
    command = bytearray(object_id)
    command.extend(struct.pack("!I", action_id))
    command.extend(obc_file.encode("utf-8"))
    command.append(0)
    command.extend(mpsoc_file.encode("utf-8"))
    command.append(0)
    return command


def prepare_flash_write_cmd(object_id: bytes) -> bytearray:
    """Build the flash write (full file) command.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    return prepare_flash_base_cmd(ActionId.TC_FLASH_WRITE_FULL_FILE, object_id)


def prepare_flash_read_cmd(object_id: bytes) -> bytearray:
    """Build the flash read (full file) command.

    Same layout as the flash base command, followed by the file size as a
    big-endian u32 which is prompted from the operator.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    :raises ValueError: If the entered file size is not a positive integer.
    """
    cmd = prepare_flash_base_cmd(ActionId.TC_FLASH_READ_FULL_FILE, object_id)
    file_size = get_mpsoc_file_size()
    cmd.extend(struct.pack("!I", file_size))
    return cmd


def prepare_flash_delete_cmd(object_id: bytes) -> bytearray:
    """Prompt for an MPSoC file and build the flash delete command.

    Layout: object ID, action ID (big-endian u32), file name. No terminating
    0 byte is appended here.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    file = get_mpsoc_file()
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_FLASH_DELETE)
        + file.encode("utf-8")
    )
    return bytearray(command)


def prepare_replay_start_cmd(object_id: bytes) -> bytearray:
    """Prompt for the replay mode and build the replay start command.

    Layout: object ID, action ID (big-endian u32), replay mode (u8).

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    replay = int(input("Specify replay mode (0 - once, 1 - repeated): "))
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_REPLAY_START)
        + struct.pack("!B", replay)
    )
    return bytearray(command)


def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray:
    """Prompt for JESD mode and lane rate and build the downlink power on command.

    Layout: object ID, action ID (big-endian u32), mode (u8), lane rate (u8).

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    mode = int(input("Specify JESD mode (0 - 5): "))
    lane_rate = int(input("Specify lane rate (0 - 9): "))
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_ON)
        + struct.pack("!B", mode)
        + struct.pack("!B", lane_rate)
    )
    return bytearray(command)


def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
    """Prompt for decoding flag and sequence file and build the replay write command.

    Layout: object ID, action ID (big-endian u32), use decoding (u8), file name.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    use_decoding = int(input("Use decoding (set to 1): "))
    file = get_sequence_file()
    # NOTE: no terminating 0 byte is appended to the file name; the original
    # code carries it only as a commented-out "+ bytes([0])".
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_REPLAY_WRITE_SEQUENCE)
        + struct.pack("!B", use_decoding)
        + bytearray(file, "utf-8")
    )
    return bytearray(command)


def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
    """Build the camera take picture command, with default or prompted parameters.

    Answering the first prompt with "Y" or "y" selects the built-in defaults;
    any other answer prompts for every parameter. Layout: object ID, action ID
    (big-endian u32), file name, 0 byte, then for each of Y, Cb and Cr an
    encoder setting (u8) and a quantization value (big-endian u64), and finally
    the bypass compressor flag (u8).

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    selection = input("Use default parameter? (Y/N): ")
    # Compare by value; "is" on strings tests object identity.
    if selection in ("Y", "y"):
        filename = "0:/test"
        encoder_setting_y = 7
        quantization_y = 0
        encoder_setting_cb = 7
        quantization_cb = 0
        encoder_setting_cr = 7
        quantization_cr = 0
        bypass_compressor = 0
    else:
        filename = input("Specify filename: ")
        encoder_setting_y = int(input("Specify encoderSetting_Y: "))
        quantization_y = int(input("Specify quantization_Y: "))
        encoder_setting_cb = int(input("Specify encoderSetting_Cb: "))
        quantization_cb = int(input("Specify quantization_Cb: "))
        encoder_setting_cr = int(input("Specify encoderSetting_Cr: "))
        quantization_cr = int(input("Specify quantization_Cr: "))
        bypass_compressor = int(input("Specify bypassCompressor: "))
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_CAM_TAKE_PIC)
        + bytearray(filename, "utf-8")
        + bytes([0])
        + struct.pack("!B", encoder_setting_y)
        + struct.pack("!Q", quantization_y)
        + struct.pack("!B", encoder_setting_cb)
        + struct.pack("!Q", quantization_cb)
        + struct.pack("!B", encoder_setting_cr)
        + struct.pack("!Q", quantization_cr)
        + struct.pack("!B", bypass_compressor)
    )
    return bytearray(command)


def prepare_simplex_send_file_cmd(object_id: bytes) -> bytearray:
    """Prompt for a file name and build the simplex send file command.

    Layout: object ID, action ID (big-endian u32), file name, 0 byte.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    filename = input("Specify filename: ")
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_SIMPLEX_SEND_FILE)
        + bytearray(filename, "utf-8")
        + bytes([0])
    )
    return bytearray(command)


def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray:
    """Prompt for the parameters and build the downlink data modulate command.

    Layout: object ID, action ID (u32), format (u8), source address (u32),
    source length (u16), destination address (u32), all big-endian. All values
    are entered as decimal integers.

    :param object_id: Object ID the command is addressed to.
    :return: The packed command.
    """
    # Named data_format so the builtin format() is not shadowed.
    data_format = int(input("Specify format: "))
    src_mem_addr = int(input("Specify srcMemAddr: "))
    src_mem_len = int(input("Specify srcMemLen: "))
    dest_mem_addr = int(input("Specify destMemAddr: "))
    command = (
        object_id
        + struct.pack("!I", ActionId.TC_DOWNLINK_DATA_MODULATE)
        + struct.pack("!B", data_format)
        + struct.pack("!I", src_mem_addr)
        + struct.pack("!H", src_mem_len)
        + struct.pack("!I", dest_mem_addr)
    )
    return bytearray(command)


def get_obc_file() -> str:
    """Let the operator select the OBC file from FLASH_WRITE_FILE_DICT or type it.

    :return: The predefined file name, or the manually entered one if the
        MANUAL_INPUT key was selected.
    """
    _LOGGER.info("Specify OBC file ")
    input_helper = InputHelper(FLASH_WRITE_FILE_DICT)
    key = input_helper.get_key()
    if key == MANUAL_INPUT:
        file = input("Ploc MPSoC: Specify absolute name of flash file: ")
    else:
        file = FLASH_WRITE_FILE_DICT[key][1]
    return file


def get_mpsoc_file() -> str:
    """Let the operator select the MPSoC file from MPSOC_FILE_DICT or type it.

    :return: The predefined file name, or the manually entered one if the
        MANUAL_INPUT key was selected.
    """
    _LOGGER.info("Specify MPSoC file")
    input_helper = InputHelper(MPSOC_FILE_DICT)
    key = input_helper.get_key()
    if key == MANUAL_INPUT:
        file = input("Ploc MPSoC: Specify absolute name file: ")
    else:
        file = MPSOC_FILE_DICT[key][1]
    return file


def get_mpsoc_file_size() -> int:
    """Prompt the operator for the MPSoC file size.

    :return: The entered size.
    :raises ValueError: If the input is not an integer or is not positive.
    """
    file_size = int(input("Specify MPSoC file size"))
    if file_size <= 0:
        raise ValueError("Invalid file size")
    return file_size


def get_sequence_file() -> str:
    """Let the operator select the sequence file from SEQ_FILE_DICT or type it.

    :return: The predefined file name, or the manually entered one if the
        MANUAL_INPUT key was selected.
    """
    _LOGGER.info("Specify sequence file")
    input_helper = InputHelper(SEQ_FILE_DICT)
    key = input_helper.get_key()
    if key == MANUAL_INPUT:
        file = input("Ploc MPSoC: Specify absolute name file: ")
    else:
        file = SEQ_FILE_DICT[key][1]
    return file