From b96ecce483fa26bb031dac53fd6eeb8889521093 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Mon, 21 Mar 2022 16:22:41 +0100 Subject: [PATCH] e-band control commands --- pus_tc/cmd_definitions.py | 6 +++ pus_tc/devs/ploc_mpsoc.py | 105 +++++++++++++++++++++++++++++++++----- pus_tc/devs/rad_sensor.py | 1 - 3 files changed, 99 insertions(+), 13 deletions(-) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 9487987..89fce30 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -461,6 +461,12 @@ def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT): "0": ("Ploc MPSoC: Memory write", {OpCodeDictKeys.TIMEOUT: 2.0}), "1": ("Ploc MPSoC: Memory read", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("Ploc MPSoC: Flash write", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("Ploc MPSoC: Flash delete", {OpCodeDictKeys.TIMEOUT: 2.0}), + "4": ("Ploc MPSoC: Replay start", {OpCodeDictKeys.TIMEOUT: 2.0}), + "5": ("Ploc MPSoC: Replay stop", {OpCodeDictKeys.TIMEOUT: 2.0}), + "6": ("Ploc MPSoC: Downlink pwr on", {OpCodeDictKeys.TIMEOUT: 2.0}), + "7": ("Ploc MPSoC: Downlink pwr off", {OpCodeDictKeys.TIMEOUT: 2.0}), + "8": ("Ploc MPSoC: Replay write sequence", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_ploc_mpsoc_tuple = ("Ploc MPSoC", op_code_dict_srv_ploc_mpsoc) cmd_dict[CustomServiceList.PLOC_MPSOC.value] = service_ploc_mpsoc_tuple diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py index ca2f188..0d82cba 100644 --- a/pus_tc/devs/ploc_mpsoc.py +++ b/pus_tc/devs/ploc_mpsoc.py @@ -7,27 +7,39 @@ @date 06.03.2021 """ import struct +import enum from tmtccmd.config.definitions import QueueCommands from tmtccmd.utility.logger import get_console_logger from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand +from utility.input_helper import InputHelper LOGGER = get_console_logger() MANUAL_INPUT = "3" -flash_write_dict = { +flash_write_file_dict = { "1": ["q7s test file", "/mnt/sd0/ploc-mpsoc/flashwrite.bin"], "2": ["te0720-1cfa test file", "/mnt/sd0/ploc-mpsoc/flashwrite.bin"], MANUAL_INPUT: ["manual input", ""], } +mpsoc_file_dict = { + MANUAL_INPUT: ["manual input", ""], +} -class CommandIds: - tc_mem_write = 1 - tc_mem_read = 2 - flash_write = 3 + +class CommandIds(enum.IntEnum): + TC_MEM_WRITE = 1 + TC_MEM_READ = 2 + FLASH_WRITE = 9 + TC_FLASH_DELETE = 10 + TC_REPLAY_START = 11 + TC_REPLAY_STOP = 12 + TC_REPLAY_WRITE_SEQUENCE = 13 + TC_DOWNLINK_PWR_ON = 14 + TC_DOWNLINK_PWR_OFF = 15 class PlocReplyIds: @@ -65,6 +77,36 @@ def pack_ploc_mpsoc_commands( command = prepare_flash_write_cmd(object_id) command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "3": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Flash delete")) + command = prepare_flash_delete_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "4": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay start")) + command = prepare_replay_start_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "5": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop")) + command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP) + command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "6": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr on")) + command = prepare_downlink_pwr_on_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "7": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off")) + command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF) + command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "8": + tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay write sequence")) + command = prepare_replay_write_sequence_cmd(object_id) + command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) return tc_queue @@ -79,7 +121,7 @@ def generate_write_mem_command( """ command = ( object_id - + struct.pack('!I', CommandIds.tc_mem_write) + + struct.pack('!I', CommandIds.TC_MEM_WRITE) + struct.pack('!I', memory_address) + struct.pack('!H', mem_len) + struct.pack("!I", memory_data) @@ -91,7 +133,7 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray: memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) command = ( - object_id + struct.pack('!I', CommandIds.tc_mem_read) + struct.pack("!I", memory_address) + struct.pack( + object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack( '!H', num_words) ) return command @@ -99,16 +141,55 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray: def prepare_flash_write_cmd(object_id: bytearray) -> bytearray: file = get_flash_write_file() - command = object_id + struct.pack('I', CommandIds.flash_write) + bytearray(file, 'utf-8') + command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(file, 'utf-8') + return command + + +def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray: + file = get_mpsoc_file() + command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8') + return command + + +def prepare_replay_start_cmd(object_id: bytearray) -> bytearray: + replay = int(input("Specify replay mode (0 - repeated, 1 - once): ")) + command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay) + return command + + +def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray: + mode = int(input("Specify JESD mode (0 - 5): ")) + lane_rate = int(input("Specify lane rate (0 - 9): ")) + command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \ + + struct.pack('!B', lane_rate) + return command + + +def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray: + use_decoding = int(input("Use decoding (set to 1): ")) + file = get_mpsoc_file() + command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \ + bytearray(file, 'utf-8') return command def get_flash_write_file() -> str: - LOGGER.info("Specify json file") - input_helper = InputHelper(json_dict) + LOGGER.info("Specify flash file") + input_helper = InputHelper(flash_write_file_dict) key = input_helper.get_key() if key == MANUAL_INPUT: - file = input("Ploc MPSoC: Specify absolute name of flash write file: ") + file = input("Ploc MPSoC: Specify absolute name of flash file: ") else: - file = json_dict[key][1] + file = flash_write_file_dict[key][1] + return file + + +def get_mpsoc_file() -> str: + LOGGER.info("Specify mpsoc file") + input_helper = InputHelper(mpsoc_file_dict) + key = input_helper.get_key() + if key == MANUAL_INPUT: + file = input("Ploc MPSoC: Specify absolute name file: ") + else: + file = mpsoc_file_dict[key][1] return file diff --git a/pus_tc/devs/rad_sensor.py b/pus_tc/devs/rad_sensor.py index 8ea77d1..2dbee43 100644 --- a/pus_tc/devs/rad_sensor.py +++ b/pus_tc/devs/rad_sensor.py @@ -5,7 +5,6 @@ @author J. Meier @date 01.07.2021 """ -import struct from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.packer import TcQueueT