452 lines
17 KiB
Python
452 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
@file ploc_mpsoc.py
|
|
@brief Tests for commanding the MPSoC of the PLOC.
|
|
The MPSoC is programmed by the ILH.
|
|
@author J. Meier
|
|
@date 06.03.2021
|
|
"""
|
|
import logging
|
|
import struct
|
|
import enum
|
|
|
|
from eive_tmtc.config.definitions import CustomServiceList
|
|
from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID
|
|
from tmtccmd.config.tmtc import (
|
|
tmtc_definitions_provider,
|
|
OpCodeEntry,
|
|
TmtcDefinitionWrapper,
|
|
)
|
|
from spacepackets.ecss.tc import PusTelecommand
|
|
from tmtccmd.tc import service_provider
|
|
from tmtccmd.tc.decorator import ServiceProviderParams
|
|
from eive_tmtc.utility.input_helper import InputHelper
|
|
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
MANUAL_INPUT = "1"
|
|
|
|
flash_write_file_dict = {
|
|
MANUAL_INPUT: ["manual input", ""],
|
|
"2": ["/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"],
|
|
}
|
|
|
|
mpsoc_file_dict = {
|
|
MANUAL_INPUT: ["manual input", ""],
|
|
"2": ["0:/flash", "0:/flash"],
|
|
}
|
|
|
|
SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"]
|
|
SEQ_FILE_DICT = {
|
|
MANUAL_INPUT: ["manual input", ""],
|
|
"2": [f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"],
|
|
"3": [f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"],
|
|
"4": [f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"],
|
|
}
|
|
|
|
|
|
CARRIAGE_RETURN = 0xD
|
|
|
|
|
|
class CommandId(enum.IntEnum):
|
|
TC_MEM_WRITE = 1
|
|
TC_MEM_READ = 2
|
|
FLASH_WRITE = 9
|
|
TC_FLASH_DELETE = 10
|
|
TC_REPLAY_START = 11
|
|
TC_REPLAY_STOP = 12
|
|
TC_REPLAY_WRITE_SEQUENCE = 13
|
|
TC_DOWNLINK_PWR_ON = 14
|
|
TC_DOWNLINK_PWR_OFF = 15
|
|
OBSW_RESET_SEQ_COUNT = 50
|
|
TC_MODE_REPLAY = 16
|
|
TC_CAM_CMD_SEND = 17
|
|
TC_MODE_IDLE = 18
|
|
SET_UART_TX_TRISTATE = 20
|
|
RELEASE_UART_TX = 21
|
|
TC_CAM_TAKE_PIC = 22
|
|
TC_SIMPLEX_SEND_FILE = 23
|
|
TC_DOWNLINK_DATA_MODULATE = 24
|
|
TC_MODE_SNAPSHOT = 25
|
|
|
|
|
|
class OpCode:
|
|
ON = ["on"]
|
|
OFF = ["off"]
|
|
NORMAL = ["normal"]
|
|
VERIFY_BOOT = ["verify_boot"]
|
|
MODE_REPLAY = ["mode_replay"]
|
|
MODE_IDLE = ["mode_idle"]
|
|
REPLAY_WRITE_SEQ = ["replay_write"]
|
|
DOWNLINK_PWR_ON = ["downlink_pwr_on"]
|
|
REPLAY_START = ["replay_start"]
|
|
CAM_TAKE_PIC = ["cam_take_pic"]
|
|
SIMPLEX_SEND_FILE = ["simplex_send_file"]
|
|
DOWNLINK_DATA_MODULATE = ["downlink_data_modulate"]
|
|
MODE_SNAPSHOT = ["mode_snapshot"]
|
|
|
|
|
|
class Info:
|
|
ON = "On"
|
|
OFF = "Off"
|
|
NORMAL = "Normal"
|
|
VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address"
|
|
MODE_REPLAY = "Switch to REPLAY mode"
|
|
MODE_IDLE = "Switch to IDLE mode"
|
|
REPLAY_WRITE_SEQ = "Replay write sequence"
|
|
DOWNLINK_PWR_ON = "Downlink Power On"
|
|
REPLAY_START = "Replay Start"
|
|
CAM_TAKE_PIC = "Cam Take Picture"
|
|
SIMPLEX_SEND_FILE = "Simplex Send File"
|
|
DOWNLINK_DATA_MODULATE = "Downlink data modulate"
|
|
MODE_SNAPSHOT = "Mode Snapshot"
|
|
|
|
|
|
class MemAddresses(enum.IntEnum):
|
|
DEADBEEF = 0x40000004
|
|
|
|
|
|
class PlocReplyIds(enum.IntEnum):
|
|
TM_MEM_READ_RPT = 6
|
|
TM_CAM_CMD_RPT = 19
|
|
|
|
|
|
@tmtc_definitions_provider
|
|
def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
|
|
oce = OpCodeEntry()
|
|
oce.add(OpCode.OFF, Info.OFF)
|
|
oce.add(OpCode.ON, Info.ON)
|
|
oce.add(OpCode.NORMAL, Info.NORMAL)
|
|
oce.add("3", "Ploc MPSoC: Memory write")
|
|
oce.add("4", "Ploc MPSoC: Memory read")
|
|
oce.add("5", "Ploc MPSoC: Flash write")
|
|
oce.add("6", "Ploc MPSoC: Flash delete")
|
|
oce.add(OpCode.REPLAY_START, Info.REPLAY_START)
|
|
oce.add("8", "Ploc MPSoC: Replay stop")
|
|
oce.add(OpCode.DOWNLINK_PWR_ON, Info.DOWNLINK_PWR_ON)
|
|
oce.add("10", "Ploc MPSoC: Downlink pwr off")
|
|
oce.add(OpCode.REPLAY_WRITE_SEQ, Info.REPLAY_WRITE_SEQ)
|
|
oce.add("12", "Ploc MPSoC: OBSW reset sequence count")
|
|
oce.add(OpCode.VERIFY_BOOT, "Ploc MPSoC: Read DEADBEEF address")
|
|
oce.add(OpCode.MODE_REPLAY, Info.MODE_REPLAY)
|
|
oce.add(OpCode.MODE_IDLE, Info.MODE_IDLE)
|
|
oce.add("16", "Ploc MPSoC: Tc cam command send")
|
|
oce.add("17", "Ploc MPSoC: Set UART TX tristate")
|
|
oce.add("18", "Ploc MPSoC: Relesase UART TX")
|
|
oce.add(OpCode.CAM_TAKE_PIC, Info.CAM_TAKE_PIC)
|
|
oce.add(OpCode.SIMPLEX_SEND_FILE, Info.SIMPLEX_SEND_FILE)
|
|
oce.add(OpCode.DOWNLINK_DATA_MODULATE, Info.DOWNLINK_DATA_MODULATE)
|
|
oce.add(OpCode.MODE_SNAPSHOT, Info.MODE_SNAPSHOT)
|
|
defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce)
|
|
|
|
|
|
@service_provider(CustomServiceList.PLOC_MPSOC)
|
|
def pack_ploc_mpsoc_commands(p: ServiceProviderParams):
|
|
object_id = get_object_ids().get(PLOC_MPSOC_ID)
|
|
q = p.queue_helper
|
|
prefix = "PLOC MPSoC"
|
|
op_code = p.op_code
|
|
q.add_log_cmd(
|
|
f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
|
|
)
|
|
obyt = object_id.as_bytes
|
|
if op_code in OpCode.OFF:
|
|
q.add_log_cmd(f"{prefix}: {Info.OFF}")
|
|
command = pack_mode_data(obyt, Mode.OFF, 0)
|
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
|
|
if op_code in OpCode.ON:
|
|
q.add_log_cmd(f"{prefix}: {Info.ON}")
|
|
data = pack_mode_data(obyt, Mode.ON, 0)
|
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
|
if op_code in OpCode.NORMAL:
|
|
q.add_log_cmd(f"{prefix}: {Info.NORMAL}")
|
|
data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0)
|
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
|
if op_code == "3":
|
|
q.add_log_cmd("PLOC MPSoC: TC mem write test")
|
|
memory_address = int(
|
|
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
|
|
)
|
|
memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
|
|
# TODO: implement variable length mem write command
|
|
mem_len = 1 # 1 32-bit word
|
|
data = generate_write_mem_command(
|
|
object_id.as_bytes, memory_address, memory_data, mem_len
|
|
)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "4":
|
|
q.add_log_cmd("PLOC MPSoC: TC mem read test")
|
|
data = prepare_mem_read_command(object_id=object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "5":
|
|
q.add_log_cmd("PLOC MPSoC: Flash write")
|
|
data = prepare_flash_write_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "6":
|
|
q.add_log_cmd("PLOC MPSoC: Flash delete")
|
|
data = prepare_flash_delete_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.REPLAY_START:
|
|
q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}")
|
|
data = prepare_replay_start_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "8":
|
|
q.add_log_cmd("PLOC MPSoC: Replay stop")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_REPLAY_STOP)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.DOWNLINK_PWR_ON:
|
|
q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}")
|
|
data = prepare_downlink_pwr_on_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "10":
|
|
q.add_log_cmd("PLOC MPSoC: Downlink pwr off")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_DOWNLINK_PWR_OFF)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.REPLAY_WRITE_SEQ:
|
|
q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}")
|
|
data = prepare_replay_write_sequence_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "12":
|
|
q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.OBSW_RESET_SEQ_COUNT)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.VERIFY_BOOT:
|
|
num_words = 1
|
|
q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}")
|
|
data = (
|
|
object_id.as_bytes
|
|
+ struct.pack("!I", CommandId.TC_MEM_READ)
|
|
+ struct.pack("!I", MemAddresses.DEADBEEF)
|
|
+ struct.pack("!H", num_words)
|
|
)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.MODE_REPLAY:
|
|
q.add_log_cmd("PLOC MPSoC: Tc mode replay")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_REPLAY)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.MODE_IDLE:
|
|
q.add_log_cmd("PLOC MPSoC: Tc mode idle")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_IDLE)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "16":
|
|
q.add_log_cmd("PLOC MPSoC: Tc cam command send")
|
|
cam_cmd = input("Specify cam command string: ")
|
|
data = (
|
|
object_id.as_bytes
|
|
+ struct.pack("!I", CommandId.TC_CAM_CMD_SEND)
|
|
+ bytearray(cam_cmd, "utf-8")
|
|
)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "17":
|
|
q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.SET_UART_TX_TRISTATE)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code == "18":
|
|
q.add_log_cmd("PLOC MPSoC: Release UART TX")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.RELEASE_UART_TX)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.CAM_TAKE_PIC:
|
|
q.add_log_cmd("PLOC MPSoC: Cam take picture")
|
|
data = prepare_cam_take_pic_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.SIMPLEX_SEND_FILE:
|
|
q.add_log_cmd("PLOC MPSoC: Simplex send file")
|
|
data = prepare_simplex_send_file_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.DOWNLINK_DATA_MODULATE:
|
|
q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
|
|
data = prepare_downlink_data_modulate_cmd(object_id.as_bytes)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
if op_code in OpCode.MODE_SNAPSHOT:
|
|
q.add_log_cmd("PLOC MPSoC: Mode snapshot")
|
|
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_SNAPSHOT)
|
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
|
|
|
|
|
def generate_write_mem_command(
|
|
object_id: bytes, memory_address: int, memory_data: int, mem_len: int
|
|
) -> bytearray:
|
|
"""This function generates the command to write to a memory address within the PLOC.
|
|
|
|
:param object_id: The object id of the PlocHandler
|
|
:param memory_address: The PLOC memory address where to write to.
|
|
:param memory_data: The data to write to the memory address specified by the
|
|
bytearray memory_address.
|
|
:param mem_len:
|
|
"""
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_MEM_WRITE)
|
|
+ struct.pack("!I", memory_address)
|
|
+ struct.pack("!H", mem_len)
|
|
+ struct.pack("!I", memory_data)
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_mem_read_command(object_id: bytes) -> bytearray:
|
|
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
|
|
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_MEM_READ)
|
|
+ struct.pack("!I", memory_address)
|
|
+ struct.pack("!H", num_words)
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_flash_write_cmd(object_id: bytes) -> bytearray:
|
|
obc_file = get_obc_file()
|
|
mpsoc_file = get_mpsoc_file()
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.FLASH_WRITE)
|
|
+ bytearray(obc_file, "utf-8")
|
|
+ bytearray(mpsoc_file, "utf-8")
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_flash_delete_cmd(object_id: bytes) -> bytearray:
|
|
file = get_mpsoc_file()
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_FLASH_DELETE)
|
|
+ bytearray(file, "utf-8")
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_replay_start_cmd(object_id: bytes) -> bytearray:
|
|
replay = int(input("Specify replay mode (0 - once, 1 - repeated): "))
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_REPLAY_START)
|
|
+ struct.pack("!B", replay)
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray:
|
|
mode = int(input("Specify JESD mode (0 - 5): "))
|
|
lane_rate = int(input("Specify lane rate (0 - 9): "))
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_DOWNLINK_PWR_ON)
|
|
+ struct.pack("!B", mode)
|
|
+ struct.pack("!B", lane_rate)
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
|
|
use_decoding = int(input("Use decoding (set to 1): "))
|
|
file = get_sequence_file()
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_REPLAY_WRITE_SEQUENCE)
|
|
+ struct.pack("!B", use_decoding)
|
|
+ bytearray(file, "utf-8")
|
|
# + bytes([0])
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
|
|
selection = input("Use default parameter? (Y/N): ")
|
|
if selection is "Y" or selection is "y":
|
|
filename = "0:/test"
|
|
encoder_setting_y = 7
|
|
quantization_y = 0
|
|
encoder_setting_cb = 7
|
|
quantization_cb = 0
|
|
encoder_setting_cr = 7
|
|
quantization_cr = 0
|
|
bypass_compressor = 0
|
|
else:
|
|
filename = input("Specify filename: ")
|
|
encoder_setting_y = int(input("Specify encoderSetting_Y: "))
|
|
quantization_y = int(input("Specify quantization_Y: "))
|
|
encoder_setting_cb = int(input("Specify encoderSetting_Cb: "))
|
|
quantization_cb = int(input("Specify quantization_Cb: "))
|
|
encoder_setting_cr = int(input("Specify encoderSetting_Cr: "))
|
|
quantization_cr = int(input("Specify quantization_Cr: "))
|
|
bypass_compressor = int(input("Specify bypassCompressor: "))
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_CAM_TAKE_PIC)
|
|
+ bytearray(filename, "utf-8")
|
|
+ bytes([0])
|
|
+ struct.pack("!B", encoder_setting_y)
|
|
+ struct.pack("!Q", quantization_y)
|
|
+ struct.pack("!B", encoder_setting_cb)
|
|
+ struct.pack("!Q", quantization_cb)
|
|
+ struct.pack("!B", encoder_setting_cr)
|
|
+ struct.pack("!Q", quantization_cr)
|
|
+ struct.pack("!B", bypass_compressor)
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_simplex_send_file_cmd(object_id: bytes) -> bytearray:
|
|
filename = input("Specify filename: ")
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_SIMPLEX_SEND_FILE)
|
|
+ bytearray(filename, "utf-8")
|
|
+ bytes([0])
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray:
|
|
format = int(input("Specify format: "))
|
|
src_mem_addr = int(input("Specify srcMemAddr: "))
|
|
src_mem_len = int(input("Specify srcMemLen: "))
|
|
dest_mem_addr = int(input("Specify destMemAddr: "))
|
|
command = (
|
|
object_id
|
|
+ struct.pack("!I", CommandId.TC_DOWNLINK_DATA_MODULATE)
|
|
+ struct.pack("!B", format)
|
|
+ struct.pack("!I", src_mem_addr)
|
|
+ struct.pack("!H", src_mem_len)
|
|
+ struct.pack("!I", dest_mem_addr)
|
|
)
|
|
return bytearray(command)
|
|
|
|
|
|
def get_obc_file() -> str:
|
|
_LOGGER.info("Specify OBC file ")
|
|
input_helper = InputHelper(flash_write_file_dict)
|
|
key = input_helper.get_key()
|
|
if key == MANUAL_INPUT:
|
|
file = input("Ploc MPSoC: Specify absolute name of flash file: ")
|
|
else:
|
|
file = flash_write_file_dict[key][1]
|
|
return file
|
|
|
|
|
|
def get_mpsoc_file() -> str:
|
|
_LOGGER.info("Specify MPSoC file")
|
|
input_helper = InputHelper(mpsoc_file_dict)
|
|
key = input_helper.get_key()
|
|
if key == MANUAL_INPUT:
|
|
file = input("Ploc MPSoC: Specify absolute name file: ")
|
|
else:
|
|
file = mpsoc_file_dict[key][1]
|
|
return file
|
|
|
|
|
|
def get_sequence_file() -> str:
|
|
_LOGGER.info("Specify sequence file")
|
|
input_helper = InputHelper(SEQ_FILE_DICT)
|
|
key = input_helper.get_key()
|
|
if key == MANUAL_INPUT:
|
|
file = input("Ploc MPSoC: Specify absolute name file: ")
|
|
else:
|
|
file = SEQ_FILE_DICT[key][1]
|
|
return file
|