looking good
EIVE/-/pipeline/head This commit looks good Details

This commit is contained in:
Robin Müller 2023-11-22 16:40:27 +01:00
parent ae239031ed
commit af02c106b9
Signed by: muellerr
GPG Key ID: A649FB78196E3849
8 changed files with 105 additions and 51 deletions

View File

@ -7,9 +7,12 @@ from tmtccmd.util import ObjectIdDictT, RetvalDictT
from eive_tmtc.config.definitions import SPACE_PACKET_IDS
from eive_tmtc.config.retvals import get_retval_dict
from eive_tmtc.tmtc.com.subsystem import create_com_subsystem_node
from eive_tmtc.tmtc.health import create_global_health_node
from eive_tmtc.tmtc.core import create_core_node
from eive_tmtc.tmtc.acs.subsystem import create_acs_subsystem_node
from eive_tmtc.tmtc.com.ccsds_handler import create_ccsds_node
from eive_tmtc.tmtc.com.subsystem import create_com_subsystem_node
from eive_tmtc.tmtc.com.syrlinks_handler import create_syrlinks_node
from eive_tmtc.tmtc.health import create_global_health_node
from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node
from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node
from eive_tmtc.tmtc.payload.scex import create_scex_node
@ -97,13 +100,8 @@ class EiveHookObject(HookBase):
tcs_node.add_child(tcs_brd_assy)
com_node = create_com_subsystem_node()
syrlinks_node = CmdTreeNode("syrlinks", "Syrlinks")
syrlinks_node.add_child(dev_node)
syrlinks_node.add_child(assy_node)
syrlinks_node.add_child(action_node)
ccsds_node = CmdTreeNode("ccsds", "CCSDS Handler")
ccsds_node.add_child(action_node)
com_node.add_child(syrlinks_node)
com_node.add_child(create_syrlinks_node())
com_node.add_child(create_ccsds_node())
eps_node = create_eps_subsystem_node()
acu_node = CmdTreeNode("acu", "PCDU ACU component")
@ -128,7 +126,7 @@ class EiveHookObject(HookBase):
xiphos_wdt = CmdTreeNode("wdt", "Xiphos WDT")
core_ctrl = CmdTreeNode("core", "Core Controller")
obdh_node.add_child(xiphos_wdt)
obdh_node.add_child(core_ctrl)
obdh_node.add_child(create_core_node())
obdh_node.add_child(create_time_node())
obdh_node.add_child(create_persistent_tm_store_node())

View File

@ -8,6 +8,8 @@
import enum
import struct
from tmtccmd.config import CmdTreeNode
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.tmtc import (
@ -51,11 +53,11 @@ class Submode(enum.IntEnum):
class OpCode:
ENABLE_WITH_LOW_DATARATE = ["enable_low_datarate"]
ENABLE_WITH_HIGH_DATARATE = ["enable_high_datarate"]
DISABLE = ["disable"]
ENABLE_ACTION = ["legacy_enable_tx"]
DISABLE_ACTION = ["legacy_disable_tx"]
ENABLE_WITH_LOW_DATARATE = "enable_low_datarate"
ENABLE_WITH_HIGH_DATARATE = "enable_high_datarate"
DISABLE = "disable"
ENABLE_ACTION = "legacy_enable_tx"
DISABLE_ACTION = "legacy_disable_tx"
class Info:
@ -116,6 +118,19 @@ def pack_ccsds_handler_command( # noqa C901
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def create_ccsds_node() -> CmdTreeNode:
op_code_strs = [
getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("ccsds", "CCSDS Handler", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node
@tmtc_definitions_provider
def add_ccsds_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()

View File

@ -9,6 +9,8 @@ import enum
import logging
import math
from tmtccmd.config import CmdTreeNode
from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.tmtc.com.defs import Mode as ComMode
from eive_tmtc.config.definitions import CustomServiceList
@ -57,6 +59,7 @@ class OpCode:
HK_TX_REGS = "hk_tx_regs"
TX_STATUS = "tx_status"
RX_STATUS = "rx_status"
SET_CW = "tx_cw"
class Info:
@ -102,6 +105,18 @@ class Datarate(enum.IntEnum):
HIGH_RATE_MODULATION_0QPSK = 1
def create_syrlinks_node() -> CmdTreeNode:
op_code_strs = [
getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("syrlinks", "Syrlinks Device", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node
@tmtc_definitions_provider
def add_syrlinks_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()

View File

@ -9,7 +9,7 @@ from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
@ -90,20 +90,20 @@ class OpCode:
RM_HELPER = "rm_helper"
MKDIR_HELPER = "mkdir_helper"
SET_PREF_SD = "set_pref_sd"
REBOOT_XSC = ["reboot_xsc"]
XSC_REBOOT_SELF = ["reboot_self"]
XSC_REBOOT_0_0 = ["reboot_00"]
XSC_REBOOT_0_1 = ["reboot_01"]
XSC_REBOOT_1_0 = ["reboot_10"]
XSC_REBOOT_1_1 = ["reboot_11"]
REBOOT_FULL = ["reboot_regular"]
GET_HK = ["get_hk"]
OBSW_UPDATE_FROM_SD_0 = ["obsw_update_sd0"]
OBSW_UPDATE_FROM_SD_1 = ["obsw_update_sd1"]
OBSW_UPDATE_FROM_TMP = ["obsw_update_tmp"]
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
REBOOT_XSC = "reboot_xsc"
XSC_REBOOT_SELF = "reboot_self"
XSC_REBOOT_0_0 = "reboot_00"
XSC_REBOOT_0_1 = "reboot_01"
XSC_REBOOT_1_0 = "reboot_10"
XSC_REBOOT_1_1 = "reboot_11"
REBOOT_FULL = "reboot_regular"
GET_HK = "get_hk"
OBSW_UPDATE_FROM_SD_0 = "obsw_update_sd0"
OBSW_UPDATE_FROM_SD_1 = "obsw_update_sd1"
OBSW_UPDATE_FROM_TMP = "obsw_update_tmp"
SWITCH_TO_SD_0 = "switch_to_sd_0"
SWITCH_TO_SD_1 = "switch_to_sd_1"
SWITCH_TO_BOTH_SD_CARDS = "switch_to_both_sd_cards"
READ_REBOOT_MECHANISM_INFO = "rbh_info"
ENABLE_REBOOT_FILE_HANDLING = "rwd_on"
DISABLE_REBOOT_FILE_HANDLING = "rwd_off"
@ -127,6 +127,11 @@ class Info:
SET_PREF_SD = "Set preferred SD card"
REBOOT_XSC = "XSC reboot with prompt"
REBOOT_FULL = "Full regular reboot"
XSC_REBOOT_SELF = "Reboot Self"
XSC_REBOOT_0_0 = "Reboot to 0 0"
XSC_REBOOT_0_1 = "Reboot to 0 1"
XSC_REBOOT_1_0 = "Reboot to 1 0"
XSC_REBOOT_1_1 = "Reboot to 1 1"
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
@ -140,8 +145,17 @@ class Info:
MV_HELPER = "Filesystem Move Helper"
RM_HELPER = "Filesystem Removal Helper"
MKDIR_HELPER = "Filesystem Directory Creation Helper"
ENABLE_REBOOT_FILE_HANDLING = "Enable reboot file handling"
DISABLE_REBOOT_FILE_HANDLING = "Disable reboot file handling"
RESET_ALL_REBOOT_COUNTERS = "Reset all reboot counters"
RWD_RESET_REBOOT_COUNTER_00 = "Reset reboot counter 0 0"
RWD_RESET_REBOOT_COUNTER_01 = "Reset reboot counter 0 0"
RWD_RESET_REBOOT_COUNTER_10 = "Reset reboot counter 1 0"
RWD_RESET_REBOOT_COUNTER_11 = "Reset reboot counter 1 1"
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
GET_HK = "get_hk"
class Chip(enum.IntEnum):
@ -162,6 +176,17 @@ class SystemctlCmd(enum.IntEnum):
RESTART = 2
def create_core_node() -> CmdTreeNode:
op_code_strs = [
getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("core", "Core Controller", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node
@tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
@ -260,7 +285,7 @@ def pack_core_commands( # noqa C901
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS)
)
elif cmd_str in OpCode.REBOOT_XSC:
elif cmd_str == OpCode.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params()
add_xsc_reboot_cmd(
q=q,
@ -268,14 +293,14 @@ def pack_core_commands( # noqa C901
chip=chip_select,
copy=copy_select,
)
elif cmd_str in OpCode.REBOOT_FULL:
elif cmd_str == OpCode.REBOOT_FULL:
q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
)
elif cmd_str in OpCode.XSC_REBOOT_SELF:
elif cmd_str == OpCode.XSC_REBOOT_SELF:
add_xsc_reboot_cmd(q=q, reboot_self=True)
elif cmd_str == OpCode.SYSTEMCTL_CMD_EXECUTOR:
print("systemctl command types: ")
@ -304,22 +329,22 @@ def pack_core_commands( # noqa C901
user_data=custom_cmd.encode(),
)
)
elif cmd_str in OpCode.XSC_REBOOT_0_0:
elif cmd_str == OpCode.XSC_REBOOT_0_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
)
elif cmd_str in OpCode.XSC_REBOOT_0_1:
elif cmd_str == OpCode.XSC_REBOOT_0_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_0,
copy=Copy.COPY_1_GOLD,
)
elif cmd_str in OpCode.XSC_REBOOT_1_0:
elif cmd_str == OpCode.XSC_REBOOT_1_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
)
elif cmd_str in OpCode.XSC_REBOOT_1_1:
elif cmd_str == OpCode.XSC_REBOOT_1_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
@ -381,16 +406,16 @@ def pack_core_commands( # noqa C901
user_data=bytes([max_count]),
)
)
elif cmd_str in OpCode.OBSW_UPDATE_FROM_SD_0:
elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_0:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
elif cmd_str in OpCode.OBSW_UPDATE_FROM_SD_1:
elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_1:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
elif cmd_str in OpCode.OBSW_UPDATE_FROM_TMP:
elif cmd_str == OpCode.OBSW_UPDATE_FROM_TMP:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
elif cmd_str in OpCode.AUTO_SWITCH_ENABLE:
elif cmd_str == OpCode.AUTO_SWITCH_ENABLE:
q.add_log_cmd(Info.AUTO_SWITCH_ENABLE)
chip, copy = determine_chip_and_copy()
user_data = bytes([chip, copy])
@ -399,26 +424,26 @@ def pack_core_commands( # noqa C901
CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_ENABLE, user_data
)
)
elif cmd_str in OpCode.AUTO_SWITCH_DISABLE:
elif cmd_str == OpCode.AUTO_SWITCH_DISABLE:
q.add_log_cmd(Info.AUTO_SWITCH_DISABLE)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_DISABLE)
)
elif cmd_str in OpCode.SWITCH_TO_SD_0:
elif cmd_str == OpCode.SWITCH_TO_SD_0:
q.add_log_cmd(Info.SWITCH_TO_SD_0)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
)
)
elif cmd_str in OpCode.SWITCH_TO_SD_1:
elif cmd_str == OpCode.SWITCH_TO_SD_1:
q.add_log_cmd(Info.SWITCH_TO_SD_1)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
)
)
elif cmd_str in OpCode.SWITCH_TO_BOTH_SD_CARDS:
elif cmd_str == OpCode.SWITCH_TO_BOTH_SD_CARDS:
while True:
active_sd_card = int(input("Please specify active SD card [0/1]: "))
if active_sd_card not in [0, 1]:
@ -432,11 +457,11 @@ def pack_core_commands( # noqa C901
user_data=bytes([active_sd_card]),
)
)
elif cmd_str in OpCode.GET_HK:
elif cmd_str == OpCode.GET_HK:
q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
q.add_pus_tc(generate_one_hk_command(sid))
elif cmd_str in OpCode.SET_PREF_SD:
elif cmd_str == OpCode.SET_PREF_SD:
q.add_log_cmd("Set preferred SD card")
pref_sd = int(
input("Specify which SD card to set as the preferred one (0/1): ")

View File

@ -151,12 +151,13 @@ def create_ploc_mpsoc_node() -> CmdTreeNode:
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC")
node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node
# Legacy command definitions.
@tmtc_definitions_provider
def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()

View File

@ -178,7 +178,7 @@ def create_ploc_supv_node() -> CmdTreeNode:
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("ploc_supv", "PLOC Supervisor")
node = CmdTreeNode("ploc_supv", "PLOC Supervisor", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node

View File

@ -51,7 +51,7 @@ class BpxOpCode:
def create_bpx_batt_node() -> CmdTreeNode:
node = CmdTreeNode("bat", "BPX battery device")
node = CmdTreeNode("bat", "BPX battery device", hide_children_for_print=True)
node.add_child(CmdTreeNode(BpxOpCode.ON, "ON command"))
node.add_child(CmdTreeNode(BpxOpCode.OFF, "OFF command"))
node.add_child(CmdTreeNode(BpxOpCode.HK, "HK command"))

View File

@ -77,7 +77,7 @@ def create_pwr_ctrl_node() -> CmdTreeNode:
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("pwr_ctrl", "Power Controller")
node = CmdTreeNode("pwr_ctrl", "Power Controller", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node