# -*- coding: utf-8 -*- """ @file ploc_mpsoc.py @brief Tests for commanding the MPSoC of the PLOC. The MPSoC is programmed by the ILH. @author J. Meier @date 06.03.2021 """ import struct import enum from tmtccmd.config.definitions import QueueCommands from tmtccmd.logging import get_console_logger from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand from utility.input_helper import InputHelper from tmtccmd.tc.service_200_mode import pack_mode_data, Modes LOGGER = get_console_logger() MANUAL_INPUT = "1" flash_write_file_dict = { MANUAL_INPUT: ["manual input", ""], "2": ["/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"], } mpsoc_file_dict = { MANUAL_INPUT: ["manual input", ""], "2": ["0:/flash", "0:/flash"], } sequence_file_dict = { MANUAL_INPUT: ["manual input", ""], "2": ["0:/EM16/231", "0:/EM16/231"], "3": ["0:/EQ04/E-75", "0:/EQ04/E-75"], } CARRIAGE_RETURN = 0xD class CommandIds(enum.IntEnum): TC_MEM_WRITE = 1 TC_MEM_READ = 2 FLASH_WRITE = 9 TC_FLASH_DELETE = 10 TC_REPLAY_START = 11 TC_REPLAY_STOP = 12 TC_REPLAY_WRITE_SEQUENCE = 13 TC_DOWNLINK_PWR_ON = 14 TC_DOWNLINK_PWR_OFF = 15 OBSW_RESET_SEQ_COUNT = 50 TC_MODE_REPLAY = 16 TC_CAM_CMD_SEND = 17 TC_MODE_IDLE = 18 class MemAddresses(enum.IntEnum): DEADBEEF = 0x40000004 class PlocReplyIds: tm_mem_read_report = 6 def pack_ploc_mpsoc_commands( object_id: bytearray, tc_queue: TcQueueT, op_code: str ) -> TcQueueT: tc_queue.appendleft( ( QueueCommands.PRINT, "Generate command for PLOC MPSoC with object id: 0x" + object_id.hex(), ) ) if op_code == "0": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode off")) command = pack_mode_data(object_id, Modes.OFF, 0) command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "1": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode on")) command = pack_mode_data(object_id, Modes.ON, 0) command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "2": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Mode Normal")) command = pack_mode_data(object_id, Modes.NORMAL, 0) command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "3": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test")) memory_address = int( input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 ) memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16) # TODO: implement variable length mem write command mem_len = 1 # 1 32-bit word command = generate_write_mem_command( object_id, memory_address, memory_data, mem_len ) command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "4": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem read test")) command = prepare_mem_read_command(object_id) command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "5": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Flash write")) command = prepare_flash_write_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "6": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Flash delete")) command = prepare_flash_delete_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "7": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay start")) command = prepare_replay_start_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "8": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop")) command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP) command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "9": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr on")) command = prepare_downlink_pwr_on_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "10": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off")) command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF) command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "11": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay write sequence")) command = prepare_replay_write_sequence_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "12": tc_queue.appendleft( (QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count") ) command = object_id + struct.pack("!I", CommandIds.OBSW_RESET_SEQ_COUNT) command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "13": num_words = 1 tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Read DEADBEEF address")) command = ( object_id + struct.pack("!I", CommandIds.TC_MEM_READ) + struct.pack("!I", MemAddresses.DEADBEEF) + struct.pack("!H", num_words) ) command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "14": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc mode replay")) command = object_id + struct.pack("!I", CommandIds.TC_MODE_REPLAY) command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "15": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc mode idle")) command = object_id + struct.pack("!I", CommandIds.TC_MODE_IDLE) command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "16": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc cam command send")) cam_cmd = input("Specify cam command string: ") command = ( object_id + struct.pack("!I", CommandIds.TC_CAM_CMD_SEND) + bytearray(cam_cmd, "utf-8") ) command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) return tc_queue def generate_write_mem_command( object_id: bytearray, memory_address: int, memory_data: int, mem_len: int ) -> bytearray: """This function generates the command to write to a memory address within the PLOC @param object_id The object id of the PlocHandler @param memory_address The PLOC memory address where to write to. @param memory_data The data to write to the memory address specified by the bytearray memory_address. """ command = ( object_id + struct.pack("!I", CommandIds.TC_MEM_WRITE) + struct.pack("!I", memory_address) + struct.pack("!H", mem_len) + struct.pack("!I", memory_data) ) return command def prepare_mem_read_command(object_id: bytearray) -> bytearray: memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) command = ( object_id + struct.pack("!I", CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack("!H", num_words) ) return command def prepare_flash_write_cmd(object_id: bytearray) -> bytearray: obcFile = get_obc_file() mpsocFile = get_mpsoc_file() command = ( object_id + struct.pack("!I", CommandIds.FLASH_WRITE) + bytearray(obcFile, "utf-8") + bytearray(mpsocFile, "utf-8") ) return command def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray: file = get_mpsoc_file() command = ( object_id + struct.pack("!I", CommandIds.TC_FLASH_DELETE) + bytearray(file, "utf-8") ) return command def prepare_replay_start_cmd(object_id: bytearray) -> bytearray: replay = int(input("Specify replay mode (0 - once, 1 - repeated): ")) command = ( object_id + struct.pack("!I", CommandIds.TC_REPLAY_START) + struct.pack("!B", replay) ) return command def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray: mode = int(input("Specify JESD mode (0 - 5): ")) lane_rate = int(input("Specify lane rate (0 - 9): ")) command = ( object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack("!B", mode) + struct.pack("!B", lane_rate) ) return command def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray: null_terminator = 0 use_decoding = int(input("Use decoding (set to 1): ")) file = get_sequence_file() command = ( object_id + struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack("!B", use_decoding) + bytearray(file, "utf-8") ) return command def get_obc_file() -> str: LOGGER.info("Specify OBC file ") input_helper = InputHelper(flash_write_file_dict) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name of flash file: ") else: file = flash_write_file_dict[key][1] return file def get_mpsoc_file() -> str: LOGGER.info("Specify MPSoC file") input_helper = InputHelper(mpsoc_file_dict) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name file: ") else: file = mpsoc_file_dict[key][1] return file def get_sequence_file() -> str: LOGGER.info("Specify sequence file") input_helper = InputHelper(sequence_file_dict) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name file: ") else: file = sequence_file_dict[key][1] return file