# -*- coding: utf-8 -*-
"""
@file   ploc_mpsoc.py
@brief  Tests for commanding the MPSoC of the PLOC.
        The MPSoC is programmed by the ILH.
@author J. Meier
@date   06.03.2021
"""
import struct
import enum

from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID
from tmtccmd.config.tmtc import (
    tmtc_definitions_provider,
    OpCodeEntry,
    TmtcDefinitionWrapper,
)
from tmtccmd.logging import get_console_logger
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
from eive_tmtc.utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes

LOGGER = get_console_logger()

# Key shared by all selection dictionaries below: the user types the file name.
MANUAL_INPUT = "1"

# Each selection dictionary maps a menu key to [description, file name].
flash_write_file_dict = {
    MANUAL_INPUT: ["manual input", ""],
    "2": ["/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"],
}

mpsoc_file_dict = {
    MANUAL_INPUT: ["manual input", ""],
    "2": ["0:/flash", "0:/flash"],
}

sequence_file_dict = {
    MANUAL_INPUT: ["manual input", ""],
    "2": ["0:/EM16/231", "0:/EM16/231"],
    "3": ["0:/EQ04/E-75", "0:/EQ04/E-75"],
}

CARRIAGE_RETURN = 0xD


class CommandIds(enum.IntEnum):
    """Action command IDs, serialized as a 32-bit field after the object ID."""

    TC_MEM_WRITE = 1
    TC_MEM_READ = 2
    FLASH_WRITE = 9
    TC_FLASH_DELETE = 10
    TC_REPLAY_START = 11
    TC_REPLAY_STOP = 12
    TC_REPLAY_WRITE_SEQUENCE = 13
    TC_DOWNLINK_PWR_ON = 14
    TC_DOWNLINK_PWR_OFF = 15
    OBSW_RESET_SEQ_COUNT = 50
    TC_MODE_REPLAY = 16
    TC_CAM_CMD_SEND = 17
    TC_MODE_IDLE = 18
    SET_UART_TX_TRISTATE = 20
    RELEASE_UART_TX = 21


class MemAddresses(enum.IntEnum):
    """Memory addresses used by the predefined memory read command."""

    DEADBEEF = 0x40000004


class PlocReplyIds(enum.IntEnum):
    """Reply IDs; not referenced by the packing code in this part of the file."""

    TM_MEM_READ_RPT = 6
    TM_CAM_CMD_RPT = 19


# Op codes which command a mode via PUS service 200, subservice 1.
# Value: (log message, mode).
_MODE_OP_CODES = {
    "0": ("PLOC MPSoC: Set mode off", Modes.OFF),
    "1": ("PLOC MPSoC: Set mode on", Modes.ON),
    "2": ("PLOC MPSoC: Mode Normal", Modes.NORMAL),
}

# Op codes whose action command carries no payload besides object ID and command ID.
# Value: (log message, command ID).
_PARAMETERLESS_OP_CODES = {
    "8": ("PLOC MPSoC: Replay stop", CommandIds.TC_REPLAY_STOP),
    "10": ("PLOC MPSoC: Downlink pwr off", CommandIds.TC_DOWNLINK_PWR_OFF),
    "12": ("PLOC MPSoC: Reset OBSW sequence count", CommandIds.OBSW_RESET_SEQ_COUNT),
    "14": ("PLOC MPSoC: Tc mode replay", CommandIds.TC_MODE_REPLAY),
    "15": ("PLOC MPSoC: Tc mode idle", CommandIds.TC_MODE_IDLE),
    "17": ("PLOC MPSoC: Set UART TX tristate", CommandIds.SET_UART_TX_TRISTATE),
    "18": ("PLOC MPSoC: Release UART TX", CommandIds.RELEASE_UART_TX),
}


@tmtc_definitions_provider
def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
    """Register all PLOC MPSoC op codes and their descriptions.

    :param defs: Definition wrapper the "Ploc MPSoC" service is added to.
    """
    op_code_info = (
        ("0", "Ploc MPSoC: Set mode off"),
        ("1", "Ploc MPSoC: Set mode on"),
        ("2", "Ploc MPSoC: Set mode normal"),
        ("3", "Ploc MPSoC: Memory write"),
        ("4", "Ploc MPSoC: Memory read"),
        ("5", "Ploc MPSoC: Flash write"),
        ("6", "Ploc MPSoC: Flash delete"),
        ("7", "Ploc MPSoC: Replay start"),
        ("8", "Ploc MPSoC: Replay stop"),
        ("9", "Ploc MPSoC: Downlink pwr on"),
        ("10", "Ploc MPSoC: Downlink pwr off"),
        ("11", "Ploc MPSoC: Replay write sequence"),
        ("12", "Ploc MPSoC: OBSW reset sequence count"),
        ("13", "Ploc MPSoC: Read DEADBEEF address"),
        ("14", "Ploc MPSoC: Mode replay"),
        ("15", "Ploc MPSoC: Mode idle"),
        ("16", "Ploc MPSoC: Tc cam command send"),
        ("17", "Ploc MPSoC: Set UART TX tristate"),
        ("18", "Ploc MPSoC: Release UART TX"),
    )
    oce = OpCodeEntry()
    for key, info in op_code_info:
        oce.add(key, info)
    defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce)


@service_provider(CustomServiceList.PLOC_MPSOC)
def pack_ploc_mpsoc_commands(p: ServiceProviderParams):
    """Queue the telecommand which belongs to the op code in ``p``.

    Mode op codes ("0" to "2") are sent with PUS service 200, subservice 1. All
    other known op codes are sent as action commands with PUS service 8,
    subservice 128. Several op codes prompt the user for parameters on stdin.
    An unknown op code only produces the introductory log entry.

    :param p: Provides the op code and the queue helper the commands are added to.
    """
    object_id = get_object_ids().get(PLOC_MPSOC_ID)
    q = p.queue_helper
    op_code = p.op_code
    q.add_log_cmd(
        f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
    )
    obyt = object_id.as_bytes

    if op_code in _MODE_OP_CODES:
        log_msg, mode = _MODE_OP_CODES[op_code]
        q.add_log_cmd(log_msg)
        mode_data = pack_mode_data(obyt, mode, 0)
        q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
        return

    # The log entry is always queued before any user prompt is shown.
    if op_code in _PARAMETERLESS_OP_CODES:
        log_msg, command_id = _PARAMETERLESS_OP_CODES[op_code]
        q.add_log_cmd(log_msg)
        data = _action_header(obyt, command_id)
    elif op_code == "3":
        q.add_log_cmd("PLOC MPSoC: TC mem write test")
        memory_address = int(
            input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
        )
        memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
        # TODO: implement variable length mem write command
        mem_len = 1  # 1 32-bit word
        data = generate_write_mem_command(obyt, memory_address, memory_data, mem_len)
    elif op_code == "4":
        q.add_log_cmd("PLOC MPSoC: TC mem read test")
        data = prepare_mem_read_command(object_id=obyt)
    elif op_code == "5":
        q.add_log_cmd("PLOC MPSoC: Flash write")
        data = prepare_flash_write_cmd(obyt)
    elif op_code == "6":
        q.add_log_cmd("PLOC MPSoC: Flash delete")
        data = prepare_flash_delete_cmd(obyt)
    elif op_code == "7":
        q.add_log_cmd("PLOC MPSoC: Replay start")
        data = prepare_replay_start_cmd(obyt)
    elif op_code == "9":
        q.add_log_cmd("PLOC MPSoC: Downlink pwr on")
        data = prepare_downlink_pwr_on_cmd(obyt)
    elif op_code == "11":
        q.add_log_cmd("PLOC MPSoC: Replay write sequence")
        data = prepare_replay_write_sequence_cmd(obyt)
    elif op_code == "13":
        q.add_log_cmd("PLOC MPSoC: Read DEADBEEF address")
        data = _pack_mem_read(obyt, MemAddresses.DEADBEEF, 1)
    elif op_code == "16":
        q.add_log_cmd("PLOC MPSoC: Tc cam command send")
        cam_cmd = input("Specify cam command string: ")
        data = _action_header(obyt, CommandIds.TC_CAM_CMD_SEND) + cam_cmd.encode(
            "utf-8"
        )
    else:
        # Unknown op code: nothing to send.
        return
    q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))


def _action_header(object_id: bytes, command_id: int) -> bytes:
    """Return the object ID followed by the command ID as big-endian 32-bit value."""
    return object_id + struct.pack("!I", command_id)


def _pack_mem_read(object_id: bytes, memory_address: int, num_words: int) -> bytearray:
    """Serialize a memory read command (32-bit address, 16-bit word count)."""
    return bytearray(
        _action_header(object_id, CommandIds.TC_MEM_READ)
        + struct.pack("!IH", memory_address, num_words)
    )


def generate_write_mem_command(
    object_id: bytes, memory_address: int, memory_data: int, mem_len: int
) -> bytearray:
    """This function generates the command to write to a memory address within the PLOC.

    :param object_id: The object id of the PlocHandler
    :param memory_address: The PLOC memory address where to write to (packed as 32-bit).
    :param memory_data: The data to write to the memory address specified by
        memory_address. Exactly one 32-bit word is serialized.
    :param mem_len: Length field of the command, packed as 16-bit value. The only
        caller in this file passes 1 (one 32-bit word).
    :return: Object ID, command ID, address, length and data in network byte order.
    """
    return bytearray(
        _action_header(object_id, CommandIds.TC_MEM_WRITE)
        + struct.pack("!IHI", memory_address, mem_len, memory_data)
    )


def prepare_mem_read_command(object_id: bytes) -> bytearray:
    """Prompt for address and word count and build the memory read command.

    :param object_id: Object ID the command is prefixed with.
    :return: Serialized memory read command.
    """
    memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
    num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
    return _pack_mem_read(object_id, memory_address, num_words)


def prepare_flash_write_cmd(object_id: bytes) -> bytearray:
    """Ask for the OBC and the MPSoC file and build the flash write command.

    :param object_id: Object ID the command is prefixed with.
    :return: Object ID, command ID and both UTF-8 encoded file names.
    """
    obc_file = get_obc_file()
    mpsoc_file = get_mpsoc_file()
    # NOTE(review): both names are appended back to back without separator or
    # terminator - confirm that the receiver is able to split them.
    return bytearray(
        _action_header(object_id, CommandIds.FLASH_WRITE)
        + obc_file.encode("utf-8")
        + mpsoc_file.encode("utf-8")
    )


def prepare_flash_delete_cmd(object_id: bytes) -> bytearray:
    """Ask for the MPSoC file and build the flash delete command.

    :param object_id: Object ID the command is prefixed with.
    :return: Object ID, command ID and the UTF-8 encoded file name.
    """
    file = get_mpsoc_file()
    return bytearray(
        _action_header(object_id, CommandIds.TC_FLASH_DELETE) + file.encode("utf-8")
    )


def prepare_replay_start_cmd(object_id: bytes) -> bytearray:
    """Prompt for the replay mode and build the replay start command.

    :param object_id: Object ID the command is prefixed with.
    :return: Object ID, command ID and the replay mode as single byte.
    """
    replay = int(input("Specify replay mode (0 - once, 1 - repeated): "))
    return bytearray(
        _action_header(object_id, CommandIds.TC_REPLAY_START)
        + struct.pack("!B", replay)
    )


def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray:
    """Prompt for JESD mode and lane rate and build the downlink power on command.

    :param object_id: Object ID the command is prefixed with.
    :return: Object ID, command ID, mode byte and lane rate byte.
    """
    mode = int(input("Specify JESD mode (0 - 5): "))
    lane_rate = int(input("Specify lane rate (0 - 9): "))
    return bytearray(
        _action_header(object_id, CommandIds.TC_DOWNLINK_PWR_ON)
        + struct.pack("!BB", mode, lane_rate)
    )


def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
    """Prompt for the decoding flag and sequence file and build the command.

    :param object_id: Object ID the command is prefixed with.
    :return: Object ID, command ID, decoding flag byte and UTF-8 encoded file name.
    """
    use_decoding = int(input("Use decoding (set to 1): "))
    file = get_sequence_file()
    # NOTE(review): an unused local "null_terminator = 0" was removed here. The
    # file name is sent unterminated - confirm whether a trailing NUL is expected.
    return bytearray(
        _action_header(object_id, CommandIds.TC_REPLAY_WRITE_SEQUENCE)
        + struct.pack("!B", use_decoding)
        + file.encode("utf-8")
    )
def _select_file(log_message: str, choices: dict, manual_prompt: str) -> str:
    """Log a hint, let the user pick an entry of ``choices`` and return the file name.

    :param log_message: Text logged before the selection is requested.
    :param choices: Selection dictionary handed to the InputHelper. The second
        element of the chosen entry is returned.
    :param manual_prompt: Prompt shown if the user picks the manual input key.
    :return: The typed file name or the one stored for the chosen key.
    """
    LOGGER.info(log_message)
    chosen_key = InputHelper(choices).get_key()
    if chosen_key != MANUAL_INPUT:
        return choices[chosen_key][1]
    return input(manual_prompt)


def get_obc_file() -> str:
    """Return the OBC file name selected or typed by the user."""
    return _select_file(
        "Specify OBC file ",
        flash_write_file_dict,
        "Ploc MPSoC: Specify absolute name of flash file: ",
    )


def get_mpsoc_file() -> str:
    """Return the MPSoC file name selected or typed by the user."""
    return _select_file(
        "Specify MPSoC file",
        mpsoc_file_dict,
        "Ploc MPSoC: Specify absolute name file: ",
    )


def get_sequence_file() -> str:
    """Return the sequence file name selected or typed by the user."""
    return _select_file(
        "Specify sequence file",
        sequence_file_dict,
        "Ploc MPSoC: Specify absolute name file: ",
    )