e-band control commands
This commit is contained in:
parent
1af59ea7a5
commit
b96ecce483
@ -461,6 +461,12 @@ def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT):
|
||||
"0": ("Ploc MPSoC: Memory write", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"1": ("Ploc MPSoC: Memory read", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"2": ("Ploc MPSoC: Flash write", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"3": ("Ploc MPSoC: Flash delete", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"4": ("Ploc MPSoC: Replay start", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"5": ("Ploc MPSoC: Replay stop", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"6": ("Ploc MPSoC: Downlink pwr on", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"7": ("Ploc MPSoC: Downlink pwr off", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
"8": ("Ploc MPSoC: Replay write sequence", {OpCodeDictKeys.TIMEOUT: 2.0}),
|
||||
}
|
||||
service_ploc_mpsoc_tuple = ("Ploc MPSoC", op_code_dict_srv_ploc_mpsoc)
|
||||
cmd_dict[CustomServiceList.PLOC_MPSOC.value] = service_ploc_mpsoc_tuple
|
||||
|
@ -7,27 +7,39 @@
|
||||
@date 06.03.2021
|
||||
"""
|
||||
import struct
|
||||
import enum
|
||||
|
||||
from tmtccmd.config.definitions import QueueCommands
|
||||
from tmtccmd.utility.logger import get_console_logger
|
||||
from tmtccmd.tc.packer import TcQueueT
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from utility.input_helper import InputHelper
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
MANUAL_INPUT = "3"
|
||||
|
||||
flash_write_dict = {
|
||||
flash_write_file_dict = {
|
||||
"1": ["q7s test file", "/mnt/sd0/ploc-mpsoc/flashwrite.bin"],
|
||||
"2": ["te0720-1cfa test file", "/mnt/sd0/ploc-mpsoc/flashwrite.bin"],
|
||||
MANUAL_INPUT: ["manual input", ""],
|
||||
}
|
||||
|
||||
mpsoc_file_dict = {
|
||||
MANUAL_INPUT: ["manual input", ""],
|
||||
}
|
||||
|
||||
class CommandIds:
|
||||
tc_mem_write = 1
|
||||
tc_mem_read = 2
|
||||
flash_write = 3
|
||||
|
||||
class CommandIds(enum.IntEnum):
|
||||
TC_MEM_WRITE = 1
|
||||
TC_MEM_READ = 2
|
||||
FLASH_WRITE = 9
|
||||
TC_FLASH_DELETE = 10
|
||||
TC_REPLAY_START = 11
|
||||
TC_REPLAY_STOP = 12
|
||||
TC_REPLAY_WRITE_SEQUENCE = 13
|
||||
TC_DOWNLINK_PWR_ON = 14
|
||||
TC_DOWNLINK_PWR_OFF = 15
|
||||
|
||||
|
||||
class PlocReplyIds:
|
||||
@ -65,6 +77,36 @@ def pack_ploc_mpsoc_commands(
|
||||
command = prepare_flash_write_cmd(object_id)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "3":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Flash delete"))
|
||||
command = prepare_flash_delete_cmd(object_id)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "4":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay start"))
|
||||
command = prepare_replay_start_cmd(object_id)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "5":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop"))
|
||||
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "6":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr on"))
|
||||
command = prepare_downlink_pwr_on_cmd(object_id)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "7":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off"))
|
||||
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
elif op_code == "8":
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay write sequence"))
|
||||
command = prepare_replay_write_sequence_cmd(object_id)
|
||||
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
|
||||
tc_queue.appendleft(command.pack_command_tuple())
|
||||
|
||||
return tc_queue
|
||||
|
||||
@ -79,7 +121,7 @@ def generate_write_mem_command(
|
||||
"""
|
||||
command = (
|
||||
object_id
|
||||
+ struct.pack('!I', CommandIds.tc_mem_write)
|
||||
+ struct.pack('!I', CommandIds.TC_MEM_WRITE)
|
||||
+ struct.pack('!I', memory_address)
|
||||
+ struct.pack('!H', mem_len)
|
||||
+ struct.pack("!I", memory_data)
|
||||
@ -91,7 +133,7 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
|
||||
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
|
||||
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
|
||||
command = (
|
||||
object_id + struct.pack('!I', CommandIds.tc_mem_read) + struct.pack("!I", memory_address) + struct.pack(
|
||||
object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack(
|
||||
'!H', num_words)
|
||||
)
|
||||
return command
|
||||
@ -99,16 +141,55 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
|
||||
|
||||
def prepare_flash_write_cmd(object_id: bytearray) -> bytearray:
|
||||
file = get_flash_write_file()
|
||||
command = object_id + struct.pack('I', CommandIds.flash_write) + bytearray(file, 'utf-8')
|
||||
command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(file, 'utf-8')
|
||||
return command
|
||||
|
||||
|
||||
def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray:
|
||||
file = get_mpsoc_file()
|
||||
command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8')
|
||||
return command
|
||||
|
||||
|
||||
def prepare_replay_start_cmd(object_id: bytearray) -> bytearray:
|
||||
replay = int(input("Specify replay mode (0 - repeated, 1 - once): "))
|
||||
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay)
|
||||
return command
|
||||
|
||||
|
||||
def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray:
|
||||
mode = int(input("Specify JESD mode (0 - 5): "))
|
||||
lane_rate = int(input("Specify lane rate (0 - 9): "))
|
||||
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \
|
||||
+ struct.pack('!B', lane_rate)
|
||||
return command
|
||||
|
||||
|
||||
def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray:
|
||||
use_decoding = int(input("Use decoding (set to 1): "))
|
||||
file = get_mpsoc_file()
|
||||
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \
|
||||
bytearray(file, 'utf-8')
|
||||
return command
|
||||
|
||||
|
||||
def get_flash_write_file() -> str:
|
||||
LOGGER.info("Specify json file")
|
||||
input_helper = InputHelper(json_dict)
|
||||
LOGGER.info("Specify flash file")
|
||||
input_helper = InputHelper(flash_write_file_dict)
|
||||
key = input_helper.get_key()
|
||||
if key == MANUAL_INPUT:
|
||||
file = input("Ploc MPSoC: Specify absolute name of flash write file: ")
|
||||
file = input("Ploc MPSoC: Specify absolute name of flash file: ")
|
||||
else:
|
||||
file = json_dict[key][1]
|
||||
file = flash_write_file_dict[key][1]
|
||||
return file
|
||||
|
||||
|
||||
def get_mpsoc_file() -> str:
|
||||
LOGGER.info("Specify mpsoc file")
|
||||
input_helper = InputHelper(mpsoc_file_dict)
|
||||
key = input_helper.get_key()
|
||||
if key == MANUAL_INPUT:
|
||||
file = input("Ploc MPSoC: Specify absolute name file: ")
|
||||
else:
|
||||
file = mpsoc_file_dict[key][1]
|
||||
return file
|
||||
|
@ -5,7 +5,6 @@
|
||||
@author J. Meier
|
||||
@date 01.07.2021
|
||||
"""
|
||||
import struct
|
||||
from tmtccmd.config.definitions import QueueCommands
|
||||
|
||||
from tmtccmd.tc.packer import TcQueueT
|
||||
|
Loading…
x
Reference in New Issue
Block a user