From 5d17a1c2b33fee4ed236f871c0de01e9d462b684 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 4 May 2023 11:27:12 +0200 Subject: [PATCH] new mpsoc commands --- CHANGELOG.md | 1 + eive_tmtc/tmtc/payload/ploc_mpsoc.py | 204 ++++++++++++++++----------- 2 files changed, 121 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f294dd..f0a0ec2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ list yields a list of all related PRs for each release. ## Added - Event handling for reboot counter events. +- Start adding new MPSoC commands, improve MPSoC commanding module a bit. # [v3.1.1] 2023-04-17 diff --git a/eive_tmtc/tmtc/payload/ploc_mpsoc.py b/eive_tmtc/tmtc/payload/ploc_mpsoc.py index 2ba83fa..82fd253 100644 --- a/eive_tmtc/tmtc/payload/ploc_mpsoc.py +++ b/eive_tmtc/tmtc/payload/ploc_mpsoc.py @@ -23,37 +23,36 @@ from tmtccmd.tc.decorator import ServiceProviderParams from eive_tmtc.utility.input_helper import InputHelper from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode - _LOGGER = logging.getLogger(__name__) MANUAL_INPUT = "1" -flash_write_file_dict = { - MANUAL_INPUT: ["manual input", ""], - "2": ["/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"], +FLASH_WRITE_FILE_DICT = { + MANUAL_INPUT: ("manual input", ""), + "2": ("/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"), } -mpsoc_file_dict = { - MANUAL_INPUT: ["manual input", ""], - "2": ["0:/flash", "0:/flash"], +MPSOC_FILE_DICT = { + MANUAL_INPUT: ("manual input", ""), + "2": ("0:/flash", "0:/flash"), } SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"] SEQ_FILE_DICT = { - MANUAL_INPUT: ["manual input", ""], - "2": [f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"], - "3": [f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"], - "4": [f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"], + MANUAL_INPUT: ("manual input", ""), + "2": (f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"), + "3": (f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"), + "4": (f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"), } CARRIAGE_RETURN = 0xD -class CommandId(enum.IntEnum): +class ActionId(enum.IntEnum): TC_MEM_WRITE = 1 TC_MEM_READ = 2 - FLASH_WRITE = 9 + TC_FLASH_WRITE_FULL_FILE = 9 TC_FLASH_DELETE = 10 TC_REPLAY_START = 11 TC_REPLAY_STOP = 12 @@ -70,22 +69,32 @@ class CommandId(enum.IntEnum): TC_SIMPLEX_SEND_FILE = 23 TC_DOWNLINK_DATA_MODULATE = 24 TC_MODE_SNAPSHOT = 25 + TC_FLASH_DIR_GET_CONTENT = 28 + TC_FLASH_READ_FULL_FILE = 30 class OpCode: - ON = ["on"] - OFF = ["off"] - NORMAL = ["normal"] - VERIFY_BOOT = ["verify_boot"] - MODE_REPLAY = ["mode_replay"] - MODE_IDLE = ["mode_idle"] - REPLAY_WRITE_SEQ = ["replay_write"] - DOWNLINK_PWR_ON = ["downlink_pwr_on"] - REPLAY_START = ["replay_start"] - CAM_TAKE_PIC = ["cam_take_pic"] - SIMPLEX_SEND_FILE = ["simplex_send_file"] - DOWNLINK_DATA_MODULATE = ["downlink_data_modulate"] - MODE_SNAPSHOT = ["mode_snapshot"] + ON = "on" + OFF = "off" + NORMAL = "normal" + VERIFY_BOOT = "verify_boot" + MODE_REPLAY = "mode_replay" + MODE_IDLE = "mode_idle" + REPLAY_WRITE_SEQ = "replay_write" + DOWNLINK_PWR_ON = "downlink_pwr_on" + MEM_WRITE = "memory_write" + MEM_READ = "memory_read" + DOWNLINK_PWR_OFF = "downlink_pwr_off" + FLASH_WRITE_FILE = "flash_write_file" + FLASH_READ_FILE = "flash_read_file" + FLASH_DELETE_FILE = "flash_delete_file" + FLASH_GET_DIR_CONTENT = "flash_get_dir_content" + REPLAY_STOP = "replay_stop" + REPLAY_START = "replay_start" + CAM_TAKE_PIC = "cam_take_pic" + SIMPLEX_SEND_FILE = "simplex_send_file" + DOWNLINK_DATA_MODULATE = "downlink_data_modulate" + MODE_SNAPSHOT = "mode_snapshot" class Info: @@ -94,12 +103,18 @@ class Info: NORMAL = "Normal" VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address" MODE_REPLAY = "Switch to REPLAY mode" + REPLAY_STOP = "Stop Replay" MODE_IDLE = "Switch to IDLE mode" REPLAY_WRITE_SEQ = "Replay write sequence" DOWNLINK_PWR_ON = "Downlink Power On" + DOWNLINK_PWR_OFF = "Downlink Power Off" REPLAY_START = "Replay Start" CAM_TAKE_PIC = "Cam Take Picture" SIMPLEX_SEND_FILE = "Simplex Send File" + FLASH_READ_FILE = "Copy file from MPSoC to OBC" + FLASH_WRITE_FILE = "Copy file from OBC to MPSoC" + FLASH_DELETE_FILE = "Delete file on MPSoC" + FLASH_GET_DIR_CONTENT = "Get flash directory content on MPSoC" DOWNLINK_DATA_MODULATE = "Downlink data modulate" MODE_SNAPSHOT = "Mode Snapshot" @@ -119,17 +134,18 @@ def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper): oce.add(OpCode.OFF, Info.OFF) oce.add(OpCode.ON, Info.ON) oce.add(OpCode.NORMAL, Info.NORMAL) - oce.add("3", "Ploc MPSoC: Memory write") - oce.add("4", "Ploc MPSoC: Memory read") - oce.add("5", "Ploc MPSoC: Flash write") - oce.add("6", "Ploc MPSoC: Flash delete") + oce.add(OpCode.MEM_WRITE, "Ploc MPSoC: Memory write") + oce.add(OpCode.MEM_READ, "Ploc MPSoC: Memory read") + oce.add(OpCode.FLASH_WRITE_FILE, Info.FLASH_WRITE_FILE) + oce.add(OpCode.FLASH_READ_FILE, Info.FLASH_READ_FILE) + oce.add(OpCode.FLASH_DELETE_FILE, Info.FLASH_DELETE_FILE) oce.add(OpCode.REPLAY_START, Info.REPLAY_START) - oce.add("8", "Ploc MPSoC: Replay stop") + oce.add(OpCode.REPLAY_STOP, Info.REPLAY_STOP) oce.add(OpCode.DOWNLINK_PWR_ON, Info.DOWNLINK_PWR_ON) - oce.add("10", "Ploc MPSoC: Downlink pwr off") + oce.add(OpCode.DOWNLINK_PWR_OFF, Info.DOWNLINK_PWR_OFF) oce.add(OpCode.REPLAY_WRITE_SEQ, Info.REPLAY_WRITE_SEQ) oce.add("12", "Ploc MPSoC: OBSW reset sequence count") - oce.add(OpCode.VERIFY_BOOT, "Ploc MPSoC: Read DEADBEEF address") + oce.add(OpCode.VERIFY_BOOT, Info.VERIFY_BOOT) oce.add(OpCode.MODE_REPLAY, Info.MODE_REPLAY) oce.add(OpCode.MODE_IDLE, Info.MODE_IDLE) oce.add("16", "Ploc MPSoC: Tc cam command send") @@ -152,19 +168,19 @@ def pack_ploc_mpsoc_commands(p: ServiceProviderParams): f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" ) obyt = object_id.as_bytes - if op_code in OpCode.OFF: + if op_code == OpCode.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(obyt, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCode.ON: + if op_code == OpCode.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") data = pack_mode_data(obyt, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) - if op_code in OpCode.NORMAL: + if op_code == OpCode.NORMAL: q.add_log_cmd(f"{prefix}: {Info.NORMAL}") data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) - if op_code == "3": + if op_code == OpCode.MEM_WRITE: q.add_log_cmd("PLOC MPSoC: TC mem write test") memory_address = int( input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 @@ -180,11 +196,15 @@ def pack_ploc_mpsoc_commands(p: ServiceProviderParams): q.add_log_cmd("PLOC MPSoC: TC mem read test") data = prepare_mem_read_command(object_id=object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "5": - q.add_log_cmd("PLOC MPSoC: Flash write") + if op_code == OpCode.FLASH_WRITE_FILE: + q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}") data = prepare_flash_write_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "6": + if op_code == OpCode.FLASH_READ_FILE: + q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}") + data = prepare_flash_read_cmd(object_id.as_bytes) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) + if op_code == OpCode.FLASH_DELETE_FILE: q.add_log_cmd("PLOC MPSoC: Flash delete") data = prepare_flash_delete_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) @@ -192,76 +212,76 @@ def pack_ploc_mpsoc_commands(p: ServiceProviderParams): q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}") data = prepare_replay_start_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "8": + if op_code == OpCode.REPLAY_STOP: q.add_log_cmd("PLOC MPSoC: Replay stop") - data = object_id.as_bytes + struct.pack("!I", CommandId.TC_REPLAY_STOP) + data = object_id.as_bytes + struct.pack("!I", ActionId.TC_REPLAY_STOP) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.DOWNLINK_PWR_ON: + if op_code == OpCode.DOWNLINK_PWR_ON: q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}") data = prepare_downlink_pwr_on_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "10": + if op_code == OpCode.DOWNLINK_PWR_OFF: q.add_log_cmd("PLOC MPSoC: Downlink pwr off") - data = object_id.as_bytes + struct.pack("!I", CommandId.TC_DOWNLINK_PWR_OFF) + data = object_id.as_bytes + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.REPLAY_WRITE_SEQ: + if op_code == OpCode.REPLAY_WRITE_SEQ: q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}") data = prepare_replay_write_sequence_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "12": q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count") - data = object_id.as_bytes + struct.pack("!I", CommandId.OBSW_RESET_SEQ_COUNT) + data = object_id.as_bytes + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.VERIFY_BOOT: + if op_code == OpCode.VERIFY_BOOT: num_words = 1 q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}") data = ( object_id.as_bytes - + struct.pack("!I", CommandId.TC_MEM_READ) + + struct.pack("!I", ActionId.TC_MEM_READ) + struct.pack("!I", MemAddresses.DEADBEEF) + struct.pack("!H", num_words) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.MODE_REPLAY: + if op_code == OpCode.MODE_REPLAY: q.add_log_cmd("PLOC MPSoC: Tc mode replay") - data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_REPLAY) + data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_REPLAY) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.MODE_IDLE: + if op_code == OpCode.MODE_IDLE: q.add_log_cmd("PLOC MPSoC: Tc mode idle") - data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_IDLE) + data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_IDLE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "16": q.add_log_cmd("PLOC MPSoC: Tc cam command send") cam_cmd = input("Specify cam command string: ") data = ( object_id.as_bytes - + struct.pack("!I", CommandId.TC_CAM_CMD_SEND) + + struct.pack("!I", ActionId.TC_CAM_CMD_SEND) + bytearray(cam_cmd, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "17": q.add_log_cmd("PLOC MPSoC: Set UART TX tristate") - data = object_id.as_bytes + struct.pack("!I", CommandId.SET_UART_TX_TRISTATE) + data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "18": q.add_log_cmd("PLOC MPSoC: Release UART TX") - data = object_id.as_bytes + struct.pack("!I", CommandId.RELEASE_UART_TX) + data = object_id.as_bytes + struct.pack("!I", ActionId.RELEASE_UART_TX) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.CAM_TAKE_PIC: + if op_code == OpCode.CAM_TAKE_PIC: q.add_log_cmd("PLOC MPSoC: Cam take picture") data = prepare_cam_take_pic_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.SIMPLEX_SEND_FILE: + if op_code == OpCode.SIMPLEX_SEND_FILE: q.add_log_cmd("PLOC MPSoC: Simplex send file") data = prepare_simplex_send_file_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.DOWNLINK_DATA_MODULATE: + if op_code == OpCode.DOWNLINK_DATA_MODULATE: q.add_log_cmd("PLOC MPSoC: Downlink data modulate") data = prepare_downlink_data_modulate_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.MODE_SNAPSHOT: + if op_code == OpCode.MODE_SNAPSHOT: q.add_log_cmd("PLOC MPSoC: Mode snapshot") - data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_SNAPSHOT) + data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_SNAPSHOT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) @@ -278,7 +298,7 @@ def generate_write_mem_command( """ command = ( object_id - + struct.pack("!I", CommandId.TC_MEM_WRITE) + + struct.pack("!I", ActionId.TC_MEM_WRITE) + struct.pack("!I", memory_address) + struct.pack("!H", mem_len) + struct.pack("!I", memory_data) @@ -291,31 +311,40 @@ def prepare_mem_read_command(object_id: bytes) -> bytearray: num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) command = ( object_id - + struct.pack("!I", CommandId.TC_MEM_READ) + + struct.pack("!I", ActionId.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack("!H", num_words) ) return bytearray(command) -def prepare_flash_write_cmd(object_id: bytes) -> bytearray: +def prepare_flash_base_cmd(action_id: int, object_id: bytes) -> bytearray: obc_file = get_obc_file() mpsoc_file = get_mpsoc_file() - command = ( - object_id - + struct.pack("!I", CommandId.FLASH_WRITE) - + bytearray(obc_file, "utf-8") - + bytearray(mpsoc_file, "utf-8") - ) - return bytearray(command) + command = bytearray(object_id) + command.extend(struct.pack("!I", action_id)) + command.extend(obc_file.encode("utf-8")) + command.append(0) + command.extend(mpsoc_file.encode("utf-8")) + command.append(0) + return command + + +def prepare_flash_write_cmd(object_id : bytes) -> bytearray: + return prepare_flash_base_cmd(ActionId.TC_FLASH_WRITE_FULL_FILE, object_id) + + +def prepare_flash_read_cmd(object_id: bytes) -> bytearray: + cmd = prepare_flash_base_cmd(ActionId.TC_FLASH_READ_FULL_FILE, object_id) + file_size = get_mpsoc_file_size() + cmd.extend(struct.pack("!I", file_size)) + return cmd def prepare_flash_delete_cmd(object_id: bytes) -> bytearray: file = get_mpsoc_file() command = ( - object_id - + struct.pack("!I", CommandId.TC_FLASH_DELETE) - + bytearray(file, "utf-8") + object_id + struct.pack("!I", ActionId.TC_FLASH_DELETE) + file.encode("utf-8") ) return bytearray(command) @@ -324,7 +353,7 @@ def prepare_replay_start_cmd(object_id: bytes) -> bytearray: replay = int(input("Specify replay mode (0 - once, 1 - repeated): ")) command = ( object_id - + struct.pack("!I", CommandId.TC_REPLAY_START) + + struct.pack("!I", ActionId.TC_REPLAY_START) + struct.pack("!B", replay) ) return bytearray(command) @@ -335,7 +364,7 @@ def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray: lane_rate = int(input("Specify lane rate (0 - 9): ")) command = ( object_id - + struct.pack("!I", CommandId.TC_DOWNLINK_PWR_ON) + + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_ON) + struct.pack("!B", mode) + struct.pack("!B", lane_rate) ) @@ -347,7 +376,7 @@ def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray: file = get_sequence_file() command = ( object_id - + struct.pack("!I", CommandId.TC_REPLAY_WRITE_SEQUENCE) + + struct.pack("!I", ActionId.TC_REPLAY_WRITE_SEQUENCE) + struct.pack("!B", use_decoding) + bytearray(file, "utf-8") # + bytes([0]) @@ -377,7 +406,7 @@ def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray: bypass_compressor = int(input("Specify bypassCompressor: ")) command = ( object_id - + struct.pack("!I", CommandId.TC_CAM_TAKE_PIC) + + struct.pack("!I", ActionId.TC_CAM_TAKE_PIC) + bytearray(filename, "utf-8") + bytes([0]) + struct.pack("!B", encoder_setting_y) @@ -395,7 +424,7 @@ def prepare_simplex_send_file_cmd(object_id: bytes) -> bytearray: filename = input("Specify filename: ") command = ( object_id - + struct.pack("!I", CommandId.TC_SIMPLEX_SEND_FILE) + + struct.pack("!I", ActionId.TC_SIMPLEX_SEND_FILE) + bytearray(filename, "utf-8") + bytes([0]) ) @@ -409,7 +438,7 @@ def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray: dest_mem_addr = int(input("Specify destMemAddr: ")) command = ( object_id - + struct.pack("!I", CommandId.TC_DOWNLINK_DATA_MODULATE) + + struct.pack("!I", ActionId.TC_DOWNLINK_DATA_MODULATE) + struct.pack("!B", format) + struct.pack("!I", src_mem_addr) + struct.pack("!H", src_mem_len) @@ -420,26 +449,33 @@ def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray: def get_obc_file() -> str: _LOGGER.info("Specify OBC file ") - input_helper = InputHelper(flash_write_file_dict) + input_helper = InputHelper(FLASH_WRITE_FILE_DICT) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name of flash file: ") else: - file = flash_write_file_dict[key][1] + file = FLASH_WRITE_FILE_DICT[key][1] return file def get_mpsoc_file() -> str: _LOGGER.info("Specify MPSoC file") - input_helper = InputHelper(mpsoc_file_dict) + input_helper = InputHelper(MPSOC_FILE_DICT) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc MPSoC: Specify absolute name file: ") else: - file = mpsoc_file_dict[key][1] + file = MPSOC_FILE_DICT[key][1] return file +def get_mpsoc_file_size() -> int: + file_size = int(input("Specify MPSoC file size")) + if file_size <= 0: + raise ValueError("Invalid file size") + return file_size + + def get_sequence_file() -> str: _LOGGER.info("Specify sequence file") input_helper = InputHelper(SEQ_FILE_DICT)