diff --git a/eive_tmtc/config/hook.py b/eive_tmtc/config/hook.py index 8088464..96b0957 100644 --- a/eive_tmtc/config/hook.py +++ b/eive_tmtc/config/hook.py @@ -1,6 +1,6 @@ from typing import Optional -from prompt_toolkit.history import FileHistory +from prompt_toolkit.history import FileHistory from tmtccmd import CcsdsTmtcBackend, HookBase from tmtccmd.com import ComInterface from tmtccmd.config import CmdTreeNode @@ -9,11 +9,14 @@ from tmtccmd.util import ObjectIdDictT, RetvalDictT from eive_tmtc.config.definitions import SPACE_PACKET_IDS from eive_tmtc.config.retvals import get_retval_dict -from eive_tmtc.tmtc.core import create_core_node +from eive_tmtc.tmtc.acs.acs_ctrl import create_acs_ctrl_node +from eive_tmtc.tmtc.acs.gps import create_gnss_node +from eive_tmtc.tmtc.acs.imtq import create_mgt_node from eive_tmtc.tmtc.acs.subsystem import create_acs_subsystem_node from eive_tmtc.tmtc.com.ccsds_handler import create_ccsds_node from eive_tmtc.tmtc.com.subsystem import create_com_subsystem_node from eive_tmtc.tmtc.com.syrlinks_handler import create_syrlinks_node +from eive_tmtc.tmtc.core import create_core_node from eive_tmtc.tmtc.health import create_global_health_node from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node @@ -23,6 +26,8 @@ from eive_tmtc.tmtc.power.bpx_batt import create_bpx_batt_node from eive_tmtc.tmtc.power.pwr_ctrl import create_pwr_ctrl_node from eive_tmtc.tmtc.power.subsystem import create_eps_subsystem_node from eive_tmtc.tmtc.system import create_system_node +from eive_tmtc.tmtc.tcs.ctrl import create_tcs_ctrl_node +from eive_tmtc.tmtc.tcs.heater import create_heater_node from eive_tmtc.tmtc.test import create_test_node from eive_tmtc.tmtc.time import create_time_node from eive_tmtc.tmtc.tm_store import create_persistent_tm_store_node @@ -56,10 +61,7 @@ class EiveHookObject(HookBase): gyro_devs = CmdTreeNode("gyro_devs", "Gyro Devices") acs_brd_assy_node.add_child(mgm_devs) acs_brd_assy_node.add_child(gyro_devs) - acs_ctrl = CmdTreeNode("acs_ctrl", "ACS Controller") - acs_ctrl.add_child(mode_node) - acs_ctrl.add_child(action_node) - acs_ctrl.add_child(param_node) + acs_ctrl = create_acs_ctrl_node() rws = CmdTreeNode("rws", "Reaction Wheel Devices") rw_assy = CmdTreeNode("rw_assy", "Reaction Wheel Assembly") rw_1 = CmdTreeNode("rw_1", "Reaction Wheel 1") @@ -71,29 +73,26 @@ class EiveHookObject(HookBase): rws.add_child(rw_2) rws.add_child(rw_3) rws.add_child(rw_4) - mgt = CmdTreeNode("mgt", "ISIS MGT") - mgt.add_child(dev_node) - mgt.add_child(assy_node) star_tracker = CmdTreeNode("str", "Star Tracker") star_tracker_img_helper = CmdTreeNode( "str_img_helper", "Star Tracker Image Helper" ) star_tracker.add_child(star_tracker_img_helper) - gnss_devs = CmdTreeNode("gnss", "GNSS Devices") + gnss_devs = create_gnss_node() acs_node.add_child(acs_brd_assy_node) acs_node.add_child(acs_ctrl) acs_node.add_child(mode_node) acs_node.add_child(rws) - acs_node.add_child(mgt) + acs_node.add_child(create_mgt_node()) acs_node.add_child(star_tracker) acs_node.add_child(gnss_devs) tcs_node = CmdTreeNode("tcs", "TCS Subsystem") tmp_1075_node = CmdTreeNode("tmp1075_devs", "TMP1075 Devices") rtds_node = CmdTreeNode("rtd_devs", "RTD Devices") - heaters_node = CmdTreeNode("heaters", "Heater Devices") - tcs_ctrl = CmdTreeNode("tcs_ctrl", "TCS Controller") + heaters_node = create_heater_node() + tcs_ctrl = create_tcs_ctrl_node() tcs_node.add_child(rtds_node) tcs_node.add_child(tmp_1075_node) tcs_node.add_child(tcs_ctrl) diff --git a/eive_tmtc/tmtc/acs/acs_ctrl.py b/eive_tmtc/tmtc/acs/acs_ctrl.py index def521b..4a8df2d 100644 --- a/eive_tmtc/tmtc/acs/acs_ctrl.py +++ b/eive_tmtc/tmtc/acs/acs_ctrl.py @@ -217,7 +217,9 @@ def create_acs_ctrl_node() -> CmdTreeNode: ] info_strs = [getattr(Info, key) for key in dir(OpCodes) if not key.startswith("__")] combined_dict = dict(zip(op_code_strs, info_strs)) - acs_ctrl = CmdTreeNode("acs_ctrl", "ACS Controller") + acs_ctrl = CmdTreeNode( + "acs_ctrl", "ACS Controller", hide_children_which_are_leaves=True + ) for op_code, info in combined_dict.items(): acs_ctrl.add_child(CmdTreeNode(op_code, info)) return acs_ctrl diff --git a/eive_tmtc/tmtc/acs/gps.py b/eive_tmtc/tmtc/acs/gps.py index 8662612..a99a7f0 100644 --- a/eive_tmtc/tmtc/acs/gps.py +++ b/eive_tmtc/tmtc/acs/gps.py @@ -5,7 +5,7 @@ import struct from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper -from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry +from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.pus.s200_fsfw_mode import create_mode_command, Mode from tmtccmd.tmtc import DefaultPusQueueHelper @@ -27,13 +27,13 @@ class GpsInfo: class OpCode: OFF = "off" ON = "on" - REQ_CORE_HK = ["core_hk_request"] - ENABLE_CORE_HK = ["core_hk_enable"] - DISABLE_CORE_HK = ["core_hk_disable"] - REQ_SKYVIEW_HK = ["skyview_hk_request"] - ENABLE_SKYVIEW_HK = ["skyview_hk_enable"] - DISABLE_SKYVIEW_HK = ["skyview_hk_disable"] - RESET_GNSS = ["reset"] + REQ_CORE_HK = "core_hk_request" + ENABLE_CORE_HK = "core_hk_enable" + DISABLE_CORE_HK = "core_hk_disable" + REQ_SKYVIEW_HK = "skyview_hk_request" + ENABLE_SKYVIEW_HK = "skyview_hk_enable" + DISABLE_SKYVIEW_HK = "skyview_hk_disable" + RESET_GNSS = "reset" class Info: @@ -53,6 +53,18 @@ class SetId(enum.IntEnum): SKYVIEW_HK = 1 +def create_gnss_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + node = CmdTreeNode("gnss", "GNSS device", hide_children_for_print=True) + for op_code, info in combined_dict.items(): + node.add_child(CmdTreeNode(op_code, info)) + return node + + @tmtc_definitions_provider def add_gps_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() @@ -75,10 +87,10 @@ def add_gps_cmds(defs: TmtcDefinitionWrapper): def pack_gps_command( # noqa: C901 object_id: bytes, q: DefaultPusQueueHelper, cmd_str: str ): # noqa: C901: - if cmd_str in OpCode.RESET_GNSS: + if cmd_str == OpCode.RESET_GNSS: # TODO: This needs to be re-implemented _LOGGER.warning("Reset pin handling needs to be re-implemented") - if cmd_str in OpCode.ENABLE_CORE_HK: + if cmd_str == OpCode.ENABLE_CORE_HK: interval = float(input("Please specify interval in floating point seconds: ")) if interval <= 0: raise ValueError("invalid interval") @@ -90,21 +102,21 @@ def pack_gps_command( # noqa: C901 ) for cmd in cmds: q.add_pus_tc(cmd) - if cmd_str in OpCode.DISABLE_CORE_HK: + if cmd_str == OpCode.DISABLE_CORE_HK: q.add_log_cmd(f"gps: {Info.DISABLE_CORE_HK}") q.add_pus_tc( create_disable_periodic_hk_command_with_diag( diag=False, sid=make_sid(object_id=object_id, set_id=SetId.CORE_HK) ) ) - if cmd_str in OpCode.REQ_CORE_HK: + if cmd_str == OpCode.REQ_CORE_HK: q.add_log_cmd(f"GPS: {Info.REQ_CORE_HK}") q.add_pus_tc( create_request_one_hk_command( sid=make_sid(object_id=object_id, set_id=SetId.CORE_HK) ) ) - if cmd_str in OpCode.ENABLE_SKYVIEW_HK: + if cmd_str == OpCode.ENABLE_SKYVIEW_HK: interval = float(input("Please specify interval in floating point seconds: ")) if interval <= 0: raise ValueError("invalid interval") @@ -116,24 +128,24 @@ def pack_gps_command( # noqa: C901 ) for cmd in cmds: q.add_pus_tc(cmd) - if cmd_str in OpCode.DISABLE_SKYVIEW_HK: + if cmd_str == OpCode.DISABLE_SKYVIEW_HK: q.add_log_cmd(f"gps: {Info.DISABLE_SKYVIEW_HK}") q.add_pus_tc( create_disable_periodic_hk_command_with_diag( diag=False, sid=make_sid(object_id=object_id, set_id=SetId.SKYVIEW_HK) ) ) - if cmd_str in OpCode.REQ_SKYVIEW_HK: + if cmd_str == OpCode.REQ_SKYVIEW_HK: q.add_log_cmd(f"GPS: {Info.REQ_SKYVIEW_HK}") q.add_pus_tc( create_request_one_hk_command( sid=make_sid(object_id=object_id, set_id=SetId.SKYVIEW_HK) ) ) - if cmd_str in OpCode.ON: + if cmd_str == OpCode.ON: q.add_log_cmd(f"GPS: {Info.ON}") q.add_pus_tc(create_mode_command(object_id, Mode.ON, 0)) - if cmd_str in OpCode.OFF: + if cmd_str == OpCode.OFF: q.add_log_cmd(f"GPS: {Info.OFF}") q.add_pus_tc(create_mode_command(object_id, Mode.OFF, 0)) diff --git a/eive_tmtc/tmtc/acs/imtq.py b/eive_tmtc/tmtc/acs/imtq.py index 919d411..53cc9a2 100644 --- a/eive_tmtc/tmtc/acs/imtq.py +++ b/eive_tmtc/tmtc/acs/imtq.py @@ -10,6 +10,8 @@ import logging import struct from typing import List +from tmtccmd.config import CmdTreeNode + from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss.tc import PusTelecommand @@ -91,6 +93,74 @@ class ImtqActionId: read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D]) +CTN = CmdTreeNode + + +def create_mgt_node() -> CmdTreeNode: + node = CmdTreeNode( + "mgt", "iMTQ MGT device node", hide_children_which_are_leaves=True + ) + node.add_child(CmdTreeNode(OpCode.OFF, "Mode OFF")) + node.add_child(CmdTreeNode(OpCode.ON, "Mode ON")) + node.add_child(CmdTreeNode(OpCode.NORMAL, "Mode Normal")) + node.add_child(CmdTreeNode(OpCode.OFF, "Mode OFF")) + node.add_child( + CmdTreeNode(OpCode.REQUEST_ENG_HK_NO_TORQUE, "Request Engineering HK One Shot") + ) + node.add_child( + CmdTreeNode( + OpCode.REQUEST_ENG_HK_WITH_TORQUE, + "Request Engineering HK One Shot during Torque", + ) + ) + node.add_child( + CmdTreeNode(OpCode.ENABLE_ENG_HK_NO_TORQUE, "Enable ENG HK not torque") + ) + node.add_child( + CmdTreeNode(OpCode.ENABLE_ENG_HK_WITH_TORQUE, "Enable ENG HK with torque") + ) + node.add_child( + CmdTreeNode(OpCode.DISABLE_ENG_HK_NO_TORQUE, "Disable ENG HK not torque") + ) + node.add_child( + CmdTreeNode(OpCode.DISABLE_ENG_HK_WITH_TORQUE, "Disable ENG HK with torque") + ) + node.add_child( + CmdTreeNode( + OpCode.REQUEST_MGM_RAW_NO_TORQUE, + "Request MGM Raw Without Torque HK One Shot", + ) + ) + node.add_child( + CmdTreeNode(OpCode.ENABLE_MGM_RAW_NO_TORQUE, "Enable MGM Raw Without Torque HK") + ) + node.add_child( + CmdTreeNode( + OpCode.DISABLE_MGM_RAW_NO_TORQUE, "Disable MGM Raw Without Torque HK" + ) + ) + node.add_child( + CmdTreeNode( + OpCode.REQUEST_MGM_RAW_WITH_TORQUE, + "Request MGM Raw With Torque HK One Shot", + ) + ) + node.add_child( + CmdTreeNode(OpCode.ENABLE_MGM_RAW_WITH_TORQUE, "Enable MGM Raw With Torque HK") + ) + node.add_child( + CTN(OpCode.DISABLE_MGM_RAW_WITH_TORQUE, "Disable MGM Raw With Torque HK") + ) + node.add_child(CTN(OpCode.POS_X_SELF_TEST, "IMTQ perform pos X self test")) + node.add_child(CTN(OpCode.NEG_X_SELF_TEST, "IMTQ perform neg X self test")) + node.add_child(CTN(OpCode.POS_Y_SELF_TEST, "IMTQ perform pos Y self test")) + node.add_child(CTN(OpCode.NEG_Y_SELF_TEST, "IMTQ perform neg Y self test")) + node.add_child(CTN(OpCode.POS_Z_SELF_TEST, "IMTQ perform pos Z self test")) + node.add_child(CTN(OpCode.NEG_Z_SELF_TEST, "IMTQ perform neg Z self test")) + node.add_child(CTN(OpCode.SET_DIPOLE, "IMTQ command dipole")) + return node + + @tmtc_definitions_provider def add_imtq_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() diff --git a/eive_tmtc/tmtc/tcs/ctrl.py b/eive_tmtc/tmtc/tcs/ctrl.py index 4f644f8..5df4252 100644 --- a/eive_tmtc/tmtc/tcs/ctrl.py +++ b/eive_tmtc/tmtc/tcs/ctrl.py @@ -3,6 +3,7 @@ from eive_tmtc.config.object_ids import TCS_CONTROLLER from eive_tmtc.tmtc.tcs import CtrlSetId from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds from tmtccmd.config.tmtc import ( + CmdTreeNode, tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry, @@ -76,6 +77,24 @@ def pack_tcs_ctrl_commands(q: DefaultPusQueueHelper, cmd_str: str): pack_tcs_ass_cmds(q, cmd_str) +CTN = CmdTreeNode + + +def create_tcs_ctrl_node() -> CmdTreeNode: + node = CmdTreeNode( + "tcs_ctrl", "TCS Controller", hide_children_which_are_leaves=True + ) + node.add_child(CTN(CmdStr.ENABLE_TEMP_SET, CmdInfo.ENABLE_TEMP_SET)) + node.add_child( + CTN(CmdStr.REQUEST_PRIMARY_TEMP_SET, CmdInfo.REQUEST_PRIMARY_TEMP_SET) + ) + node.add_child(CTN(CmdStr.REQUEST_DEVICE_TEMP_SET, CmdInfo.REQUEST_DEVICE_TEMP_SET)) + node.add_child(CTN(CmdStr.REQUEST_DEVICE_SUS_SET, CmdInfo.REQUEST_DEVICE_SUS_SET)) + node.add_child(CTN(CmdStr.REQUEST_HEATER_INFO, CmdInfo.REQUEST_HEATER_INFO)) + node.add_child(CTN(CmdStr.REQUEST_TCS_CTRL_INFO, CmdInfo.REQUEST_TCS_CTRL_INFO)) + return node + + @tmtc_definitions_provider def add_tcs_ctrl_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() diff --git a/eive_tmtc/tmtc/tcs/heater.py b/eive_tmtc/tmtc/tcs/heater.py index 0ce09ae..99fe3eb 100644 --- a/eive_tmtc/tmtc/tcs/heater.py +++ b/eive_tmtc/tmtc/tcs/heater.py @@ -5,21 +5,21 @@ """ import enum +from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.config import CmdTreeNode, OpCodeEntry, TmtcDefinitionWrapper +from tmtccmd.config.tmtc import tmtc_definitions_provider +from tmtccmd.pus.s8_fsfw_action import create_action_cmd +from tmtccmd.pus.s201_fsfw_health import ( + FsfwHealth, + Subservice, + pack_set_health_cmd_data, +) +from tmtccmd.tmtc import DefaultPusQueueHelper +from tmtccmd.util.obj_id import ObjectIdU32 + from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.tmtc.tcs.defs import Heater -from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry -from tmtccmd.config.tmtc import tmtc_definitions_provider -from tmtccmd.tmtc import DefaultPusQueueHelper -from tmtccmd.util.obj_id import ObjectIdU32 -from tmtccmd.pus.s201_fsfw_health import ( - pack_set_health_cmd_data, - FsfwHealth, - Subservice, -) -from tmtccmd.pus.s8_fsfw_action import create_action_cmd -from spacepackets.ecss.tc import PusTelecommand - HEATER_LOCATION = [ "PLOC Processing Board", @@ -34,10 +34,10 @@ HEATER_LOCATION = [ class OpCode: - HEATER_CMD = ["switch_cmd"] - HEATER_EXT_CTRL = ["set_ext_ctrl"] - HEATER_FAULTY_CMD = ["set_faulty"] - HEATER_HEALTHY_CMD = ["set_healthy"] + HEATER_CMD = "switch_cmd" + HEATER_EXT_CTRL = "set_ext_ctrl" + HEATER_FAULTY_CMD = "set_faulty" + HEATER_HEALTHY_CMD = "set_healthy" class Info: @@ -55,6 +55,18 @@ class ActionIds(enum.IntEnum): SWITCH_HEATER = 0 +CTN = CmdTreeNode + + +def create_heater_node() -> CmdTreeNode: + node = CmdTreeNode("heater", "Heater Device", hide_children_which_are_leaves=True) + node.add_child(CTN(OpCode.HEATER_CMD, Info.HEATER_CMD)) + node.add_child(CTN(OpCode.HEATER_HEALTHY_CMD, Info.HEATER_HEALTHY_CMD)) + node.add_child(CTN(OpCode.HEATER_EXT_CTRL, Info.HEATER_EXT_CTRL)) + node.add_child(CTN(OpCode.HEATER_FAULTY_CMD, Info.HEATER_FAULTY_CMD)) + return node + + @tmtc_definitions_provider def add_heater_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() @@ -70,7 +82,7 @@ def add_heater_cmds(defs: TmtcDefinitionWrapper): def pack_heater_cmds(object_id: bytes, cmd_str: str, q: DefaultPusQueueHelper): - if cmd_str in OpCode.HEATER_CMD: + if cmd_str == OpCode.HEATER_CMD: q.add_log_cmd("Heater Switching") heater_number = prompt_heater() while True: @@ -90,7 +102,7 @@ def pack_heater_cmds(object_id: bytes, cmd_str: str, q: DefaultPusQueueHelper): debug_string = f"Switching heater {heater_number} {act_str}" q.add_log_cmd(debug_string) q.add_pus_tc(pack_switch_heater_command(object_id, heater_number, action)) - if cmd_str in OpCode.HEATER_EXT_CTRL: + if cmd_str == OpCode.HEATER_EXT_CTRL: heater_number = prompt_heater() obj_id = heater_idx_to_obj(heater_number) health_cmd( @@ -100,7 +112,7 @@ def pack_heater_cmds(object_id: bytes, cmd_str: str, q: DefaultPusQueueHelper): health_str="External Control", heater_idx=heater_number, ) - if cmd_str in OpCode.HEATER_FAULTY_CMD: + if cmd_str == OpCode.HEATER_FAULTY_CMD: heater_number = prompt_heater() obj_id = heater_idx_to_obj(heater_number) health_cmd( @@ -110,7 +122,7 @@ def pack_heater_cmds(object_id: bytes, cmd_str: str, q: DefaultPusQueueHelper): health_str="Faulty", heater_idx=heater_number, ) - if cmd_str in OpCode.HEATER_HEALTHY_CMD: + if cmd_str == OpCode.HEATER_HEALTHY_CMD: heater_number = prompt_heater() obj_id = heater_idx_to_obj(heater_number) health_cmd(