708 lines
26 KiB
Python
708 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
@file ploc_mpsoc.py
|
|
@brief Tests for commanding the MPSoC of the PLOC.
|
|
The MPSoC is programmed by the ILH.
|
|
@author J. Meier
|
|
@date 06.03.2021
|
|
"""
|
|
import dataclasses
|
|
import logging
|
|
import struct
|
|
import enum
|
|
|
|
from eive_tmtc.config.definitions import CustomServiceList
|
|
from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID
|
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
|
from tmtccmd.config.tmtc import (
|
|
tmtc_definitions_provider,
|
|
OpCodeEntry,
|
|
TmtcDefinitionWrapper,
|
|
)
|
|
from spacepackets.ecss.tc import PusTelecommand
|
|
from tmtccmd.tmtc import service_provider
|
|
from tmtccmd.tmtc.decorator import ServiceProviderParams
|
|
from eive_tmtc.utility.input_helper import InputHelper
|
|
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
|
|
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
|
|
|
# Logger for this module, named after the module itself.
_LOGGER = logging.getLogger(__name__)

# Menu key which selects free-text entry instead of a predefined value.
MANUAL_INPUT = "1"

# Every menu dictionary below maps a selection key to a
# (description, value) tuple; the getter functions of this module read the
# value stored at index 1 of the selected tuple.

# Selectable OBC file names for a flash write.
OBC_WRITE_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("/mnt/sd0/ploc/mpsoc/flash_write.bin",) * 2,
}

# Selectable OBC file names for a flash read.
OBC_READ_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("/mnt/sd0/ploc/mpsoc/flash_read.bin",) * 2,
}

# Selectable MPSoC file names for a flash write.
MPSOC_WRITE_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("0:/",) * 2,
}

# Selectable MPSoC file names for a flash read (also used for flash delete).
MPSOC_READ_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("0:/PICA",) * 2,
}

# Predefined replay sequence files and the menu built on top of them.
SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"]
SEQ_FILE_DICT = {
    MANUAL_INPUT: ("manual input", ""),
    "2": ("16QRM On Carrier 200 MBd (" + SEQ_FILE_NAMES[0] + ")", SEQ_FILE_NAMES[0]),
    "3": ("QPSK On Carrier 780 MBd (" + SEQ_FILE_NAMES[1] + ")", SEQ_FILE_NAMES[1]),
    "4": ("Maximum Bandwidth QPSK (" + SEQ_FILE_NAMES[2] + ")", SEQ_FILE_NAMES[2]),
}

# ASCII code of the carriage return character.
CARRIAGE_RETURN = 0xD
|
|
|
|
|
|
class SetId(enum.IntEnum):
    """Housekeeping set IDs known for the PLOC MPSoC."""

    # The only set handled by handle_ploc_mpsoc_hk_data.
    HK_ID = 0
|
|
|
|
|
|
class ActionId(enum.IntEnum):
    """Action IDs of the PLOC MPSoC handler.

    The command packers of this module serialize these values as 32-bit
    big-endian integers right after the object ID; the reply handler compares
    incoming action IDs against the ``TM_`` members.
    NOTE(review): ``TC_``/``TM_`` presumably mark telecommands and telemetry
    reports respectively - confirm against the on-board software.
    """

    # Memory access
    TC_MEM_WRITE = 1
    TC_MEM_READ = 2
    TM_MEM_READ_RPT = 6

    # Flash file handling
    TC_FLASH_WRITE_FULL_FILE = 9
    TC_FLASH_DELETE = 10

    # Replay control
    TC_REPLAY_START = 11
    TC_REPLAY_STOP = 12
    TC_REPLAY_WRITE_SEQUENCE = 13

    # Downlink power
    TC_DOWNLINK_PWR_ON = 14
    TC_DOWNLINK_PWR_OFF = 15

    # Value lies outside of the otherwise contiguous numbering.
    OBSW_RESET_SEQ_COUNT = 50

    # Modes and camera
    TC_MODE_REPLAY = 16
    TC_CAM_CMD_SEND = 17
    TC_MODE_IDLE = 18
    TM_CAM_CMD_RPT = 19

    # UART TX line control
    SET_UART_TX_TRISTATE = 20
    RELEASE_UART_TX = 21

    TC_CAM_TAKE_PIC = 22
    TC_SIMPLEX_SEND_FILE = 23
    TC_DOWNLINK_DATA_MODULATE = 24
    TC_MODE_SNAPSHOT = 25

    # Flash directory listing and file read-back
    TC_FLASH_DIR_GET_CONTENT = 28
    TM_FLASH_DIRECTORY_CONTENT = 29
    TC_FLASH_READ_FULL_FILE = 30
|
|
|
|
|
|
class OpCode:
    """String op codes by which the PLOC MPSoC commands are selected."""

    # Mode commands
    OFF = "off"
    ON = "on"
    NORMAL = "normal"

    # Memory access
    VERIFY_BOOT = "verify_boot"
    MEM_WRITE = "memory_write"
    MEM_READ = "memory_read"

    # MPSoC operating modes
    MODE_IDLE = "mode_idle"
    MODE_REPLAY = "mode_replay"
    MODE_SNAPSHOT = "mode_snapshot"

    # Replay
    REPLAY_START = "replay_start"
    REPLAY_STOP = "replay_stop"
    REPLAY_WRITE_SEQ = "replay_write"

    # Downlink
    DOWNLINK_PWR_ON = "downlink_pwr_on"
    DOWNLINK_PWR_OFF = "downlink_pwr_off"
    DOWNLINK_DATA_MODULATE = "downlink_data_modulate"

    # Flash file handling
    FLASH_WRITE_FILE = "flash_write_file"
    FLASH_READ_FILE = "flash_read_file"
    FLASH_DELETE_FILE = "flash_delete_file"
    FLASH_GET_DIR_CONTENT = "flash_get_dir_content"

    # Camera and simplex
    CAM_TAKE_PIC = "cam_take_pic"
    SIMPLEX_SEND_FILE = "simplex_send_file"
|
|
|
|
|
|
class Info:
    """Human-readable descriptions belonging to the op codes in OpCode."""

    # Mode commands
    OFF = "Off"
    ON = "On"
    NORMAL = "Normal"

    # Memory access
    VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address"

    # MPSoC operating modes
    MODE_IDLE = "Switch to IDLE mode"
    MODE_REPLAY = "Switch to REPLAY mode"
    MODE_SNAPSHOT = "Mode Snapshot"

    # Replay
    REPLAY_START = "Replay Start"
    REPLAY_STOP = "Stop Replay"
    REPLAY_WRITE_SEQ = "Replay write sequence"

    # Downlink
    DOWNLINK_PWR_ON = "Downlink Power On"
    DOWNLINK_PWR_OFF = "Downlink Power Off"
    DOWNLINK_DATA_MODULATE = "Downlink data modulate"

    # Flash file handling
    FLASH_WRITE_FILE = "Copy file from OBC to MPSoC"
    FLASH_READ_FILE = "Copy file from MPSoC to OBC"
    FLASH_DELETE_FILE = "Delete file on MPSoC"
    FLASH_GET_DIR_CONTENT = "Get flash directory content on MPSoC"

    # Camera and simplex
    CAM_TAKE_PIC = "Cam Take Picture"
    SIMPLEX_SEND_FILE = "Simplex Send File"
|
|
|
|
|
|
class MemAddresses(enum.IntEnum):
    """Named MPSoC memory addresses."""

    # Address read by the "verify boot" command.
    DEADBEEF = 0x40000004
|
|
|
|
|
|
@tmtc_definitions_provider
def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
    """Register the PLOC MPSoC service together with all of its op codes.

    :param defs: Definition wrapper to which the "Ploc MPSoC" service is added.
    """
    # (op code, description) pairs in registration order. The numeric string
    # keys belong to commands which have no named constant in OpCode.
    op_code_table = (
        (OpCode.OFF, Info.OFF),
        (OpCode.ON, Info.ON),
        (OpCode.NORMAL, Info.NORMAL),
        (OpCode.MEM_WRITE, "Ploc MPSoC: Memory write"),
        (OpCode.MEM_READ, "Ploc MPSoC: Memory read"),
        (OpCode.FLASH_WRITE_FILE, Info.FLASH_WRITE_FILE),
        (OpCode.FLASH_READ_FILE, Info.FLASH_READ_FILE),
        (OpCode.FLASH_DELETE_FILE, Info.FLASH_DELETE_FILE),
        (OpCode.FLASH_GET_DIR_CONTENT, Info.FLASH_GET_DIR_CONTENT),
        (OpCode.REPLAY_START, Info.REPLAY_START),
        (OpCode.REPLAY_STOP, Info.REPLAY_STOP),
        (OpCode.DOWNLINK_PWR_ON, Info.DOWNLINK_PWR_ON),
        (OpCode.DOWNLINK_PWR_OFF, Info.DOWNLINK_PWR_OFF),
        (OpCode.REPLAY_WRITE_SEQ, Info.REPLAY_WRITE_SEQ),
        ("12", "Ploc MPSoC: OBSW reset sequence count"),
        (OpCode.VERIFY_BOOT, Info.VERIFY_BOOT),
        (OpCode.MODE_REPLAY, Info.MODE_REPLAY),
        (OpCode.MODE_IDLE, Info.MODE_IDLE),
        ("16", "Ploc MPSoC: Tc cam command send"),
        ("17", "Ploc MPSoC: Set UART TX tristate"),
        # Description previously contained the typo "Relesase".
        ("18", "Ploc MPSoC: Release UART TX"),
        (OpCode.CAM_TAKE_PIC, Info.CAM_TAKE_PIC),
        (OpCode.SIMPLEX_SEND_FILE, Info.SIMPLEX_SEND_FILE),
        (OpCode.DOWNLINK_DATA_MODULATE, Info.DOWNLINK_DATA_MODULATE),
        (OpCode.MODE_SNAPSHOT, Info.MODE_SNAPSHOT),
    )
    oce = OpCodeEntry()
    for op_code, description in op_code_table:
        oce.add(op_code, description)
    defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce)
|
|
|
|
|
|
# Mode op codes mapped to (description, mode) of the mode command to send.
_MODE_OP_CODES = {
    OpCode.OFF: (Info.OFF, Mode.OFF),
    OpCode.ON: (Info.ON, Mode.ON),
    OpCode.NORMAL: (Info.NORMAL, Mode.NORMAL),
}

# Op codes whose command consists solely of the object ID followed by the
# action ID, mapped to (log message, action ID).
_PARAMETERLESS_ACTIONS = {
    OpCode.REPLAY_STOP: ("PLOC MPSoC: Replay stop", ActionId.TC_REPLAY_STOP),
    OpCode.DOWNLINK_PWR_OFF: (
        "PLOC MPSoC: Downlink pwr off",
        ActionId.TC_DOWNLINK_PWR_OFF,
    ),
    "12": ("PLOC MPSoC: Reset OBSW sequence count", ActionId.OBSW_RESET_SEQ_COUNT),
    OpCode.MODE_REPLAY: ("PLOC MPSoC: Tc mode replay", ActionId.TC_MODE_REPLAY),
    OpCode.MODE_IDLE: ("PLOC MPSoC: Tc mode idle", ActionId.TC_MODE_IDLE),
    "17": ("PLOC MPSoC: Set UART TX tristate", ActionId.SET_UART_TX_TRISTATE),
    "18": ("PLOC MPSoC: Release UART TX", ActionId.RELEASE_UART_TX),
    OpCode.MODE_SNAPSHOT: ("PLOC MPSoC: Mode snapshot", ActionId.TC_MODE_SNAPSHOT),
}


def _queue_action_tc(q, app_data) -> None:
    """Queue ``app_data`` as a PUS service 8, subservice 128 telecommand."""
    q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=app_data))


def _queue_mode_tc(q, app_data) -> None:
    """Queue ``app_data`` as a PUS service 200, subservice 1 telecommand."""
    q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=app_data))


@service_provider(CustomServiceList.PLOC_MPSOC)
def pack_ploc_mpsoc_commands(  # noqa C901
    p: ServiceProviderParams,
):  # noqa C901: Complexity okay here.
    """Queue the telecommand which belongs to the selected PLOC MPSoC op code.

    A log entry is queued first, then the application data is assembled
    (several commands prompt the operator for parameters on the console) and
    finally the telecommand is added to the queue. Op codes which are not
    known to this function only produce the initial log entry.

    :param p: Service provider parameters; ``p.op_code`` selects the command
        and ``p.queue_helper`` receives log entries and telecommands.
    """
    object_id = get_object_ids().get(PLOC_MPSOC_ID)
    q = p.queue_helper
    prefix = "PLOC MPSoC"
    op_code = p.op_code
    q.add_log_cmd(
        f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
    )
    obyt = object_id.as_bytes

    if op_code in _MODE_OP_CODES:
        info, mode = _MODE_OP_CODES[op_code]
        q.add_log_cmd(f"{prefix}: {info}")
        _queue_mode_tc(q, pack_mode_data(obyt, mode, 0))
        return

    if op_code in _PARAMETERLESS_ACTIONS:
        log_msg, action_id = _PARAMETERLESS_ACTIONS[op_code]
        q.add_log_cmd(log_msg)
        _queue_action_tc(q, obyt + struct.pack("!I", action_id))
        return

    if op_code == OpCode.MEM_WRITE:
        q.add_log_cmd("PLOC MPSoC: TC mem write test")
        memory_address = int(
            input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
        )
        memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
        # TODO: implement variable length mem write command
        mem_len = 1  # 1 32-bit word
        _queue_action_tc(
            q, generate_write_mem_command(obyt, memory_address, memory_data, mem_len)
        )
    elif op_code in (OpCode.MEM_READ, "4"):
        # The command is registered under OpCode.MEM_READ; the numeric key "4"
        # is still accepted so that existing callers keep working.
        q.add_log_cmd("PLOC MPSoC: TC mem read test")
        _queue_action_tc(q, prepare_mem_read_command(object_id=obyt))
    elif op_code == OpCode.FLASH_WRITE_FILE:
        q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}")
        _queue_action_tc(q, prepare_flash_write_cmd(obyt))
    elif op_code == OpCode.FLASH_READ_FILE:
        q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}")
        _queue_action_tc(q, prepare_flash_read_cmd(obyt))
    elif op_code == OpCode.FLASH_DELETE_FILE:
        q.add_log_cmd("PLOC MPSoC: Flash delete")
        _queue_action_tc(q, prepare_flash_delete_cmd(obyt))
    elif op_code == OpCode.REPLAY_START:
        # Exact comparison: a substring test would also accept op codes such
        # as "" or "replay".
        q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}")
        _queue_action_tc(q, prepare_replay_start_cmd(obyt))
    elif op_code == OpCode.DOWNLINK_PWR_ON:
        # Log the description like the other commands, not the op code key.
        q.add_log_cmd(f"{prefix}: {Info.DOWNLINK_PWR_ON}")
        _queue_action_tc(q, prepare_downlink_pwr_on_cmd(obyt))
    elif op_code == OpCode.FLASH_GET_DIR_CONTENT:
        q.add_log_cmd(f"{prefix}: {Info.FLASH_GET_DIR_CONTENT}")
        dir_name = input("Please specify MPSoC directory name to get information for: ")
        dir_name = bytearray(dir_name.encode("utf-8"))
        # Directory name is sent as a zero-terminated string.
        dir_name.append(0)
        q.add_pus_tc(
            create_action_cmd(
                object_id=obyt,
                action_id=ActionId.TC_FLASH_DIR_GET_CONTENT,
                user_data=dir_name,
            )
        )
    elif op_code == OpCode.REPLAY_WRITE_SEQ:
        q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}")
        _queue_action_tc(q, prepare_replay_write_sequence_cmd(obyt))
    elif op_code == OpCode.VERIFY_BOOT:
        num_words = 1
        q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}")
        # Memory read of a single word at the DEADBEEF address.
        data = obyt + struct.pack(
            "!IIH", ActionId.TC_MEM_READ, MemAddresses.DEADBEEF, num_words
        )
        _queue_action_tc(q, data)
    elif op_code == "16":
        q.add_log_cmd("PLOC MPSoC: Tc cam command send")
        cam_cmd = input("Specify cam command string: ")
        data = (
            obyt
            + struct.pack("!I", ActionId.TC_CAM_CMD_SEND)
            + bytearray(cam_cmd, "utf-8")
        )
        _queue_action_tc(q, data)
    elif op_code == OpCode.CAM_TAKE_PIC:
        q.add_log_cmd("PLOC MPSoC: Cam take picture")
        _queue_action_tc(q, prepare_cam_take_pic_cmd(obyt))
    elif op_code == OpCode.SIMPLEX_SEND_FILE:
        q.add_log_cmd("PLOC MPSoC: Simplex send file")
        _queue_action_tc(q, prepare_simplex_send_file_cmd(obyt))
    elif op_code == OpCode.DOWNLINK_DATA_MODULATE:
        q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
        _queue_action_tc(q, prepare_downlink_data_modulate_cmd(obyt))
|
|
|
|
|
|
def generate_write_mem_command(
    object_id: bytes, memory_address: int, memory_data: int, mem_len: int
) -> bytearray:
    """Build the action command which writes to a PLOC memory address.

    The object ID is followed, in network byte order, by the 32-bit action ID,
    the 32-bit memory address, the 16-bit length and the 32-bit data word.

    :param object_id: The object id of the PlocHandler
    :param memory_address: The PLOC memory address where to write to.
    :param memory_data: The data word written to ``memory_address``.
    :param mem_len: Value of the 16-bit length field of the command.
    :return: The assembled command.
    """
    # "!" selects standard sizes without padding, so one combined format
    # yields the same bytes as packing each field on its own.
    payload = struct.pack(
        "!IIHI", ActionId.TC_MEM_WRITE, memory_address, mem_len, memory_data
    )
    return bytearray(object_id + payload)
|
|
|
|
|
|
def prepare_mem_read_command(object_id: bytes) -> bytearray:
    """Prompt for address and word count and build the memory read command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID followed by the 32-bit action ID, the 32-bit memory
        address and the 16-bit number of words, all in network byte order.
    """
    # The address is entered in hexadecimal, the word count in decimal.
    address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
    word_count = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
    payload = struct.pack("!IIH", ActionId.TC_MEM_READ, address, word_count)
    return bytearray(object_id + payload)
|
|
|
|
|
|
def prepare_flash_base_cmd(
    obc_filename: str, mpsoc_filename: str, action_id: int, object_id: bytes
) -> bytearray:
    """Build the part shared by the flash commands which carry two file names.

    :param obc_filename: File name on the OBC.
    :param mpsoc_filename: File name on the MPSoC.
    :param action_id: Action ID, serialized as 32-bit big-endian integer.
    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, action ID and both UTF-8 encoded file names, each file
        name followed by a zero byte.
    """
    terminator = b"\x00"
    return (
        bytearray(object_id)
        + struct.pack("!I", action_id)
        + obc_filename.encode("utf-8")
        + terminator
        + mpsoc_filename.encode("utf-8")
        + terminator
    )
|
|
|
|
|
|
def prepare_flash_write_cmd(object_id: bytes) -> bytearray:
    """Ask for the OBC and MPSoC file names and build the flash write command.

    :param object_id: Object ID placed at the start of the command.
    :return: Command built by prepare_flash_base_cmd for the
        TC_FLASH_WRITE_FULL_FILE action.
    """
    # The OBC file is queried first, then the MPSoC file.
    source_on_obc = get_obc_file(OBC_WRITE_FILE_DICT)
    target_on_mpsoc = get_mpsoc_file(MPSOC_WRITE_FILE_DICT)
    action = ActionId.TC_FLASH_WRITE_FULL_FILE
    return prepare_flash_base_cmd(source_on_obc, target_on_mpsoc, action, object_id)
|
|
|
|
|
|
def prepare_flash_read_cmd(object_id: bytes) -> bytearray:
    """Ask for file names and file size and build the flash read command.

    :param object_id: Object ID placed at the start of the command.
    :return: Command built by prepare_flash_base_cmd for the
        TC_FLASH_READ_FULL_FILE action, followed by the file size as 32-bit
        big-endian integer.
    """
    # The MPSoC file is queried first, then the OBC file, then the size.
    source_on_mpsoc = get_mpsoc_file(MPSOC_READ_FILE_DICT)
    target_on_obc = get_obc_file(OBC_READ_FILE_DICT)
    read_cmd = prepare_flash_base_cmd(
        target_on_obc, source_on_mpsoc, ActionId.TC_FLASH_READ_FULL_FILE, object_id
    )
    read_cmd += struct.pack("!I", get_mpsoc_file_size())
    return read_cmd
|
|
|
|
|
|
def prepare_flash_delete_cmd(object_id: bytes) -> bytearray:
    """Ask for an MPSoC file name and build the flash delete command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, 32-bit big-endian action ID and the UTF-8 encoded file
        name. No zero byte is appended after the file name.
    """
    # The choices offered are those of the flash read menu.
    target = get_mpsoc_file(MPSOC_READ_FILE_DICT)
    header = object_id + struct.pack("!I", ActionId.TC_FLASH_DELETE)
    return bytearray(header + target.encode("utf-8"))
|
|
|
|
|
|
def prepare_replay_start_cmd(object_id: bytes) -> bytearray:
    """Prompt for the replay mode and build the replay start command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, 32-bit big-endian action ID and the replay mode as a
        single unsigned byte.
    """
    replay_mode = int(input("Specify replay mode (0 - once, 1 - repeated): "))
    payload = struct.pack("!IB", ActionId.TC_REPLAY_START, replay_mode)
    return bytearray(object_id + payload)
|
|
|
|
|
|
def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray:
    """Prompt for JESD mode and lane rate and build the downlink power-on command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, 32-bit big-endian action ID, then the JESD mode and
        the lane rate as one unsigned byte each.
    """
    jesd_mode = int(input("Specify JESD mode (0 - 5): "))
    rate = int(input("Specify lane rate (0 - 9): "))
    payload = struct.pack("!IBB", ActionId.TC_DOWNLINK_PWR_ON, jesd_mode, rate)
    return bytearray(object_id + payload)
|
|
|
|
|
|
def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
    """Prompt for the decoding flag and sequence file and build the command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, 32-bit big-endian action ID, the decoding flag as one
        unsigned byte and the UTF-8 encoded sequence file name.
    """
    decoding_flag = int(input("Use decoding (set to 1): "))
    sequence_file = get_sequence_file()
    header = struct.pack("!IB", ActionId.TC_REPLAY_WRITE_SEQUENCE, decoding_flag)
    # The file name is sent without a terminating zero byte.
    return bytearray(object_id + header + sequence_file.encode("utf-8"))
|
|
|
|
|
|
def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
    """Build the camera take-picture command from defaults or operator input.

    The operator is asked whether the default parameters shall be used;
    otherwise the file name and every parameter are prompted for one by one.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, 32-bit big-endian action ID, zero-terminated UTF-8
        file name, then for Y, Cb and Cr an encoder setting (one byte) and a
        quantization value (64 bit) each, and finally the bypass-compressor
        byte.
    """
    use_defaults = input("Use default parameter? (Y/N): ").lower() in ("y", "1", "yes")
    if use_defaults:
        filename = "0:/test"
        # Order: encoder/quantization for Y, Cb, Cr, then bypass compressor.
        params = (7, 0, 7, 0, 7, 0, 0)
    else:
        filename = input("Specify filename: ")
        prompts = (
            "Specify encoderSetting_Y: ",
            "Specify quantization_Y: ",
            "Specify encoderSetting_Cb: ",
            "Specify quantization_Cb: ",
            "Specify encoderSetting_Cr: ",
            "Specify quantization_Cr: ",
            "Specify bypassCompressor: ",
        )
        # Prompts are issued strictly in the listed order.
        params = tuple(int(input(prompt)) for prompt in prompts)
    header = object_id + struct.pack("!I", ActionId.TC_CAM_TAKE_PIC)
    name_field = filename.encode("utf-8") + b"\x00"
    # "!" packs without padding, so the combined format matches field-wise
    # packing of B, Q, B, Q, B, Q, B.
    return bytearray(header + name_field + struct.pack("!BQBQBQB", *params))
|
|
|
|
|
|
def prepare_simplex_send_file_cmd(object_id: bytes) -> bytearray:
    """Prompt for a file name and build the simplex send-file command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID, 32-bit big-endian action ID and the zero-terminated
        UTF-8 file name.
    """
    name_field = input("Specify filename: ").encode("utf-8") + b"\x00"
    header = object_id + struct.pack("!I", ActionId.TC_SIMPLEX_SEND_FILE)
    return bytearray(header + name_field)
|
|
|
|
|
|
def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray:
    """Prompt for the modulation parameters and build the command.

    :param object_id: Object ID placed at the start of the command.
    :return: Object ID followed, in network byte order, by the 32-bit action
        ID, the format (one byte), the source memory address (32 bit), the
        source memory length (16 bit) and the destination memory address
        (32 bit).
    """
    prompts = (
        "Specify format: ",
        "Specify srcMemAddr: ",
        "Specify srcMemLen: ",
        "Specify destMemAddr: ",
    )
    # Values are requested in the listed order and parsed as decimal integers.
    data_format, src_addr, src_len, dest_addr = (int(input(text)) for text in prompts)
    payload = struct.pack(
        "!IBIHI",
        ActionId.TC_DOWNLINK_DATA_MODULATE,
        data_format,
        src_addr,
        src_len,
        dest_addr,
    )
    return bytearray(object_id + payload)
|
|
|
|
|
|
def get_obc_file(input_dict: dict) -> str:
    """Let the operator choose an OBC file name.

    :param input_dict: Menu mapping selection keys to (description, file name)
        tuples.
    :return: The file name of the selected entry, or the manually typed name
        if the manual input entry was selected.
    """
    _LOGGER.info("Specify OBC filename")
    selected = InputHelper(input_dict).get_key()
    if selected != MANUAL_INPUT:
        return input_dict[selected][1]
    return input("Ploc MPSoC: Specify absolute name of flash file: ")
|
|
|
|
|
|
def get_mpsoc_file(input_dict: dict) -> str:
    """Let the operator choose an MPSoC file name.

    :param input_dict: Menu mapping selection keys to (description, file name)
        tuples.
    :return: The file name of the selected entry, or the manually typed name
        if the manual input entry was selected.
    """
    _LOGGER.info("Specify MPSoC filename")
    selected = InputHelper(input_dict).get_key()
    if selected != MANUAL_INPUT:
        return input_dict[selected][1]
    return input("Ploc MPSoC: Specify absolute name file: ")
|
|
|
|
|
|
def get_mpsoc_file_size() -> int:
    """Prompt the operator for the size of an MPSoC file.

    :return: The entered size.
    :raises ValueError: If the input is not an integer or is not positive.
    """
    size = int(input("Specify MPSoC file size: "))
    if size > 0:
        return size
    raise ValueError("Invalid file size")
|
|
|
|
|
|
def get_sequence_file() -> str:
    """Let the operator choose a replay sequence file.

    :return: The file name of the selected SEQ_FILE_DICT entry, or the
        manually typed name if the manual input entry was selected.
    """
    _LOGGER.info("Specify sequence file")
    selected = InputHelper(SEQ_FILE_DICT).get_key()
    if selected != MANUAL_INPUT:
        return SEQ_FILE_DICT[selected][1]
    return input("Ploc MPSoC: Specify absolute name file: ")
|
|
|
|
|
|
def handle_ploc_mpsoc_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
    """Parse and log a PLOC MPSoC housekeeping set.

    Only the set SetId.HK_ID is known. Its data consists of four consecutive
    big-endian sections: status fields, nine float values, thirteen float
    values and the SEM fields. An unknown set ID or data shorter than these
    four sections is reported with a warning and nothing is parsed.

    :param pw: Print wrapper receiving the log output.
    :param hk_data: Raw housekeeping data.
    :param set_id: ID of the housekeeping set contained in ``hk_data``.
    """
    if set_id != SetId.HK_ID:
        _LOGGER.warning(f"Unknown set ID {set_id} for MPSoC HK")
        return

    status_fmt = "!IBBBBBBB"
    cam_sysmon_fmt = "!9f"
    board_sysmon_fmt = "!13f"
    sem_fmt = "!HHBB"

    # Validate the length once up front instead of failing with a
    # struct.error in the middle of the output.
    expected_len = sum(
        struct.calcsize(fmt)
        for fmt in (status_fmt, cam_sysmon_fmt, board_sysmon_fmt, sem_fmt)
    )
    if len(hk_data) < expected_len:
        _LOGGER.warning(
            f"PLOC MPSoC HK data with {len(hk_data)} bytes shorter than expected "
            f"{expected_len} bytes"
        )
        return

    current_idx = 0
    (
        status,
        mode,
        downlink_pwr_on,
        downlink_reply_active,
        downlink_jesd_sync_status,
        downlink_dac_status,
        cam_status,
        cam_sdi_status,
    ) = struct.unpack_from(status_fmt, hk_data, current_idx)
    current_idx += struct.calcsize(status_fmt)
    pw.ilog(_LOGGER, "Received MPSoC HK")
    pw.dlog(f"Status: {status}")
    pw.dlog(f"Mode: {mode}")
    pw.dlog(f"Downlink Power On: {downlink_pwr_on}")
    pw.dlog(f"Downlink Reply Active: {downlink_reply_active}")
    pw.dlog(f"Downlink JESD Sync Status: {downlink_jesd_sync_status}")
    pw.dlog(f"Downlink DAC Status: {downlink_dac_status}")
    pw.dlog(f"CAM Status: {cam_status}")
    pw.dlog(f"CAM SDI Status: {cam_sdi_status}")

    (
        cam_fpga_temp,
        cam_soc_temp,
        sysmon_temp,
        sysmon_vcc_int,
        sysmon_vcc_aux,
        sysmon_vcc_bram,
        sysmon_vcc_paux,
        sysmon_vcc_pint,
        sysmon_vcc_pdro,
    ) = struct.unpack_from(cam_sysmon_fmt, hk_data, current_idx)
    current_idx += struct.calcsize(cam_sysmon_fmt)
    pw.dlog(f"CAM FPGA Temperature: {cam_fpga_temp}")
    pw.dlog(f"CAM SoC Temperature: {cam_soc_temp}")
    pw.dlog(f"System Monitor Temperature: {sysmon_temp}")
    pw.dlog(
        f"SYSMON VCC INT {sysmon_vcc_int:.3f} | SYSMON VCC AUX"
        f" {sysmon_vcc_aux:.3f} | SYSMON VCC BRAM {sysmon_vcc_bram:.3f}"
    )
    pw.dlog(
        f"SYSMON VCC PAUX {sysmon_vcc_paux:.3f} | SYSMON VCC PINT"
        f" {sysmon_vcc_pint:.3f} | SYSMON VCC PDRO {sysmon_vcc_pdro:.3f}"
    )

    (
        sysmon_mb_12v,
        sysmon_mb_3v3,
        sysmon_mb_1v8,
        sysmon_vcc_12v,
        sysmon_vcc_5v,
        sysmon_vcc_3v3,
        sysmon_vcc_3v3va,
        sysmon_vcc_2v5ddr,
        sysmon_vcc_1v2ddr,
        sysmon_vcc_0v9,
        sysmon_vcc_0v6vtt,
        sysmon_safe_cotr_cur,
        sysmon_nvm4_xo_cur,
    ) = struct.unpack_from(board_sysmon_fmt, hk_data, current_idx)
    current_idx += struct.calcsize(board_sysmon_fmt)
    pw.dlog(
        f"SYSMON MB 12V {sysmon_mb_12v:.3f} | SYSMON MB 3V3 {sysmon_mb_3v3:.3f} | "
        f"SYSMON MBA 1V8 {sysmon_mb_1v8:.3f}"
    )
    pw.dlog(
        f"SYSMON VCC 12V {sysmon_vcc_12v:.3f} | SYSMON VCC 5V {sysmon_vcc_5v:.3f} |"
        f" SYSMON VCC 3V3 {sysmon_vcc_3v3:.3f} | SYSMON VCC 3V3VA"
        f" {sysmon_vcc_3v3va}"
    )
    pw.dlog(
        f"SYSMON VCC 2V5DDR {sysmon_vcc_2v5ddr:.3f} | "
        f"SYSMON VCC 1V2DDR {sysmon_vcc_1v2ddr:.3f} | "
        f"SYSMON VCC 0V9 {sysmon_vcc_0v9:.3f} | "
        f"SYSMON VCC 0V6VTT {sysmon_vcc_0v6vtt}"
    )
    pw.dlog(
        f"SYSMON SAFE COTS CURR: {sysmon_safe_cotr_cur} | "
        f"SYSMON NVM4XO CURR {sysmon_nvm4_xo_cur}"
    )

    (
        sem_uncorrectable_errs,
        sem_correctable_errs,
        sem_status,
        reboot_mpsoc_required,
    ) = struct.unpack_from(sem_fmt, hk_data, current_idx)
    pw.dlog(f"SEM IP Uncorrectable Errors: {sem_uncorrectable_errs}")
    pw.dlog(f"SEM IP Correctable Errors: {sem_correctable_errs}")
    pw.dlog(f"SEM IP Status: {sem_status}")
    pw.dlog(f"Reboot MPSoC required: {reboot_mpsoc_required}")
|
|
|
|
|
|
@dataclasses.dataclass
class DirElement:
    """One entry of an MPSoC flash directory listing."""

    # Name of the entry.
    name: str
    # Attribute byte reported for the entry.
    attr: int
    # Size value reported for the entry.
    size: int
|
|
|
|
|
|
def _decode_name_field(field) -> str:
    """Decode a fixed-width name field up to (excluding) its first zero byte.

    A field without any zero byte is decoded completely.
    """
    return bytes(field).split(b"\x00", 1)[0].decode("utf-8")


def _log_flash_directory_content(pw, custom_data) -> None:
    """Parse and log a flash directory content reply.

    Layout: 12-byte directory name, 32-bit big-endian element count, then all
    12-byte element names, then all one-byte attributes, then all 32-bit
    big-endian sizes.
    """
    name_len = 12
    header_len = name_len + 4
    if len(custom_data) < header_len:
        _LOGGER.warning(
            "PLOC MPSoC flash directory data shorter than minimum 16 bytes"
        )
        # Nothing can be parsed without a complete header.
        return
    dir_name_short = _decode_name_field(custom_data[:name_len])
    (num_elements,) = struct.unpack_from("!I", custom_data, name_len)
    # Every element contributes a name (12), an attribute (1) and a size (4).
    expected_size = header_len + num_elements * 17
    if len(custom_data) < expected_size:
        _LOGGER.warning(
            f"PLOC MPSoC flash directory data shorter than expected {expected_size}"
        )
        # Parsing truncated data would read past the end of the buffer.
        return
    pw.dlog(
        f"Received PLOC MPSoC flash directory content for path {dir_name_short} "
        f"with {num_elements} elements"
    )
    # It is as weird as it looks: the fields are not stored per element but
    # as three consecutive arrays.
    names_start = header_len
    attrs_start = names_start + num_elements * name_len
    sizes_start = attrs_start + num_elements
    for i in range(num_elements):
        name_offset = names_start + i * name_len
        elem_name = _decode_name_field(
            custom_data[name_offset : name_offset + name_len]
        )
        elem_attr = custom_data[attrs_start + i]
        (elem_size,) = struct.unpack_from("!I", custom_data, sizes_start + 4 * i)
        pw.dlog(f"{DirElement(elem_name, elem_attr, elem_size)}")


def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytearray):
    """Log the content of a PLOC MPSoC action data reply.

    Handled are the memory read report, the camera command report and the
    flash directory content report; any other action ID is ignored. Flash
    directory data which is too short is reported with a warning and not
    parsed. Names in the directory report are cut at their first zero byte.

    :param action_id: Action ID the reply belongs to.
    :param pw: Print wrapper receiving the log output.
    :param custom_data: Raw reply data.
    """
    if action_id == ActionId.TM_MEM_READ_RPT:
        header_list = [
            "PLOC Memory Address",
            "PLOC Mem Len",
            "PLOC Read Memory Data",
        ]
        # 4 bytes address, 16-bit length, then the first 4 bytes of data.
        content_list = [
            "0x" + custom_data[:4].hex(),
            struct.unpack("!H", custom_data[4:6])[0],
            "0x" + custom_data[6:10].hex(),
        ]
        pw.dlog(f"{header_list}")
        pw.dlog(f"{content_list}")
    elif action_id == ActionId.TM_CAM_CMD_RPT:
        header_list = ["Camera reply string", "ACK"]
        # The last byte is shown as ACK, everything before it as reply string.
        content_list = [
            custom_data[: len(custom_data) - 1].decode("utf-8"),
            hex(custom_data[-1]),
        ]
        pw.dlog(f"{header_list}")
        pw.dlog(f"{content_list}")
    elif action_id == ActionId.TM_FLASH_DIRECTORY_CONTENT:
        _log_flash_directory_content(pw, custom_data)
|