MPSoC module update #187

Merged
meggert merged 3 commits from mpsoc_commands into main 2023-05-04 12:01:43 +02:00
2 changed files with 132 additions and 84 deletions

View File

@ -13,6 +13,7 @@ list yields a list of all related PRs for each release.
## Added ## Added
- Event handling for reboot counter events. - Event handling for reboot counter events.
- Start adding new MPSoC commands, improve MPSoC commanding module a bit.
# [v3.1.1] 2023-04-17 # [v3.1.1] 2023-04-17

View File

@ -22,38 +22,38 @@ from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.decorator import ServiceProviderParams
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
MANUAL_INPUT = "1" MANUAL_INPUT = "1"
flash_write_file_dict = { FLASH_WRITE_FILE_DICT = {
MANUAL_INPUT: ["manual input", ""], MANUAL_INPUT: ("manual input", ""),
"2": ["/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"], "2": ("/mnt/sd0/ploc/mpsoc/flash_write.bin", "/mnt/sd0/ploc/mpsoc/flash_write.bin"),
} }
mpsoc_file_dict = { MPSOC_FILE_DICT = {
MANUAL_INPUT: ["manual input", ""], MANUAL_INPUT: ("manual input", ""),
"2": ["0:/flash", "0:/flash"], "2": ("0:/flash", "0:/flash"),
} }
SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"] SEQ_FILE_NAMES = ["0:/EM16/231", "0:/EQ04/E-75", "0:/EQ01/E130"]
SEQ_FILE_DICT = { SEQ_FILE_DICT = {
MANUAL_INPUT: ["manual input", ""], MANUAL_INPUT: ("manual input", ""),
"2": [f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"], "2": (f"16QRM On Carrier 200 MBd ({SEQ_FILE_NAMES[0]})", f"{SEQ_FILE_NAMES[0]}"),
"3": [f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"], "3": (f"QPSK On Carrier 780 MBd ({SEQ_FILE_NAMES[1]})", f"{SEQ_FILE_NAMES[1]}"),
"4": [f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"], "4": (f"Maximum Bandwidth QPSK ({SEQ_FILE_NAMES[2]})", f"{SEQ_FILE_NAMES[2]}"),
} }
CARRIAGE_RETURN = 0xD CARRIAGE_RETURN = 0xD
class CommandId(enum.IntEnum): class ActionId(enum.IntEnum):
TC_MEM_WRITE = 1 TC_MEM_WRITE = 1
TC_MEM_READ = 2 TC_MEM_READ = 2
FLASH_WRITE = 9 TC_FLASH_WRITE_FULL_FILE = 9
TC_FLASH_DELETE = 10 TC_FLASH_DELETE = 10
TC_REPLAY_START = 11 TC_REPLAY_START = 11
TC_REPLAY_STOP = 12 TC_REPLAY_STOP = 12
@ -70,22 +70,32 @@ class CommandId(enum.IntEnum):
TC_SIMPLEX_SEND_FILE = 23 TC_SIMPLEX_SEND_FILE = 23
TC_DOWNLINK_DATA_MODULATE = 24 TC_DOWNLINK_DATA_MODULATE = 24
TC_MODE_SNAPSHOT = 25 TC_MODE_SNAPSHOT = 25
TC_FLASH_DIR_GET_CONTENT = 28
TC_FLASH_READ_FULL_FILE = 30
class OpCode: class OpCode:
ON = ["on"] ON = "on"
OFF = ["off"] OFF = "off"
NORMAL = ["normal"] NORMAL = "normal"
VERIFY_BOOT = ["verify_boot"] VERIFY_BOOT = "verify_boot"
MODE_REPLAY = ["mode_replay"] MODE_REPLAY = "mode_replay"
MODE_IDLE = ["mode_idle"] MODE_IDLE = "mode_idle"
REPLAY_WRITE_SEQ = ["replay_write"] REPLAY_WRITE_SEQ = "replay_write"
DOWNLINK_PWR_ON = ["downlink_pwr_on"] DOWNLINK_PWR_ON = "downlink_pwr_on"
REPLAY_START = ["replay_start"] MEM_WRITE = "memory_write"
CAM_TAKE_PIC = ["cam_take_pic"] MEM_READ = "memory_read"
SIMPLEX_SEND_FILE = ["simplex_send_file"] DOWNLINK_PWR_OFF = "downlink_pwr_off"
DOWNLINK_DATA_MODULATE = ["downlink_data_modulate"] FLASH_WRITE_FILE = "flash_write_file"
MODE_SNAPSHOT = ["mode_snapshot"] FLASH_READ_FILE = "flash_read_file"
FLASH_DELETE_FILE = "flash_delete_file"
FLASH_GET_DIR_CONTENT = "flash_get_dir_content"
REPLAY_STOP = "replay_stop"
REPLAY_START = "replay_start"
CAM_TAKE_PIC = "cam_take_pic"
SIMPLEX_SEND_FILE = "simplex_send_file"
DOWNLINK_DATA_MODULATE = "downlink_data_modulate"
MODE_SNAPSHOT = "mode_snapshot"
class Info: class Info:
@ -94,12 +104,18 @@ class Info:
NORMAL = "Normal" NORMAL = "Normal"
VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address" VERIFY_BOOT = "Verify boot by reading 0xdeadbeef from DEADBEEF address"
MODE_REPLAY = "Switch to REPLAY mode" MODE_REPLAY = "Switch to REPLAY mode"
REPLAY_STOP = "Stop Replay"
MODE_IDLE = "Switch to IDLE mode" MODE_IDLE = "Switch to IDLE mode"
REPLAY_WRITE_SEQ = "Replay write sequence" REPLAY_WRITE_SEQ = "Replay write sequence"
DOWNLINK_PWR_ON = "Downlink Power On" DOWNLINK_PWR_ON = "Downlink Power On"
DOWNLINK_PWR_OFF = "Downlink Power Off"
REPLAY_START = "Replay Start" REPLAY_START = "Replay Start"
CAM_TAKE_PIC = "Cam Take Picture" CAM_TAKE_PIC = "Cam Take Picture"
SIMPLEX_SEND_FILE = "Simplex Send File" SIMPLEX_SEND_FILE = "Simplex Send File"
FLASH_READ_FILE = "Copy file from MPSoC to OBC"
FLASH_WRITE_FILE = "Copy file from OBC to MPSoC"
FLASH_DELETE_FILE = "Delete file on MPSoC"
FLASH_GET_DIR_CONTENT = "Get flash directory content on MPSoC"
DOWNLINK_DATA_MODULATE = "Downlink data modulate" DOWNLINK_DATA_MODULATE = "Downlink data modulate"
MODE_SNAPSHOT = "Mode Snapshot" MODE_SNAPSHOT = "Mode Snapshot"
@ -119,17 +135,18 @@ def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
oce.add(OpCode.OFF, Info.OFF) oce.add(OpCode.OFF, Info.OFF)
oce.add(OpCode.ON, Info.ON) oce.add(OpCode.ON, Info.ON)
oce.add(OpCode.NORMAL, Info.NORMAL) oce.add(OpCode.NORMAL, Info.NORMAL)
oce.add("3", "Ploc MPSoC: Memory write") oce.add(OpCode.MEM_WRITE, "Ploc MPSoC: Memory write")
oce.add("4", "Ploc MPSoC: Memory read") oce.add(OpCode.MEM_READ, "Ploc MPSoC: Memory read")
oce.add("5", "Ploc MPSoC: Flash write") oce.add(OpCode.FLASH_WRITE_FILE, Info.FLASH_WRITE_FILE)
oce.add("6", "Ploc MPSoC: Flash delete") oce.add(OpCode.FLASH_READ_FILE, Info.FLASH_READ_FILE)
oce.add(OpCode.FLASH_DELETE_FILE, Info.FLASH_DELETE_FILE)
oce.add(OpCode.REPLAY_START, Info.REPLAY_START) oce.add(OpCode.REPLAY_START, Info.REPLAY_START)
oce.add("8", "Ploc MPSoC: Replay stop") oce.add(OpCode.REPLAY_STOP, Info.REPLAY_STOP)
oce.add(OpCode.DOWNLINK_PWR_ON, Info.DOWNLINK_PWR_ON) oce.add(OpCode.DOWNLINK_PWR_ON, Info.DOWNLINK_PWR_ON)
oce.add("10", "Ploc MPSoC: Downlink pwr off") oce.add(OpCode.DOWNLINK_PWR_OFF, Info.DOWNLINK_PWR_OFF)
oce.add(OpCode.REPLAY_WRITE_SEQ, Info.REPLAY_WRITE_SEQ) oce.add(OpCode.REPLAY_WRITE_SEQ, Info.REPLAY_WRITE_SEQ)
oce.add("12", "Ploc MPSoC: OBSW reset sequence count") oce.add("12", "Ploc MPSoC: OBSW reset sequence count")
oce.add(OpCode.VERIFY_BOOT, "Ploc MPSoC: Read DEADBEEF address") oce.add(OpCode.VERIFY_BOOT, Info.VERIFY_BOOT)
oce.add(OpCode.MODE_REPLAY, Info.MODE_REPLAY) oce.add(OpCode.MODE_REPLAY, Info.MODE_REPLAY)
oce.add(OpCode.MODE_IDLE, Info.MODE_IDLE) oce.add(OpCode.MODE_IDLE, Info.MODE_IDLE)
oce.add("16", "Ploc MPSoC: Tc cam command send") oce.add("16", "Ploc MPSoC: Tc cam command send")
@ -152,19 +169,19 @@ def pack_ploc_mpsoc_commands(p: ServiceProviderParams):
f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
) )
obyt = object_id.as_bytes obyt = object_id.as_bytes
if op_code in OpCode.OFF: if op_code == OpCode.OFF:
q.add_log_cmd(f"{prefix}: {Info.OFF}") q.add_log_cmd(f"{prefix}: {Info.OFF}")
command = pack_mode_data(obyt, Mode.OFF, 0) command = pack_mode_data(obyt, Mode.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
if op_code in OpCode.ON: if op_code == OpCode.ON:
q.add_log_cmd(f"{prefix}: {Info.ON}") q.add_log_cmd(f"{prefix}: {Info.ON}")
data = pack_mode_data(obyt, Mode.ON, 0) data = pack_mode_data(obyt, Mode.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code in OpCode.NORMAL: if op_code == OpCode.NORMAL:
q.add_log_cmd(f"{prefix}: {Info.NORMAL}") q.add_log_cmd(f"{prefix}: {Info.NORMAL}")
data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == "3": if op_code == OpCode.MEM_WRITE:
q.add_log_cmd("PLOC MPSoC: TC mem write test") q.add_log_cmd("PLOC MPSoC: TC mem write test")
memory_address = int( memory_address = int(
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
@ -180,11 +197,15 @@ def pack_ploc_mpsoc_commands(p: ServiceProviderParams):
q.add_log_cmd("PLOC MPSoC: TC mem read test") q.add_log_cmd("PLOC MPSoC: TC mem read test")
data = prepare_mem_read_command(object_id=object_id.as_bytes) data = prepare_mem_read_command(object_id=object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "5": if op_code == OpCode.FLASH_WRITE_FILE:
q.add_log_cmd("PLOC MPSoC: Flash write") q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}")
data = prepare_flash_write_cmd(object_id.as_bytes) data = prepare_flash_write_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "6": if op_code == OpCode.FLASH_READ_FILE:
q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}")
data = prepare_flash_read_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.FLASH_DELETE_FILE:
q.add_log_cmd("PLOC MPSoC: Flash delete") q.add_log_cmd("PLOC MPSoC: Flash delete")
data = prepare_flash_delete_cmd(object_id.as_bytes) data = prepare_flash_delete_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
@ -192,76 +213,86 @@ def pack_ploc_mpsoc_commands(p: ServiceProviderParams):
q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}") q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}")
data = prepare_replay_start_cmd(object_id.as_bytes) data = prepare_replay_start_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "8": if op_code == OpCode.REPLAY_STOP:
q.add_log_cmd("PLOC MPSoC: Replay stop") q.add_log_cmd("PLOC MPSoC: Replay stop")
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_REPLAY_STOP) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_REPLAY_STOP)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.DOWNLINK_PWR_ON: if op_code == OpCode.DOWNLINK_PWR_ON:
q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}") q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}")
data = prepare_downlink_pwr_on_cmd(object_id.as_bytes) data = prepare_downlink_pwr_on_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "10": if op_code == OpCode.DOWNLINK_PWR_OFF:
q.add_log_cmd("PLOC MPSoC: Downlink pwr off") q.add_log_cmd("PLOC MPSoC: Downlink pwr off")
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_DOWNLINK_PWR_OFF) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.REPLAY_WRITE_SEQ: if op_code == OpCode.FLASH_GET_DIR_CONTENT:
q.add_log_cmd(f"{prefix}: {OpCode.FLASH_GET_DIR_CONTENT}")
dir_name = input("Please specify MPSoC directory name to get information for")
dir_name = bytearray(dir_name.encode("utf-8"))
dir_name.append(0)
return create_action_cmd(
object_id=object_id.as_bytes,
action_id=ActionId.TC_FLASH_DIR_GET_CONTENT,
user_data=dir_name,
)
if op_code == OpCode.REPLAY_WRITE_SEQ:
q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}") q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}")
data = prepare_replay_write_sequence_cmd(object_id.as_bytes) data = prepare_replay_write_sequence_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "12": if op_code == "12":
q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count") q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count")
data = object_id.as_bytes + struct.pack("!I", CommandId.OBSW_RESET_SEQ_COUNT) data = object_id.as_bytes + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.VERIFY_BOOT: if op_code == OpCode.VERIFY_BOOT:
num_words = 1 num_words = 1
q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}") q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}")
data = ( data = (
object_id.as_bytes object_id.as_bytes
+ struct.pack("!I", CommandId.TC_MEM_READ) + struct.pack("!I", ActionId.TC_MEM_READ)
+ struct.pack("!I", MemAddresses.DEADBEEF) + struct.pack("!I", MemAddresses.DEADBEEF)
+ struct.pack("!H", num_words) + struct.pack("!H", num_words)
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.MODE_REPLAY: if op_code == OpCode.MODE_REPLAY:
q.add_log_cmd("PLOC MPSoC: Tc mode replay") q.add_log_cmd("PLOC MPSoC: Tc mode replay")
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_REPLAY) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_REPLAY)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.MODE_IDLE: if op_code == OpCode.MODE_IDLE:
q.add_log_cmd("PLOC MPSoC: Tc mode idle") q.add_log_cmd("PLOC MPSoC: Tc mode idle")
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_IDLE) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_IDLE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "16": if op_code == "16":
q.add_log_cmd("PLOC MPSoC: Tc cam command send") q.add_log_cmd("PLOC MPSoC: Tc cam command send")
cam_cmd = input("Specify cam command string: ") cam_cmd = input("Specify cam command string: ")
data = ( data = (
object_id.as_bytes object_id.as_bytes
+ struct.pack("!I", CommandId.TC_CAM_CMD_SEND) + struct.pack("!I", ActionId.TC_CAM_CMD_SEND)
+ bytearray(cam_cmd, "utf-8") + bytearray(cam_cmd, "utf-8")
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "17": if op_code == "17":
q.add_log_cmd("PLOC MPSoC: Set UART TX tristate") q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
data = object_id.as_bytes + struct.pack("!I", CommandId.SET_UART_TX_TRISTATE) data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "18": if op_code == "18":
q.add_log_cmd("PLOC MPSoC: Release UART TX") q.add_log_cmd("PLOC MPSoC: Release UART TX")
data = object_id.as_bytes + struct.pack("!I", CommandId.RELEASE_UART_TX) data = object_id.as_bytes + struct.pack("!I", ActionId.RELEASE_UART_TX)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.CAM_TAKE_PIC: if op_code == OpCode.CAM_TAKE_PIC:
q.add_log_cmd("PLOC MPSoC: Cam take picture") q.add_log_cmd("PLOC MPSoC: Cam take picture")
data = prepare_cam_take_pic_cmd(object_id.as_bytes) data = prepare_cam_take_pic_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.SIMPLEX_SEND_FILE: if op_code == OpCode.SIMPLEX_SEND_FILE:
q.add_log_cmd("PLOC MPSoC: Simplex send file") q.add_log_cmd("PLOC MPSoC: Simplex send file")
data = prepare_simplex_send_file_cmd(object_id.as_bytes) data = prepare_simplex_send_file_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.DOWNLINK_DATA_MODULATE: if op_code == OpCode.DOWNLINK_DATA_MODULATE:
q.add_log_cmd("PLOC MPSoC: Downlink data modulate") q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
data = prepare_downlink_data_modulate_cmd(object_id.as_bytes) data = prepare_downlink_data_modulate_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.MODE_SNAPSHOT: if op_code == OpCode.MODE_SNAPSHOT:
q.add_log_cmd("PLOC MPSoC: Mode snapshot") q.add_log_cmd("PLOC MPSoC: Mode snapshot")
data = object_id.as_bytes + struct.pack("!I", CommandId.TC_MODE_SNAPSHOT) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_SNAPSHOT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
@ -278,7 +309,7 @@ def generate_write_mem_command(
""" """
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_MEM_WRITE) + struct.pack("!I", ActionId.TC_MEM_WRITE)
+ struct.pack("!I", memory_address) + struct.pack("!I", memory_address)
+ struct.pack("!H", mem_len) + struct.pack("!H", mem_len)
+ struct.pack("!I", memory_data) + struct.pack("!I", memory_data)
@ -291,31 +322,40 @@ def prepare_mem_read_command(object_id: bytes) -> bytearray:
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_MEM_READ) + struct.pack("!I", ActionId.TC_MEM_READ)
+ struct.pack("!I", memory_address) + struct.pack("!I", memory_address)
+ struct.pack("!H", num_words) + struct.pack("!H", num_words)
) )
return bytearray(command) return bytearray(command)
def prepare_flash_write_cmd(object_id: bytes) -> bytearray: def prepare_flash_base_cmd(action_id: int, object_id: bytes) -> bytearray:
obc_file = get_obc_file() obc_file = get_obc_file()
mpsoc_file = get_mpsoc_file() mpsoc_file = get_mpsoc_file()
command = ( command = bytearray(object_id)
object_id command.extend(struct.pack("!I", action_id))
+ struct.pack("!I", CommandId.FLASH_WRITE) command.extend(obc_file.encode("utf-8"))
+ bytearray(obc_file, "utf-8") command.append(0)
+ bytearray(mpsoc_file, "utf-8") command.extend(mpsoc_file.encode("utf-8"))
) command.append(0)
return bytearray(command) return command
def prepare_flash_write_cmd(object_id: bytes) -> bytearray:
return prepare_flash_base_cmd(ActionId.TC_FLASH_WRITE_FULL_FILE, object_id)
def prepare_flash_read_cmd(object_id: bytes) -> bytearray:
cmd = prepare_flash_base_cmd(ActionId.TC_FLASH_READ_FULL_FILE, object_id)
file_size = get_mpsoc_file_size()
cmd.extend(struct.pack("!I", file_size))
return cmd
def prepare_flash_delete_cmd(object_id: bytes) -> bytearray: def prepare_flash_delete_cmd(object_id: bytes) -> bytearray:
file = get_mpsoc_file() file = get_mpsoc_file()
command = ( command = (
object_id object_id + struct.pack("!I", ActionId.TC_FLASH_DELETE) + file.encode("utf-8")
+ struct.pack("!I", CommandId.TC_FLASH_DELETE)
+ bytearray(file, "utf-8")
) )
return bytearray(command) return bytearray(command)
@ -324,7 +364,7 @@ def prepare_replay_start_cmd(object_id: bytes) -> bytearray:
replay = int(input("Specify replay mode (0 - once, 1 - repeated): ")) replay = int(input("Specify replay mode (0 - once, 1 - repeated): "))
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_REPLAY_START) + struct.pack("!I", ActionId.TC_REPLAY_START)
+ struct.pack("!B", replay) + struct.pack("!B", replay)
) )
return bytearray(command) return bytearray(command)
@ -335,7 +375,7 @@ def prepare_downlink_pwr_on_cmd(object_id: bytes) -> bytearray:
lane_rate = int(input("Specify lane rate (0 - 9): ")) lane_rate = int(input("Specify lane rate (0 - 9): "))
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_DOWNLINK_PWR_ON) + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_ON)
+ struct.pack("!B", mode) + struct.pack("!B", mode)
+ struct.pack("!B", lane_rate) + struct.pack("!B", lane_rate)
) )
@ -347,7 +387,7 @@ def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
file = get_sequence_file() file = get_sequence_file()
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_REPLAY_WRITE_SEQUENCE) + struct.pack("!I", ActionId.TC_REPLAY_WRITE_SEQUENCE)
+ struct.pack("!B", use_decoding) + struct.pack("!B", use_decoding)
+ bytearray(file, "utf-8") + bytearray(file, "utf-8")
# + bytes([0]) # + bytes([0])
@ -377,7 +417,7 @@ def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
bypass_compressor = int(input("Specify bypassCompressor: ")) bypass_compressor = int(input("Specify bypassCompressor: "))
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_CAM_TAKE_PIC) + struct.pack("!I", ActionId.TC_CAM_TAKE_PIC)
+ bytearray(filename, "utf-8") + bytearray(filename, "utf-8")
+ bytes([0]) + bytes([0])
+ struct.pack("!B", encoder_setting_y) + struct.pack("!B", encoder_setting_y)
@ -395,7 +435,7 @@ def prepare_simplex_send_file_cmd(object_id: bytes) -> bytearray:
filename = input("Specify filename: ") filename = input("Specify filename: ")
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_SIMPLEX_SEND_FILE) + struct.pack("!I", ActionId.TC_SIMPLEX_SEND_FILE)
+ bytearray(filename, "utf-8") + bytearray(filename, "utf-8")
+ bytes([0]) + bytes([0])
) )
@ -409,7 +449,7 @@ def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray:
dest_mem_addr = int(input("Specify destMemAddr: ")) dest_mem_addr = int(input("Specify destMemAddr: "))
command = ( command = (
object_id object_id
+ struct.pack("!I", CommandId.TC_DOWNLINK_DATA_MODULATE) + struct.pack("!I", ActionId.TC_DOWNLINK_DATA_MODULATE)
+ struct.pack("!B", format) + struct.pack("!B", format)
+ struct.pack("!I", src_mem_addr) + struct.pack("!I", src_mem_addr)
+ struct.pack("!H", src_mem_len) + struct.pack("!H", src_mem_len)
@ -420,26 +460,33 @@ def prepare_downlink_data_modulate_cmd(object_id: bytes) -> bytearray:
def get_obc_file() -> str: def get_obc_file() -> str:
_LOGGER.info("Specify OBC file ") _LOGGER.info("Specify OBC file ")
input_helper = InputHelper(flash_write_file_dict) input_helper = InputHelper(FLASH_WRITE_FILE_DICT)
key = input_helper.get_key() key = input_helper.get_key()
if key == MANUAL_INPUT: if key == MANUAL_INPUT:
file = input("Ploc MPSoC: Specify absolute name of flash file: ") file = input("Ploc MPSoC: Specify absolute name of flash file: ")
else: else:
file = flash_write_file_dict[key][1] file = FLASH_WRITE_FILE_DICT[key][1]
return file return file
def get_mpsoc_file() -> str: def get_mpsoc_file() -> str:
_LOGGER.info("Specify MPSoC file") _LOGGER.info("Specify MPSoC file")
input_helper = InputHelper(mpsoc_file_dict) input_helper = InputHelper(MPSOC_FILE_DICT)
key = input_helper.get_key() key = input_helper.get_key()
if key == MANUAL_INPUT: if key == MANUAL_INPUT:
file = input("Ploc MPSoC: Specify absolute name file: ") file = input("Ploc MPSoC: Specify absolute name file: ")
else: else:
file = mpsoc_file_dict[key][1] file = MPSOC_FILE_DICT[key][1]
return file return file
def get_mpsoc_file_size() -> int:
file_size = int(input("Specify MPSoC file size"))
if file_size <= 0:
raise ValueError("Invalid file size")
return file_size
def get_sequence_file() -> str: def get_sequence_file() -> str:
_LOGGER.info("Specify sequence file") _LOGGER.info("Specify sequence file")
input_helper = InputHelper(SEQ_FILE_DICT) input_helper = InputHelper(SEQ_FILE_DICT)