OOF2
All checks were successful
EIVE/-/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2023-11-22 11:21:26 +01:00
parent ba24faefa9
commit 02d9d6adfc
Signed by: muellerr
GPG Key ID: A649FB78196E3849
7 changed files with 168 additions and 205 deletions

View File

@ -1,14 +1,16 @@
from typing import Optional from typing import Optional
from tmtccmd import CcsdsTmtcBackend, HookBase from eive_tmtc.config.definitions import SPACE_PACKET_IDS
from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node
from eive_tmtc.tmtc.payload.scex import create_scex_node
from eive_tmtc.tmtc.time import create_time_node
from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node
from tmtccmd import HookBase, CcsdsTmtcBackend
from tmtccmd.com import ComInterface from tmtccmd.com import ComInterface
from tmtccmd.config import CmdTreeNode from tmtccmd.config import CmdTreeNode
from tmtccmd.util import ObjectIdDictT, RetvalDictT
from eive_tmtc.config.definitions import SPACE_PACKET_IDS
from eive_tmtc.config.retvals import get_retval_dict from eive_tmtc.config.retvals import get_retval_dict
from eive_tmtc.tmtc.time import create_time_node from tmtccmd.util import ObjectIdDictT, RetvalDictT
from eive_tmtc.tmtc.acs.acs_ctrl import create_acs_ctrl_node
class EiveHookObject(HookBase): class EiveHookObject(HookBase):
@ -41,6 +43,10 @@ class EiveHookObject(HookBase):
gyro_devs = CmdTreeNode("gyro_devs", "Gyro Devices") gyro_devs = CmdTreeNode("gyro_devs", "Gyro Devices")
acs_brd_assy_node.add_child(mgm_devs) acs_brd_assy_node.add_child(mgm_devs)
acs_brd_assy_node.add_child(gyro_devs) acs_brd_assy_node.add_child(gyro_devs)
acs_ctrl = CmdTreeNode("acs_ctrl", "ACS Controller")
acs_ctrl.add_child(mode_node)
acs_ctrl.add_child(action_node)
acs_ctrl.add_child(param_node)
rws = CmdTreeNode("rws", "Reaction Wheel Devices") rws = CmdTreeNode("rws", "Reaction Wheel Devices")
rw_assy = CmdTreeNode("rw_assy", "Reaction Wheel Assembly") rw_assy = CmdTreeNode("rw_assy", "Reaction Wheel Assembly")
rw_1 = CmdTreeNode("rw_1", "Reaction Wheel 1") rw_1 = CmdTreeNode("rw_1", "Reaction Wheel 1")
@ -63,7 +69,7 @@ class EiveHookObject(HookBase):
gnss_devs = CmdTreeNode("gnss", "GNSS Devices") gnss_devs = CmdTreeNode("gnss", "GNSS Devices")
acs_node.add_child(acs_brd_assy_node) acs_node.add_child(acs_brd_assy_node)
acs_node.add_child(create_acs_ctrl_node()) acs_node.add_child(acs_ctrl)
acs_node.add_child(mode_node) acs_node.add_child(mode_node)
acs_node.add_child(rws) acs_node.add_child(rws)
acs_node.add_child(mgt) acs_node.add_child(mgt)
@ -104,10 +110,11 @@ class EiveHookObject(HookBase):
eps_node.add_child(bat_node) eps_node.add_child(bat_node)
payload_node = CmdTreeNode("payload", "Payload Subsystem") payload_node = CmdTreeNode("payload", "Payload Subsystem")
scex_node = CmdTreeNode("scex", "SCEX devices")
scex_node.add_child(dev_node)
pl_pcdu = CmdTreeNode("pl_pcdu", "Payload PCDU") pl_pcdu = CmdTreeNode("pl_pcdu", "Payload PCDU")
payload_node.add_child(pl_pcdu) payload_node.add_child(pl_pcdu)
payload_node.add_child(create_scex_node())
payload_node.add_child(create_ploc_mpsoc_node())
payload_node.add_child(create_ploc_supv_node())
obdh_node = CmdTreeNode("obdh", "OBDH Subsystem") obdh_node = CmdTreeNode("obdh", "OBDH Subsystem")
xiphos_wdt = CmdTreeNode("wdt", "Xiphos WDT") xiphos_wdt = CmdTreeNode("wdt", "Xiphos WDT")
@ -127,8 +134,8 @@ class EiveHookObject(HookBase):
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import ( from tmtccmd.config.com import (
create_com_interface_cfg_default,
create_com_interface_default, create_com_interface_default,
create_com_interface_cfg_default,
) )
assert self.cfg_path is not None assert self.cfg_path is not None

View File

@ -13,7 +13,7 @@ from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands
from eive_tmtc.tmtc.time import pack_time_management_cmd from eive_tmtc.tmtc.time import pack_time_management_cmd
from tmtccmd import DefaultProcedureInfo, TcHandlerBase from tmtccmd import DefaultProcedureInfo
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.tmtc.power.p60dock import pack_p60dock_cmds from eive_tmtc.tmtc.power.p60dock import pack_p60dock_cmds
@ -71,7 +71,6 @@ from eive_tmtc.utility.input_helper import InputHelper
def handle_default_procedure( # noqa C901: Complexity okay here. def handle_default_procedure( # noqa C901: Complexity okay here.
tc_base: TcHandlerBase,
info: DefaultProcedureInfo, info: DefaultProcedureInfo,
queue_helper: DefaultPusQueueHelper, queue_helper: DefaultPusQueueHelper,
): ):

View File

@ -5,7 +5,7 @@ import math
import socket import socket
import struct import struct
from socket import AF_INET from socket import AF_INET
from typing import Dict, Tuple from typing import Tuple
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
CmdTreeNode, CmdTreeNode,

View File

@ -15,13 +15,13 @@ from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
CmdTreeNode,
tmtc_definitions_provider, tmtc_definitions_provider,
OpCodeEntry, OpCodeEntry,
TmtcDefinitionWrapper, TmtcDefinitionWrapper,
) )
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tmtc import service_provider from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc.decorator import ServiceProviderParams
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
@ -128,6 +128,8 @@ class Info:
REPLAY_WRITE_SEQ = "Replay write sequence" REPLAY_WRITE_SEQ = "Replay write sequence"
DOWNLINK_PWR_ON = "Downlink Power On" DOWNLINK_PWR_ON = "Downlink Power On"
DOWNLINK_PWR_OFF = "Downlink Power Off" DOWNLINK_PWR_OFF = "Downlink Power Off"
MEM_WRITE = "Write to Memory"
MEM_READ = "Read from Memory"
REPLAY_START = "Replay Start" REPLAY_START = "Replay Start"
CAM_TAKE_PIC = "Cam Take Picture" CAM_TAKE_PIC = "Cam Take Picture"
SIMPLEX_SEND_FILE = "Simplex Send File" SIMPLEX_SEND_FILE = "Simplex Send File"
@ -143,6 +145,18 @@ class MemAddresses(enum.IntEnum):
DEADBEEF = 0x40000004 DEADBEEF = 0x40000004
def create_ploc_mpsoc_node() -> CmdTreeNode:
op_code_strs = [
getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
ploc_mpsoc = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC")
for op_code, info in combined_dict.items():
ploc_mpsoc.add_child(CmdTreeNode(op_code, info))
return ploc_mpsoc
@tmtc_definitions_provider @tmtc_definitions_provider
def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper): def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
@ -174,31 +188,29 @@ def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce) defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce)
@service_provider(CustomServiceList.PLOC_MPSOC) def pack_ploc_mpsoc_commands(
def pack_ploc_mpsoc_commands( # noqa C901 q: DefaultPusQueueHelper, cmd_str: str
p: ServiceProviderParams,
): # noqa C901: Complexity okay here. ): # noqa C901: Complexity okay here.
object_id = get_object_ids().get(PLOC_MPSOC_ID) object_id = get_object_ids().get(PLOC_MPSOC_ID)
q = p.queue_helper assert object_id is not None
prefix = "PLOC MPSoC" prefix = "PLOC MPSoC"
op_code = p.op_code
q.add_log_cmd( q.add_log_cmd(
f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
) )
obyt = object_id.as_bytes obyt = object_id.as_bytes
if op_code == OpCode.OFF: if cmd_str == OpCode.OFF:
q.add_log_cmd(f"{prefix}: {Info.OFF}") q.add_log_cmd(f"{prefix}: {Info.OFF}")
command = pack_mode_data(obyt, Mode.OFF, 0) command = pack_mode_data(obyt, Mode.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
if op_code == OpCode.ON: if cmd_str == OpCode.ON:
q.add_log_cmd(f"{prefix}: {Info.ON}") q.add_log_cmd(f"{prefix}: {Info.ON}")
data = pack_mode_data(obyt, Mode.ON, 0) data = pack_mode_data(obyt, Mode.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == OpCode.NORMAL: if cmd_str == OpCode.NORMAL:
q.add_log_cmd(f"{prefix}: {Info.NORMAL}") q.add_log_cmd(f"{prefix}: {Info.NORMAL}")
data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
if op_code == OpCode.MEM_WRITE: if cmd_str == OpCode.MEM_WRITE:
q.add_log_cmd("PLOC MPSoC: TC mem write test") q.add_log_cmd("PLOC MPSoC: TC mem write test")
memory_address = int( memory_address = int(
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
@ -210,39 +222,35 @@ def pack_ploc_mpsoc_commands( # noqa C901
object_id.as_bytes, memory_address, memory_data, mem_len object_id.as_bytes, memory_address, memory_data, mem_len
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "4": if cmd_str == OpCode.FLASH_WRITE_FILE:
q.add_log_cmd("PLOC MPSoC: TC mem read test")
data = prepare_mem_read_command(object_id=object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.FLASH_WRITE_FILE:
q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}") q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}")
data = prepare_flash_write_cmd(object_id.as_bytes) data = prepare_flash_write_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.FLASH_READ_FILE: if cmd_str == OpCode.FLASH_READ_FILE:
q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}") q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}")
data = prepare_flash_read_cmd(object_id.as_bytes) data = prepare_flash_read_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.FLASH_DELETE_FILE: if cmd_str == OpCode.FLASH_DELETE_FILE:
q.add_log_cmd("PLOC MPSoC: Flash delete") q.add_log_cmd("PLOC MPSoC: Flash delete")
data = prepare_flash_delete_cmd(object_id.as_bytes) data = prepare_flash_delete_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code in OpCode.REPLAY_START: if cmd_str in OpCode.REPLAY_START:
q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}") q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}")
data = prepare_replay_start_cmd(object_id.as_bytes) data = prepare_replay_start_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.REPLAY_STOP: if cmd_str == OpCode.REPLAY_STOP:
q.add_log_cmd("PLOC MPSoC: Replay stop") q.add_log_cmd("PLOC MPSoC: Replay stop")
data = object_id.as_bytes + struct.pack("!I", ActionId.TC_REPLAY_STOP) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_REPLAY_STOP)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.DOWNLINK_PWR_ON: if cmd_str == OpCode.DOWNLINK_PWR_ON:
q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}") q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}")
data = prepare_downlink_pwr_on_cmd(object_id.as_bytes) data = prepare_downlink_pwr_on_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.DOWNLINK_PWR_OFF: if cmd_str == OpCode.DOWNLINK_PWR_OFF:
q.add_log_cmd("PLOC MPSoC: Downlink pwr off") q.add_log_cmd("PLOC MPSoC: Downlink pwr off")
data = object_id.as_bytes + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.FLASH_GET_DIR_CONTENT: if cmd_str == OpCode.FLASH_GET_DIR_CONTENT:
q.add_log_cmd(f"{prefix}: {Info.FLASH_GET_DIR_CONTENT}") q.add_log_cmd(f"{prefix}: {Info.FLASH_GET_DIR_CONTENT}")
dir_name = input("Please specify MPSoC directory name to get information for: ") dir_name = input("Please specify MPSoC directory name to get information for: ")
dir_name = bytearray(dir_name.encode("utf-8")) dir_name = bytearray(dir_name.encode("utf-8"))
@ -254,15 +262,15 @@ def pack_ploc_mpsoc_commands( # noqa C901
user_data=dir_name, user_data=dir_name,
) )
) )
if op_code == OpCode.REPLAY_WRITE_SEQ: if cmd_str == OpCode.REPLAY_WRITE_SEQ:
q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}") q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}")
data = prepare_replay_write_sequence_cmd(object_id.as_bytes) data = prepare_replay_write_sequence_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "12": if cmd_str == "12":
q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count") q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count")
data = object_id.as_bytes + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT) data = object_id.as_bytes + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.VERIFY_BOOT: if cmd_str == OpCode.VERIFY_BOOT:
num_words = 1 num_words = 1
q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}") q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}")
data = ( data = (
@ -272,15 +280,15 @@ def pack_ploc_mpsoc_commands( # noqa C901
+ struct.pack("!H", num_words) + struct.pack("!H", num_words)
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.MODE_REPLAY: if cmd_str == OpCode.MODE_REPLAY:
q.add_log_cmd("PLOC MPSoC: Tc mode replay") q.add_log_cmd("PLOC MPSoC: Tc mode replay")
data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_REPLAY) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_REPLAY)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.MODE_IDLE: if cmd_str == OpCode.MODE_IDLE:
q.add_log_cmd("PLOC MPSoC: Tc mode idle") q.add_log_cmd("PLOC MPSoC: Tc mode idle")
data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_IDLE) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_IDLE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "16": if cmd_str == "16":
q.add_log_cmd("PLOC MPSoC: Tc cam command send") q.add_log_cmd("PLOC MPSoC: Tc cam command send")
cam_cmd = input("Specify cam command string: ") cam_cmd = input("Specify cam command string: ")
data = ( data = (
@ -289,27 +297,27 @@ def pack_ploc_mpsoc_commands( # noqa C901
+ bytearray(cam_cmd, "utf-8") + bytearray(cam_cmd, "utf-8")
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "17": if cmd_str == "17":
q.add_log_cmd("PLOC MPSoC: Set UART TX tristate") q.add_log_cmd("PLOC MPSoC: Set UART TX tristate")
data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE) data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == "18": if cmd_str == "18":
q.add_log_cmd("PLOC MPSoC: Release UART TX") q.add_log_cmd("PLOC MPSoC: Release UART TX")
data = object_id.as_bytes + struct.pack("!I", ActionId.RELEASE_UART_TX) data = object_id.as_bytes + struct.pack("!I", ActionId.RELEASE_UART_TX)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.CAM_TAKE_PIC: if cmd_str == OpCode.CAM_TAKE_PIC:
q.add_log_cmd("PLOC MPSoC: Cam take picture") q.add_log_cmd("PLOC MPSoC: Cam take picture")
data = prepare_cam_take_pic_cmd(object_id.as_bytes) data = prepare_cam_take_pic_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.SIMPLEX_SEND_FILE: if cmd_str == OpCode.SIMPLEX_SEND_FILE:
q.add_log_cmd("PLOC MPSoC: Simplex send file") q.add_log_cmd("PLOC MPSoC: Simplex send file")
data = prepare_simplex_send_file_cmd(object_id.as_bytes) data = prepare_simplex_send_file_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.DOWNLINK_DATA_MODULATE: if cmd_str == OpCode.DOWNLINK_DATA_MODULATE:
q.add_log_cmd("PLOC MPSoC: Downlink data modulate") q.add_log_cmd("PLOC MPSoC: Downlink data modulate")
data = prepare_downlink_data_modulate_cmd(object_id.as_bytes) data = prepare_downlink_data_modulate_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
if op_code == OpCode.MODE_SNAPSHOT: if cmd_str == OpCode.MODE_SNAPSHOT:
q.add_log_cmd("PLOC MPSoC: Mode snapshot") q.add_log_cmd("PLOC MPSoC: Mode snapshot")
data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_SNAPSHOT) data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_SNAPSHOT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
@ -671,10 +679,7 @@ def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytea
current_idx = 0 current_idx = 0
dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8") dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8")
current_idx += 12 current_idx += 12
num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[ num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[0]
0
]
current_idx += 4
elem_names = [] elem_names = []
elem_attrs = [] elem_attrs = []
elem_sizes = [] elem_sizes = []

View File

@ -11,14 +11,11 @@ import logging
import struct import struct
from eive_tmtc.config.object_ids import PLOC_SUPV_ID, get_object_ids from eive_tmtc.config.object_ids import PLOC_SUPV_ID, get_object_ids
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.config import TmtcDefinitionWrapper from tmtccmd.config.tmtc import CmdTreeNode
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
@ -26,7 +23,7 @@ from eive_tmtc.utility.input_helper import InputHelper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
latchup_id_dict = { LATCHUP_ID_DICT = {
"0": "0.85V", "0": "0.85V",
"1": "1.8V", "1": "1.8V",
"2": "MISC", "2": "MISC",
@ -135,22 +132,23 @@ class SetIds(enum.IntEnum):
BOOT_STATUS_REPORT = 103 BOOT_STATUS_REPORT = 103
class OpCodes: class OpCode(str, enum.Enum):
OFF = ["0", "off"] value: str
ON = ["1", "on"] OFF = "off"
NORMAL = ["2", "nml"] ON = "on"
HK_TO_OBC = ["3", "hk_to_obc"] NML = "nml"
REQUEST_HK = ["4", "req_hk"] HK_TO_OBC = "hk_to_obc"
START_MPSOC = ["5", "start_mpsoc"] REQUEST_HK = "req_hk"
SHUTDOWN_MPSOC = ["6", "stop_mpsoc"] START_MPSOC = "start_mpsoc"
SEL_NVM = ["7", "sel_nvm"] SHUTDOWN_MPSOC = "stop_mpsoc"
SET_TIME_REF = ["set_time_ref"] SEL_NVM = "sel_nvm"
FACTORY_FLASH = ["factory_flash"] SET_TIME_REF = "set_time_ref"
REQ_BOOT_STATUS_REPORT = ["13", "boot_report"] FACTORY_FLASH = "factory_flash"
START_UPDATE = ["42", "start_update"] REQ_BOOT_STATUS_REPORT = "boot_report"
PERFORM_UPDATE = ["update"] START_UPDATE = "start_update"
FACTORY_RESET = ["factory_reset"] PERFORM_UPDATE = "update"
MEM_CHECK = ["mem_check"] FACTORY_RESET = "factory_reset"
MEM_CHECK = "mem_check"
RESET_MPSOC = "reset_mpsoc" RESET_MPSOC = "reset_mpsoc"
@ -161,6 +159,8 @@ class Info(str, enum.Enum):
NML = "Switch Normal" NML = "Switch Normal"
HK_TO_OBC = "Request HK from PLOC SUPV" HK_TO_OBC = "Request HK from PLOC SUPV"
REQUEST_HK = "Request HK set from PLOC Handler" REQUEST_HK = "Request HK set from PLOC Handler"
START_MPSOC = "Start MPSoC"
SHUTDOWN_MPSOC = "Shutdown MPSoC"
SET_TIME_REF = "Set time reference" SET_TIME_REF = "Set time reference"
FACTORY_FLASH = "Factory Flash Mode" FACTORY_FLASH = "Factory Flash Mode"
PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes" PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes"
@ -172,95 +172,54 @@ class Info(str, enum.Enum):
RESET_MPSOC = "Reset MPSoC" RESET_MPSOC = "Reset MPSoC"
@tmtc_definitions_provider def create_ploc_supv_node() -> CmdTreeNode:
def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): op_code_strs = [
oce = OpCodeEntry() getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
oce.add(OpCodes.OFF, Info.OFF) ]
oce.add(OpCodes.ON, Info.ON) info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
oce.add(OpCodes.NORMAL, Info.NML) combined_dict = dict(zip(op_code_strs, info_strs))
oce.add(OpCodes.HK_TO_OBC, Info.HK_TO_OBC) node = CmdTreeNode("ploc_supv", "PLOC Supervisor")
oce.add(OpCodes.REQUEST_HK, Info.REQUEST_HK) for op_code, info in combined_dict.items():
oce.add(OpCodes.START_MPSOC, "PLOC Supervisor: Start MPSoC") node.add_child(CmdTreeNode(op_code, info))
oce.add(OpCodes.SHUTDOWN_MPSOC, "PLOC Supervisor: Shutdown MPSoC") return node
oce.add(OpCodes.SEL_NVM, Info.SEL_NVM)
oce.add(OpCodes.SET_TIME_REF, Info.SET_TIME_REF)
oce.add(OpCodes.FACTORY_RESET, Info.FACTORY_RESET)
oce.add(OpCodes.RESET_MPSOC, Info.RESET_MPSOC)
oce.add("8", "PLOC Supervisor: Set max restart tries")
oce.add("11", "PLOC Supervisor: Set boot timeout")
oce.add("12", "PLOC Supervisor: Disable Hk")
oce.add(OpCodes.REQ_BOOT_STATUS_REPORT, Info.REQ_BOOT_STATUS_REPORT)
oce.add("17", "PLOC Supervisor: Enable latchup alert")
oce.add("18", "PLOC Supervisor: Disable latchup alert")
oce.add("20", "PLOC Supervisor: Set alert limit")
oce.add("23", "PLOC Supervisor: Set ADC enabled channels")
oce.add("24", "PLOC Supervisor: Set ADC window and stride")
oce.add("25", "PLOC Supervisor: Set ADC threshold")
oce.add("26", "PLOC Supervisor: Request latchup status report")
oce.add("27", "PLOC Supervisor: Copy ADC data to MRAM")
oce.add("30", "PLOC Supervisor: Run auto EM tests")
oce.add("31", "PLOC Supervisor: MRAM Wipe")
oce.add("35", "PLOC Supervisor: Set GPIO")
oce.add("36", "PLOC Supervisor: Read GPIO")
oce.add("37", "PLOC Supervisor: Restart supervisor")
oce.add(OpCodes.PERFORM_UPDATE, Info.PERFORM_UPDATE)
oce.add(OpCodes.START_UPDATE, Info.START_UPDATE)
oce.add("43", "PLOC Supervisor: Terminate supervisor process")
oce.add("44", "PLOC Supervisor: Start MPSoC quiet")
oce.add("45", "PLOC Supervisor: Set shutdown timeout")
oce.add(OpCodes.FACTORY_FLASH, Info.FACTORY_FLASH)
oce.add("47", "PLOC Supervisor: Enable auto TM")
oce.add("48", "PLOC Supervisor: Disable auto TM")
oce.add("51", "PLOC Supervisor: Logging request event buffers")
oce.add("52", "PLOC Supervisor: Logging clear counters")
oce.add("53", "PLOC Supervisor: Logging set topic")
oce.add("54", "PLOC Supervisor: Logging request counters")
oce.add("55", "PLOC Supervisor: Request ADC Report")
oce.add("56", "PLOC Supervisor: Reset PL")
oce.add("57", "PLOC Supervisor: Enable NVMs")
oce.add("58", "PLOC Supervisor: Continue update")
oce.add(OpCodes.MEM_CHECK, Info.MEM_CHECK)
defs.add_service(CustomServiceList.PLOC_SUPV.value, "PLOC Supervisor", oce)
@service_provider(CustomServiceList.PLOC_SUPV) def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
q = p.queue_helper
op_code = p.op_code
object_id = get_object_ids().get(PLOC_SUPV_ID) object_id = get_object_ids().get(PLOC_SUPV_ID)
assert object_id is not None
q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}")
obyt = object_id.as_bytes obyt = object_id.as_bytes
prefix = "PLOC Supervisor" prefix = "PLOC Supervisor"
if op_code in OpCodes.OFF: if cmd_str == OpCode.OFF:
q.add_log_cmd(f"{prefix}: {Info.OFF}") q.add_log_cmd(f"{prefix}: {Info.OFF}")
command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0) command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
if op_code in OpCodes.ON: if cmd_str == OpCode.ON:
q.add_log_cmd(f"{prefix}: {Info.ON}") q.add_log_cmd(f"{prefix}: {Info.ON}")
command = pack_mode_data(object_id.as_bytes, Mode.ON, 0) command = pack_mode_data(object_id.as_bytes, Mode.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
if op_code in OpCodes.NORMAL: if cmd_str == OpCode.NML:
q.add_log_cmd(f"{prefix}: {Info.NML}") q.add_log_cmd(f"{prefix}: {Info.NML}")
command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
if op_code in OpCodes.HK_TO_OBC: if cmd_str == OpCode.HK_TO_OBC:
q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}") q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}")
command = obyt + struct.pack("!I", SupvActionId.HK_REPORT) command = obyt + struct.pack("!I", SupvActionId.HK_REPORT)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.REQUEST_HK: if cmd_str == OpCode.REQUEST_HK:
q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK}") q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK}")
sid = make_sid(object_id.as_bytes, SetIds.HK_REPORT) sid = make_sid(object_id.as_bytes, SetIds.HK_REPORT)
cmd = generate_one_hk_command(sid) cmd = generate_one_hk_command(sid)
q.add_pus_tc(cmd) q.add_pus_tc(cmd)
elif op_code in OpCodes.START_MPSOC: elif cmd_str == OpCode.START_MPSOC:
q.add_log_cmd("PLOC Supervisor: Start MPSoC") q.add_log_cmd("PLOC Supervisor: Start MPSoC")
command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) command = obyt + struct.pack("!I", SupvActionId.START_MPSOC)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.SHUTDOWN_MPSOC: if cmd_str == OpCode.SHUTDOWN_MPSOC:
q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.SEL_NVM: if cmd_str == OpCode.SEL_NVM:
q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image") q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image")
mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): ")) mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): "))
bp0 = int(input("BP0 (0 or 1): ")) bp0 = int(input("BP0 (0 or 1): "))
@ -268,7 +227,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
bp2 = int(input("BP2 (0 or 1): ")) bp2 = int(input("BP2 (0 or 1): "))
command = pack_sel_boot_image_cmd(object_id.as_bytes, mem, bp0, bp1, bp2) command = pack_sel_boot_image_cmd(object_id.as_bytes, mem, bp0, bp1, bp2)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.FACTORY_RESET: if cmd_str == OpCode.FACTORY_RESET:
q.add_log_cmd(f"{prefix}: {Info.FACTORY_RESET}") q.add_log_cmd(f"{prefix}: {Info.FACTORY_RESET}")
while True: while True:
print("Please select the key for a factory reset operation") print("Please select the key for a factory reset operation")
@ -285,7 +244,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
user_data=bytes([key]), user_data=bytes([key]),
) )
) )
if op_code == "8": if cmd_str == "8":
q.add_log_cmd("PLOC Supervisor: Set max restart tries") q.add_log_cmd("PLOC Supervisor: Set max restart tries")
restart_tries = int(input("Specify maximum restart tries: ")) restart_tries = int(input("Specify maximum restart tries: "))
command = ( command = (
@ -294,15 +253,15 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
+ struct.pack("!B", restart_tries) + struct.pack("!B", restart_tries)
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == OpCodes.RESET_MPSOC: if cmd_str == OpCode.RESET_MPSOC:
q.add_log_cmd(Info.RESET_MPSOC) q.add_log_cmd(Info.RESET_MPSOC)
command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_MPSOC) command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_MPSOC)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.SET_TIME_REF: if cmd_str in OpCode.SET_TIME_REF:
q.add_log_cmd("PLOC Supervisor: Set time reference") q.add_log_cmd("PLOC Supervisor: Set time reference")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.SET_TIME_REF) command = object_id.as_bytes + struct.pack("!I", SupvActionId.SET_TIME_REF)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "11": if cmd_str == "11":
q.add_log_cmd("PLOC Supervisor: Set boot timeout") q.add_log_cmd("PLOC Supervisor: Set boot timeout")
boot_timeout = int(input("Specify boot timeout [ms]: ")) boot_timeout = int(input("Specify boot timeout [ms]: "))
command = ( command = (
@ -311,11 +270,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
+ struct.pack("!I", boot_timeout) + struct.pack("!I", boot_timeout)
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "12": if cmd_str == "12":
q.add_log_cmd("PLOC Supervisor: Disable HK") q.add_log_cmd("PLOC Supervisor: Disable HK")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_HK) command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_HK)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.REQ_BOOT_STATUS_REPORT: if cmd_str in OpCode.REQ_BOOT_STATUS_REPORT:
q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}") q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.GET_BOOT_STATUS_REPORT "!I", SupvActionId.GET_BOOT_STATUS_REPORT
@ -325,129 +284,129 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
sid = make_sid(object_id.as_bytes, SetIds.BOOT_STATUS_REPORT) sid = make_sid(object_id.as_bytes, SetIds.BOOT_STATUS_REPORT)
req_hk = generate_one_hk_command(sid) req_hk = generate_one_hk_command(sid)
q.add_pus_tc(req_hk) q.add_pus_tc(req_hk)
if op_code == "17": if cmd_str == "17":
q.add_log_cmd("PLOC Supervisor: Enable latchup alert") q.add_log_cmd("PLOC Supervisor: Enable latchup alert")
command = pack_lachtup_alert_cmd(object_id.as_bytes, True) command = pack_lachtup_alert_cmd(object_id.as_bytes, True)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "18": if cmd_str == "18":
q.add_log_cmd("PLOC Supervisor: Disable latchup alert") q.add_log_cmd("PLOC Supervisor: Disable latchup alert")
command = pack_lachtup_alert_cmd(object_id.as_bytes, False) command = pack_lachtup_alert_cmd(object_id.as_bytes, False)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "20": if cmd_str == "20":
q.add_log_cmd("PLOC Supervisor: Set alert limit") q.add_log_cmd("PLOC Supervisor: Set alert limit")
command = pack_set_alert_limit_cmd(object_id.as_bytes) command = pack_set_alert_limit_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "23": if cmd_str == "23":
q.add_log_cmd("PLOC Supervisor: Set ADC enabled channels") q.add_log_cmd("PLOC Supervisor: Set ADC enabled channels")
command = pack_set_adc_enabled_channels_cmd(object_id.as_bytes) command = pack_set_adc_enabled_channels_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "24": if cmd_str == "24":
q.add_log_cmd("PLOC Supervisor: Set ADC window and stride") q.add_log_cmd("PLOC Supervisor: Set ADC window and stride")
command = pack_set_adc_window_and_stride_cmd(object_id.as_bytes) command = pack_set_adc_window_and_stride_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "25": if cmd_str == "25":
q.add_log_cmd("PLOC Supervisor: Set ADC threshold") q.add_log_cmd("PLOC Supervisor: Set ADC threshold")
command = pack_set_adc_threshold_cmd(object_id.as_bytes) command = pack_set_adc_threshold_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "26": if cmd_str == "26":
q.add_log_cmd("PLOC Supervisor: Request latchup status report") q.add_log_cmd("PLOC Supervisor: Request latchup status report")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.GET_LATCHUP_STATUS_REPORT "!I", SupvActionId.GET_LATCHUP_STATUS_REPORT
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "27": if cmd_str == "27":
q.add_log_cmd("PLOC Supervisor: Copy ADC data to MRAM") q.add_log_cmd("PLOC Supervisor: Copy ADC data to MRAM")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.COPY_ADC_DATA_TO_MRAM "!I", SupvActionId.COPY_ADC_DATA_TO_MRAM
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "30": if cmd_str == "30":
q.add_log_cmd("PLOC Supervisor: Run auto EM tests") q.add_log_cmd("PLOC Supervisor: Run auto EM tests")
command = pack_auto_em_tests_cmd(object_id.as_bytes) command = pack_auto_em_tests_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "31": if cmd_str == "31":
q.add_log_cmd("PLOC Supervisor: Wipe MRAM") q.add_log_cmd("PLOC Supervisor: Wipe MRAM")
command = pack_mram_wipe_cmd(object_id.as_bytes) command = pack_mram_wipe_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "35": if cmd_str == "35":
q.add_log_cmd("PLOC Supervisor: Set GPIO command") q.add_log_cmd("PLOC Supervisor: Set GPIO command")
command = pack_set_gpio_cmd(object_id.as_bytes) command = pack_set_gpio_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "36": if cmd_str == "36":
q.add_log_cmd("PLOC Supervisor: Read GPIO command") q.add_log_cmd("PLOC Supervisor: Read GPIO command")
command = pack_read_gpio_cmd(object_id.as_bytes) command = pack_read_gpio_cmd(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "37": if cmd_str == "37":
q.add_log_cmd("PLOC Supervisor: Restart supervisor") q.add_log_cmd("PLOC Supervisor: Restart supervisor")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.RESTART_SUPERVISOR "!I", SupvActionId.RESTART_SUPERVISOR
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.START_UPDATE: if cmd_str in OpCode.START_UPDATE:
q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update") q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update")
command = pack_update_command(object_id.as_bytes, True) command = pack_update_command(object_id.as_bytes, True)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.PERFORM_UPDATE: if cmd_str in OpCode.PERFORM_UPDATE:
q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update") q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update")
command = pack_update_command(object_id.as_bytes, False) command = pack_update_command(object_id.as_bytes, False)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "43": if cmd_str == "43":
q.add_log_cmd("PLOC Supervisor: Terminate supervisor process") q.add_log_cmd("PLOC Supervisor: Terminate supervisor process")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.TERMINATE_SUPV_HELPER "!I", SupvActionId.TERMINATE_SUPV_HELPER
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "44": if cmd_str == "44":
q.add_log_cmd("PLOC Supervisor: Start MPSoC quiet") q.add_log_cmd("PLOC Supervisor: Start MPSoC quiet")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.START_MPSOC_QUIET) command = object_id.as_bytes + struct.pack("!I", SupvActionId.START_MPSOC_QUIET)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "45": if cmd_str == "45":
q.add_log_cmd("PLOC Supervisor: Set shutdown timeout") q.add_log_cmd("PLOC Supervisor: Set shutdown timeout")
command = pack_set_shutdown_timeout_command(object_id.as_bytes) command = pack_set_shutdown_timeout_command(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.FACTORY_FLASH: if cmd_str in OpCode.FACTORY_FLASH:
q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}") q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH) command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "47": if cmd_str == "47":
q.add_log_cmd("PLOC Supervisor: Enable auto TM") q.add_log_cmd("PLOC Supervisor: Enable auto TM")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_AUTO_TM) command = object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_AUTO_TM)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "48": if cmd_str == "48":
q.add_log_cmd("PLOC Supervisor: Disable auto TM") q.add_log_cmd("PLOC Supervisor: Disable auto TM")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_AUTO_TM) command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_AUTO_TM)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "51": if cmd_str == "51":
q.add_log_cmd("PLOC Supervisor: Logging request event buffers") q.add_log_cmd("PLOC Supervisor: Logging request event buffers")
command = pack_logging_buffer_request(object_id.as_bytes) command = pack_logging_buffer_request(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "52": if cmd_str == "52":
q.add_log_cmd("PLOC Supervisor: Logging clear counters") q.add_log_cmd("PLOC Supervisor: Logging clear counters")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.LOGGING_CLEAR_COUNTERS "!I", SupvActionId.LOGGING_CLEAR_COUNTERS
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "53": if cmd_str == "53":
q.add_log_cmd("PLOC Supervisor: Logging set topic") q.add_log_cmd("PLOC Supervisor: Logging set topic")
command = pack_logging_set_topic(object_id.as_bytes) command = pack_logging_set_topic(object_id.as_bytes)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "54": if cmd_str == "54":
q.add_log_cmd("PLOC Supervisor: Logging request counters") q.add_log_cmd("PLOC Supervisor: Logging request counters")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.LOGGING_REQUEST_COUNTERS "!I", SupvActionId.LOGGING_REQUEST_COUNTERS
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "55": if cmd_str == "55":
q.add_log_cmd("PLOC Supervisor: Request ADC report") q.add_log_cmd("PLOC Supervisor: Request ADC report")
command = object_id.as_bytes + struct.pack( command = object_id.as_bytes + struct.pack(
"!I", SupvActionId.REQUEST_ADC_REPORT "!I", SupvActionId.REQUEST_ADC_REPORT
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "56": if cmd_str == "56":
q.add_log_cmd("PLOC Supervisor: Reset PL") q.add_log_cmd("PLOC Supervisor: Reset PL")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_PL) command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_PL)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "57": if cmd_str == "57":
q.add_log_cmd("PLOC Supervisor: Enable NVMs") q.add_log_cmd("PLOC Supervisor: Enable NVMs")
nvm01 = int(input("Enable (1) or disable(0) NVM 0 and 1: ")) nvm01 = int(input("Enable (1) or disable(0) NVM 0 and 1: "))
nvm3 = int(input("Enable (1) or disable(0) NVM 3: ")) nvm3 = int(input("Enable (1) or disable(0) NVM 3: "))
@ -458,11 +417,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
+ struct.pack("B", nvm3) + struct.pack("B", nvm3)
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "58": if cmd_str == "58":
q.add_log_cmd("PLOC Supervisor: Continue update") q.add_log_cmd("PLOC Supervisor: Continue update")
command = object_id.as_bytes + struct.pack("!I", SupvActionId.CONTINUE_UPDATE) command = object_id.as_bytes + struct.pack("!I", SupvActionId.CONTINUE_UPDATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code in OpCodes.MEM_CHECK: if cmd_str == OpCode.MEM_CHECK:
custom_data = bytearray() custom_data = bytearray()
update_file = get_update_file() update_file = get_update_file()
memory_id = int(input("Specify memory ID: ")) memory_id = int(input("Specify memory ID: "))
@ -545,9 +504,9 @@ def get_latchup_id() -> int:
description_string = "Description".ljust(description_column_width) description_string = "Description".ljust(description_column_width)
print(f"{key_string} | {description_string}") print(f"{key_string} | {description_string}")
print(separator_string) print(separator_string)
for key in latchup_id_dict: for key in LATCHUP_ID_DICT:
key_string = key.ljust(key_column_width) key_string = key.ljust(key_column_width)
description_string = latchup_id_dict[key].ljust(description_column_width) description_string = LATCHUP_ID_DICT[key].ljust(description_column_width)
print(f"{key_string} | {description_string}") print(f"{key_string} | {description_string}")
return int(input("Specify latchup ID: ")) return int(input("Specify latchup ID: "))

View File

@ -2,16 +2,12 @@ import enum
import json import json
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.config.tmtc import CmdTreeNode
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data, Subservice
from tmtccmd.tmtc import DefaultPusQueueHelper, service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice, pack_mode_data
from eive_tmtc.config.object_ids import SCEX_HANDLER_ID from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.config.object_ids import SCEX_HANDLER_ID
USE_SCEX_CONF_FILE = True USE_SCEX_CONF_FILE = True
@ -26,10 +22,8 @@ class OpCode:
ALL_CELLS_CMD = "allcells" ALL_CELLS_CMD = "allcells"
FRAM = "fram" FRAM = "fram"
ON = "on"
SWITCH_ON = "on" SWITCH_ON = "on"
OFF = "off" SWITCH_OFF = "off"
SWITCH_OFF = OFF
NORMAL = "normal" NORMAL = "normal"
@ -59,28 +53,20 @@ class Info:
NORMAL = "Switch SCEX to normal mode" NORMAL = "Switch SCEX to normal mode"
@tmtc_definitions_provider def create_scex_node() -> CmdTreeNode:
def add_scex_cmds(defs: TmtcDefinitionWrapper): op_code_strs = [
oce = OpCodeEntry() getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
oce.add(keys=OpCode.PING, info=Info.PING) ]
oce.add(keys=OpCode.ION_CMD, info=Info.ION_CMD) info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
oce.add(keys=OpCode.TEMP_CMD, info=Info.TEMP_CMD) combined_dict = dict(zip(op_code_strs, info_strs))
oce.add(keys=OpCode.EXP_STATUS_CMD, info=Info.EXP_STATUS_CMD) scex_node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC")
oce.add(keys=OpCode.ONE_CELLS_CMD, info=Info.ONE_CELLS_CMD) for op_code, info in combined_dict.items():
scex_node.add_child(CmdTreeNode(op_code, info))
oce.add(keys=OpCode.ALL_CELLS_CMD, info=Info.ALL_CELLS_CMD) return scex_node
oce.add(keys=OpCode.FRAM, info=Info.FRAM)
oce.add(keys=OpCode.ON, info=Info.SWITCH_ON)
oce.add(keys=OpCode.OFF, info=Info.SWITCH_OFF)
oce.add(keys=OpCode.NORMAL, info=Info.NORMAL)
defs.add_service(
name=CustomServiceList.SCEX.value, info="SCEX Device", op_code_entry=oce
)
def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
if cmd_str == OpCode.ON: if cmd_str == OpCode.SWITCH_ON:
q.add_log_cmd(Info.SWITCH_ON) q.add_log_cmd(Info.SWITCH_ON)
q.add_pus_tc( q.add_pus_tc(
PusTelecommand( PusTelecommand(
@ -98,7 +84,7 @@ def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
app_data=pack_mode_data(SCEX_HANDLER_ID, Mode.NORMAL, 0), app_data=pack_mode_data(SCEX_HANDLER_ID, Mode.NORMAL, 0),
) )
) )
if cmd_str == OpCode.OFF: if cmd_str == OpCode.SWITCH_OFF:
q.add_log_cmd(Info.SWITCH_OFF) q.add_log_cmd(Info.SWITCH_OFF)
q.add_pus_tc( q.add_pus_tc(
PusTelecommand( PusTelecommand(
@ -158,7 +144,8 @@ def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
dac_weight1 = json_data["dac_weight1"] dac_weight1 = json_data["dac_weight1"]
dac_weight2 = json_data["dac_weight2"] dac_weight2 = json_data["dac_weight2"]
dac_weight3 = json_data["dac_weight3"] dac_weight3 = json_data["dac_weight3"]
else:
raise ValueError("CLI support for SCEX params not implemented")
# in app_data # in app_data
# app_data.extend(struct.pack("!H", first_dac)) # app_data.extend(struct.pack("!H", first_dac))
app_data.append(cell_select) app_data.append(cell_select)
@ -192,6 +179,8 @@ def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
dac_weight2 = json_data["dac_weight2"] dac_weight2 = json_data["dac_weight2"]
dac_weight3 = json_data["dac_weight3"] dac_weight3 = json_data["dac_weight3"]
else:
raise ValueError("CLI support for SCEX params not implemented")
# in app_data # in app_data
# app_data.extend(struct.pack("!H", first_dac)) # app_data.extend(struct.pack("!H", first_dac))
append_16_bit_val(packet=app_data, val=first_dac[cn]) append_16_bit_val(packet=app_data, val=first_dac[cn])

View File

@ -30,6 +30,7 @@ from tmtccmd.config.args import (
PreArgsParsingWrapper, PreArgsParsingWrapper,
ProcedureParamsWrapper, ProcedureParamsWrapper,
SetupParams, SetupParams,
perform_tree_printout,
) )
from tmtccmd.core import BackendRequest from tmtccmd.core import BackendRequest
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
@ -133,6 +134,9 @@ def setup_params() -> Tuple[SetupWrapper, int]:
hk_level = int(post_arg_parsing_wrapper.args_raw.hk) hk_level = int(post_arg_parsing_wrapper.args_raw.hk)
else: else:
hk_level = 0 hk_level = 0
if params.app_params.print_tree:
perform_tree_printout(params.app_params, hook_obj.get_command_definitions())
sys.exit(0)
params.apid = PUS_APID params.apid = PUS_APID
if params.com_if is None: if params.com_if is None:
raise ValueError("could not determine a COM interface.") raise ValueError("could not determine a COM interface.")