From d640d547bd439c6059dbaee2bb414c20b844c429 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 22 Nov 2023 13:51:33 +0100 Subject: [PATCH] OOF3 --- eive_tmtc/config/hook.py | 47 +++++++----- eive_tmtc/tmtc/__init__.py | 3 - eive_tmtc/tmtc/acs/subsystem.py | 57 +++++++-------- eive_tmtc/tmtc/com/__init__.py | 1 - eive_tmtc/tmtc/com/subsystem.py | 90 ++++++++++------------- eive_tmtc/tmtc/health.py | 44 ++++++----- eive_tmtc/tmtc/payload/ploc_mpsoc.py | 10 ++- eive_tmtc/tmtc/payload/subsystem.py | 27 +++---- eive_tmtc/tmtc/power/__init__.py | 2 - eive_tmtc/tmtc/power/bpx_batt.py | 90 +++++++++++------------ eive_tmtc/tmtc/power/pwr_ctrl.py | 105 ++++++++++++--------------- eive_tmtc/tmtc/power/subsystem.py | 40 +++++----- eive_tmtc/tmtc/system.py | 39 ++++++---- eive_tmtc/tmtc/test.py | 45 ++++-------- eive_tmtc/tmtc/tm_store.py | 47 ++++++------ 15 files changed, 300 insertions(+), 347 deletions(-) diff --git a/eive_tmtc/config/hook.py b/eive_tmtc/config/hook.py index 444d032..3f32f34 100644 --- a/eive_tmtc/config/hook.py +++ b/eive_tmtc/config/hook.py @@ -1,17 +1,27 @@ from typing import Optional -from eive_tmtc.config.definitions import SPACE_PACKET_IDS -from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node -from eive_tmtc.tmtc.payload.scex import create_scex_node -from eive_tmtc.tmtc.time import create_time_node -from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node -from tmtccmd import HookBase, CcsdsTmtcBackend +from tmtccmd import CcsdsTmtcBackend, HookBase from tmtccmd.com import ComInterface from tmtccmd.config import CmdTreeNode - -from eive_tmtc.config.retvals import get_retval_dict from tmtccmd.util import ObjectIdDictT, RetvalDictT +from eive_tmtc.config.definitions import SPACE_PACKET_IDS +from eive_tmtc.config.retvals import get_retval_dict +from eive_tmtc.tmtc.com.subsystem import create_com_subsystem_node +from eive_tmtc.tmtc.health import create_global_health_node +from eive_tmtc.tmtc.acs.subsystem import create_acs_subsystem_node +from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node +from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node +from eive_tmtc.tmtc.payload.scex import create_scex_node +from eive_tmtc.tmtc.payload.subsystem import create_payload_subsystem_node +from eive_tmtc.tmtc.power.bpx_batt import create_bpx_batt_node +from eive_tmtc.tmtc.power.pwr_ctrl import create_pwr_ctrl_node +from eive_tmtc.tmtc.power.subsystem import create_eps_subsystem_node +from eive_tmtc.tmtc.system import create_system_node +from eive_tmtc.tmtc.test import create_test_node +from eive_tmtc.tmtc.time import create_time_node +from eive_tmtc.tmtc.tm_store import create_persistent_tm_store_node + class EiveHookObject(HookBase): def __init__(self, json_cfg_path: str): @@ -34,9 +44,7 @@ class EiveHookObject(HookBase): assy_node = CmdTreeNode("assy", "Assembly Commands") assy_node.add_child(mode_node) - system_node = CmdTreeNode("sys", "EIVE System") - - acs_node = CmdTreeNode("acs", "ACS Subsystem") + acs_node = create_acs_subsystem_node() acs_brd_assy_node = CmdTreeNode("acs_brd_assy", "ACS Board Assembly") acs_brd_assy_node.add_child(mode_node) mgm_devs = CmdTreeNode("mgm_devs", "MGM Devices") @@ -88,7 +96,7 @@ class EiveHookObject(HookBase): tcs_brd_assy = CmdTreeNode("tcs_brd_assy", "TCS Board Assembly") tcs_node.add_child(tcs_brd_assy) - com_node = CmdTreeNode("com", "COM Subsystem") + com_node = create_com_subsystem_node() syrlinks_node = CmdTreeNode("syrlinks", "Syrlinks") syrlinks_node.add_child(dev_node) syrlinks_node.add_child(assy_node) @@ -97,19 +105,19 @@ class EiveHookObject(HookBase): ccsds_node.add_child(action_node) com_node.add_child(syrlinks_node) - eps_node = CmdTreeNode("eps", "EPS Subsystem") + eps_node = create_eps_subsystem_node() acu_node = CmdTreeNode("acu", "PCDU ACU component") pdu_1_node = CmdTreeNode("pdu1", "PCDU PDU 1 component") pdu_2_node = CmdTreeNode("pdu2", "PCDU PDU 2 component") p60_dock_node = CmdTreeNode("p60_dock", "PCDU P60 Dock component") - bat_node = CmdTreeNode("bat", "Battery Component") + eps_node.add_child(create_pwr_ctrl_node()) eps_node.add_child(acu_node) eps_node.add_child(pdu_1_node) eps_node.add_child(pdu_2_node) eps_node.add_child(p60_dock_node) - eps_node.add_child(bat_node) + eps_node.add_child(create_bpx_batt_node()) - payload_node = CmdTreeNode("payload", "Payload Subsystem") + payload_node = create_payload_subsystem_node() pl_pcdu = CmdTreeNode("pl_pcdu", "Payload PCDU") payload_node.add_child(pl_pcdu) payload_node.add_child(create_scex_node()) @@ -122,8 +130,11 @@ class EiveHookObject(HookBase): obdh_node.add_child(xiphos_wdt) obdh_node.add_child(core_ctrl) obdh_node.add_child(create_time_node()) + obdh_node.add_child(create_persistent_tm_store_node()) - root_node.add_child(system_node) + root_node.add_child(create_test_node()) + root_node.add_child(create_system_node()) + root_node.add_child(create_global_health_node()) root_node.add_child(acs_node) root_node.add_child(tcs_node) root_node.add_child(com_node) @@ -134,8 +145,8 @@ class EiveHookObject(HookBase): def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import ( - create_com_interface_default, create_com_interface_cfg_default, + create_com_interface_default, ) assert self.cfg_path is not None diff --git a/eive_tmtc/tmtc/__init__.py b/eive_tmtc/tmtc/__init__.py index 2c6f4c8..84ed9c4 100644 --- a/eive_tmtc/tmtc/__init__.py +++ b/eive_tmtc/tmtc/__init__.py @@ -1,7 +1,4 @@ -from .payload.subsystem import add_payload_subsystem_cmds from .solar_array_deployment import add_sa_depl_cmds -from .test import add_test_defs -from .health import add_health_cmd_defs from .system import add_system_cmd_defs from .tm_store import add_persistent_tm_store_cmd_defs from .tcs import add_tmp_sens_cmds diff --git a/eive_tmtc/tmtc/acs/subsystem.py b/eive_tmtc/tmtc/acs/subsystem.py index 657c8e7..f250344 100644 --- a/eive_tmtc/tmtc/acs/subsystem.py +++ b/eive_tmtc/tmtc/acs/subsystem.py @@ -1,22 +1,19 @@ import enum -from typing import Tuple, Dict +from typing import Dict, Tuple -from eive_tmtc.tmtc.acs.defs import AcsMode, SafeSubmode from spacepackets.ecss import PusTelecommand -from eive_tmtc.tmtc.common import pack_mode_cmd_with_info -from eive_tmtc.config.object_ids import ACS_SUBSYSTEM_ID -from eive_tmtc.config.definitions import CustomServiceList from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, + CmdTreeNode, ) from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.tmtc import DefaultPusQueueHelper + +from eive_tmtc.config.object_ids import ACS_SUBSYSTEM_ID +from eive_tmtc.tmtc.acs.defs import AcsMode, SafeSubmode +from eive_tmtc.tmtc.common import pack_mode_cmd_with_info -class OpCode(str, enum.Enum): +class CmdStr(str, enum.Enum): OFF = "off" SAFE = "safe" DETUMBLE = "detumble" @@ -41,23 +38,20 @@ class Info(str, enum.Enum): HANDLER_LIST: Dict[str, Tuple[int, int, str]] = { - OpCode.OFF: (AcsMode.OFF, 0, Info.OFF), - OpCode.SAFE: (AcsMode.SAFE, SafeSubmode.DEFAULT, Info.SAFE), - OpCode.DETUMBLE: (AcsMode.SAFE, SafeSubmode.DETUMBLE, Info.DETUMBLE), - OpCode.IDLE: (AcsMode.IDLE, 0, Info.IDLE), - OpCode.PTG_TARGET: (AcsMode.PTG_TARGET, 0, Info.PTG_TARGET), - OpCode.PTG_TARGET_GS: (AcsMode.PTG_TARGET_GS, 0, Info.PTG_TARGET_GS), - OpCode.PTG_TARGET_NADIR: (AcsMode.PTG_NADIR, 0, Info.PTG_TARGET_NADIR), - OpCode.PTG_TARGET_INERTIAL: (AcsMode.PTG_INERTIAL, 0, Info.PTG_TARGET_INERTIAL), + CmdStr.OFF: (AcsMode.OFF, 0, Info.OFF), + CmdStr.SAFE: (AcsMode.SAFE, SafeSubmode.DEFAULT, Info.SAFE), + CmdStr.DETUMBLE: (AcsMode.SAFE, SafeSubmode.DETUMBLE, Info.DETUMBLE), + CmdStr.IDLE: (AcsMode.IDLE, 0, Info.IDLE), + CmdStr.PTG_TARGET: (AcsMode.PTG_TARGET, 0, Info.PTG_TARGET), + CmdStr.PTG_TARGET_GS: (AcsMode.PTG_TARGET_GS, 0, Info.PTG_TARGET_GS), + CmdStr.PTG_TARGET_NADIR: (AcsMode.PTG_NADIR, 0, Info.PTG_TARGET_NADIR), + CmdStr.PTG_TARGET_INERTIAL: (AcsMode.PTG_INERTIAL, 0, Info.PTG_TARGET_INERTIAL), } -@service_provider(CustomServiceList.ACS_SS.value) -def build_acs_subsystem_cmd(p: ServiceProviderParams): - op_code = p.op_code - q = p.queue_helper +def build_acs_subsystem_cmd(q: DefaultPusQueueHelper, cmd_path: str): info_prefix = "ACS Subsystem" - if op_code in OpCode.REPORT_ALL_MODES: + if cmd_path in CmdStr.REPORT_ALL_MODES: q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") q.add_pus_tc( PusTelecommand( @@ -66,7 +60,7 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams): app_data=ACS_SUBSYSTEM_ID, ) ) - mode_info_tup = HANDLER_LIST.get(op_code) + mode_info_tup = HANDLER_LIST.get(cmd_path) if mode_info_tup is None: return pack_mode_cmd_with_info( @@ -78,10 +72,9 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams): ) -@tmtc_definitions_provider -def add_acs_subsystem_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - for op_code, (_, _, info) in HANDLER_LIST.items(): - oce.add(op_code, info) - oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) - defs.add_service(CustomServiceList.ACS_SS, "ACS Subsystem", oce) +def create_acs_subsystem_node() -> CmdTreeNode: + node = CmdTreeNode("acs", "ACS Subsystem") + for cmd_str, (_, _, info) in HANDLER_LIST.items(): + node.add_child(CmdTreeNode(cmd_str, info)) + node.add_child(CmdTreeNode(CmdStr.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)) + return node diff --git a/eive_tmtc/tmtc/com/__init__.py b/eive_tmtc/tmtc/com/__init__.py index b70bd5c..e69de29 100644 --- a/eive_tmtc/tmtc/com/__init__.py +++ b/eive_tmtc/tmtc/com/__init__.py @@ -1 +0,0 @@ -from .subsystem import add_com_subsystem_cmds diff --git a/eive_tmtc/tmtc/com/subsystem.py b/eive_tmtc/tmtc/com/subsystem.py index 900981f..ba8c098 100644 --- a/eive_tmtc/tmtc/com/subsystem.py +++ b/eive_tmtc/tmtc/com/subsystem.py @@ -1,30 +1,25 @@ import enum -from eive_tmtc.config.definitions import CustomServiceList -from eive_tmtc.config.object_ids import COM_SUBSYSTEM_ID -from eive_tmtc.tmtc.com.syrlinks_handler import Datarate -from tmtccmd.pus.s20_fsfw_param_defs import create_scalar_u8_parameter - -from .defs import Mode as ComMode - from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, -) -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams -from tmtccmd.pus.s200_fsfw_mode import ( - create_mode_command, - create_read_mode_command, - create_announce_mode_command, - create_announce_mode_recursive_command, + CmdTreeNode, ) from tmtccmd.pus.s20_fsfw_param import ( create_load_param_cmd, + create_scalar_u32_parameter, ) +from tmtccmd.pus.s20_fsfw_param_defs import create_scalar_u8_parameter +from tmtccmd.pus.s200_fsfw_mode import ( + create_announce_mode_command, + create_announce_mode_recursive_command, + create_mode_command, + create_read_mode_command, +) +from tmtccmd.tmtc import DefaultPusQueueHelper -from tmtccmd.pus.s20_fsfw_param import create_scalar_u32_parameter +from eive_tmtc.config.object_ids import COM_SUBSYSTEM_ID +from eive_tmtc.tmtc.com.syrlinks_handler import Datarate + +from .defs import Mode as ComMode class ParameterId(enum.IntEnum): @@ -48,9 +43,9 @@ class OpCode: class Info: RX_ONLY = "Syrlinks RX Only" - TX_AND_RX_DEF_DATARATE = "Syrlinks with TX default datarate" - TX_AND_RX_LOW_DATARATE = "Syrlinks with TX low datarate (BPSK modulation)" - TX_AND_RX_HIGH_DATARATE = "Syrlinks with TX high datarate (0QPSK modulation)" + TX_AND_RX_DEF_RATE = "Syrlinks with TX default datarate" + TX_AND_RX_LOW_RATE = "Syrlinks with TX low datarate (BPSK modulation)" + TX_AND_RX_HIGH_RATE = "Syrlinks with TX high datarate (0QPSK modulation)" TX_AND_RX_CARRIER_WAVE = "Syrlinks with TX carrier wave" UPDATE_DEFAULT_DATARATE_LOW = "Configure default low datarate (BPSK modulation)" UPDATE_DEFAULT_DATARATE_HIGH = "Configure default high datarate (0QPSK modulation)" @@ -60,30 +55,27 @@ class Info: ANNOUNCE_MODE_RECURSIVE = "Announce mode recursively" -@service_provider(CustomServiceList.COM_SS) -def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901 - q = p.queue_helper - o = p.op_code +def build_com_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 prefix = "COM Subsystem" - if o == OpCode.RX_ONLY: + if cmd_str == OpCode.RX_ONLY: q.add_log_cmd(Info.RX_ONLY) q.add_pus_tc(create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_ONLY, 0)) - elif o == OpCode.TX_AND_RX_DEF_RATE: + elif cmd_str == OpCode.TX_AND_RX_DEF_RATE: q.add_log_cmd(Info.TX_AND_RX_DEF_DATARATE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_DEF_DATARATE, 0) ) - elif o == OpCode.TX_AND_RX_LOW_RATE: + elif cmd_str == OpCode.TX_AND_RX_LOW_RATE: q.add_log_cmd(Info.TX_AND_RX_LOW_DATARATE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_LOW_DATARATE, 0) ) - elif o == OpCode.TX_AND_RX_HIGH_RATE: + elif cmd_str == OpCode.TX_AND_RX_HIGH_RATE: q.add_log_cmd(Info.TX_AND_RX_HIGH_DATARATE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_HIGH_DATARATE, 0) ) - if o == OpCode.UPDATE_DEFAULT_DATARATE_LOW: + if cmd_str == OpCode.UPDATE_DEFAULT_DATARATE_LOW: q.add_log_cmd(f"{prefix}: {Info.UPDATE_DEFAULT_DATARATE_LOW}") q.add_pus_tc( create_load_param_cmd( @@ -95,7 +87,7 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901 ) ) ) - if o == OpCode.UPDATE_DEFAULT_DATARATE_HIGH: + if cmd_str == OpCode.UPDATE_DEFAULT_DATARATE_HIGH: q.add_log_cmd(f"{prefix}: {Info.UPDATE_DEFAULT_DATARATE_HIGH}") q.add_pus_tc( create_load_param_cmd( @@ -107,12 +99,12 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901 ) ) ) - elif o == OpCode.TX_AND_RX_CARRIER_WAVE: + elif cmd_str == OpCode.TX_AND_RX_CARRIER_WAVE: q.add_log_cmd(Info.TX_AND_RX_CARRIER_WAVE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_CARRIER_WAVE, 0) ) - elif o == OpCode.CHANGE_TRANSMITTER_TIMEOUT: + elif cmd_str == OpCode.CHANGE_TRANSMITTER_TIMEOUT: timeout = int(input("Specify timeout to set [ms]: ")) q.add_log_cmd(Info.CHANGE_TRANSMITTER_TIMEOUT) q.add_pus_tc( @@ -125,28 +117,24 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901 ) ) ) - elif o == OpCode.READ_MODE: + elif cmd_str == OpCode.READ_MODE: q.add_log_cmd(Info.READ_MODE) q.add_pus_tc(create_read_mode_command(COM_SUBSYSTEM_ID)) - elif o == OpCode.ANNOUNCE_MODE: + elif cmd_str == OpCode.ANNOUNCE_MODE: q.add_log_cmd(Info.ANNOUNCE_MODE) q.add_pus_tc(create_announce_mode_command(COM_SUBSYSTEM_ID)) - elif o == OpCode.ANNOUNCE_MODE_RECURSIVE: + elif cmd_str == OpCode.ANNOUNCE_MODE_RECURSIVE: q.add_log_cmd(Info.ANNOUNCE_MODE_RECURSIVE) q.add_pus_tc(create_announce_mode_recursive_command(COM_SUBSYSTEM_ID)) -@tmtc_definitions_provider -def add_com_subsystem_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(OpCode.RX_ONLY, Info.RX_ONLY) - oce.add(OpCode.TX_AND_RX_LOW_RATE, Info.TX_AND_RX_LOW_DATARATE) - oce.add(OpCode.TX_AND_RX_HIGH_RATE, Info.TX_AND_RX_HIGH_DATARATE) - oce.add(OpCode.TX_AND_RX_DEF_RATE, Info.TX_AND_RX_DEF_DATARATE) - oce.add(OpCode.UPDATE_DEFAULT_DATARATE_LOW, Info.UPDATE_DEFAULT_DATARATE_LOW) - oce.add(OpCode.UPDATE_DEFAULT_DATARATE_HIGH, Info.UPDATE_DEFAULT_DATARATE_HIGH) - oce.add(OpCode.CHANGE_TRANSMITTER_TIMEOUT, Info.CHANGE_TRANSMITTER_TIMEOUT) - oce.add(OpCode.READ_MODE, Info.READ_MODE) - oce.add(OpCode.ANNOUNCE_MODE, Info.ANNOUNCE_MODE) - oce.add(OpCode.ANNOUNCE_MODE_RECURSIVE, Info.ANNOUNCE_MODE_RECURSIVE) - defs.add_service(CustomServiceList.COM_SS, "COM Subsystem", oce) +def create_com_subsystem_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + node = CmdTreeNode("com", "COM Subsystem") + for op_code, info in combined_dict.items(): + node.add_child(CmdTreeNode(op_code, info)) + return node diff --git a/eive_tmtc/tmtc/health.py b/eive_tmtc/tmtc/health.py index 70efb4c..4b551b2 100644 --- a/eive_tmtc/tmtc/health.py +++ b/eive_tmtc/tmtc/health.py @@ -1,14 +1,12 @@ -from eive_tmtc.config.definitions import CustomServiceList -from eive_tmtc.tmtc.obj_prompt import prompt_object from spacepackets.ecss import PusTelecommand from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, + CmdTreeNode, ) -from tmtccmd.tmtc import service_provider -from tmtccmd.pus.s201_fsfw_health import Subservice, FsfwHealth -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.pus.s201_fsfw_health import FsfwHealth, Subservice +from tmtccmd.tmtc import DefaultPusQueueHelper + +from eive_tmtc.config.definitions import CustomServiceList +from eive_tmtc.tmtc.obj_prompt import prompt_object class OpCode: @@ -30,11 +28,8 @@ def prompt_health() -> FsfwHealth: return FsfwHealth(health_idx) -@service_provider(CustomServiceList.HEALTH) -def pack_test_command(p: ServiceProviderParams): - o = p.op_code - q = p.queue_helper - if o == OpCode.SET_HEALTH: +def pack_health_cmd(q: DefaultPusQueueHelper, cmd_str: str): + if cmd_str == OpCode.SET_HEALTH: app_data = bytearray(prompt_object()) health = prompt_health() app_data.append(health) @@ -44,7 +39,7 @@ def pack_test_command(p: ServiceProviderParams): service=201, subservice=Subservice.TC_SET_HEALTH, app_data=app_data ) ) - elif o == OpCode.ANNOUNCE_HEALTH: + elif cmd_str == OpCode.ANNOUNCE_HEALTH: app_data = bytearray(prompt_object()) q.add_log_cmd(Info.ANNOUNCE_HEALTH) q.add_pus_tc( @@ -52,19 +47,22 @@ def pack_test_command(p: ServiceProviderParams): service=201, subservice=Subservice.TC_ANNOUNCE_HEALTH, app_data=app_data ) ) - elif o == OpCode.ANNOUNCE_HEALTH_ALL: + elif cmd_str == OpCode.ANNOUNCE_HEALTH_ALL: q.add_log_cmd(Info.ANNOUNCE_HEALTH_ALL) q.add_pus_tc( PusTelecommand(service=201, subservice=Subservice.TC_ANNOUNCE_HEALTH_ALL) ) else: - raise ValueError(f"unknown op code {o} for service {CustomServiceList.HEALTH}") + raise ValueError( + f"unknown command string {cmd_str} for service {CustomServiceList.HEALTH}" + ) -@tmtc_definitions_provider -def add_health_cmd_defs(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL) - oce.add(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH) - oce.add(OpCode.SET_HEALTH, Info.SET_HEALTH) - defs.add_service(CustomServiceList.HEALTH, info="Health Service", op_code_entry=oce) +def create_global_health_node() -> CmdTreeNode: + health_node = CmdTreeNode("health", "Health Commands") + health_node.add_child( + CmdTreeNode(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL) + ) + health_node.add_child(CmdTreeNode(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH)) + health_node.add_child(CmdTreeNode(OpCode.SET_HEALTH, Info.SET_HEALTH)) + return health_node diff --git a/eive_tmtc/tmtc/payload/ploc_mpsoc.py b/eive_tmtc/tmtc/payload/ploc_mpsoc.py index 9953b9f..ddb434b 100644 --- a/eive_tmtc/tmtc/payload/ploc_mpsoc.py +++ b/eive_tmtc/tmtc/payload/ploc_mpsoc.py @@ -151,10 +151,10 @@ def create_ploc_mpsoc_node() -> CmdTreeNode: ] info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] combined_dict = dict(zip(op_code_strs, info_strs)) - ploc_mpsoc = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC") + node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC") for op_code, info in combined_dict.items(): - ploc_mpsoc.add_child(CmdTreeNode(op_code, info)) - return ploc_mpsoc + node.add_child(CmdTreeNode(op_code, info)) + return node @tmtc_definitions_provider @@ -679,7 +679,9 @@ def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytea current_idx = 0 dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8") current_idx += 12 - num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[0] + num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[ + 0 + ] elem_names = [] elem_attrs = [] elem_sizes = [] diff --git a/eive_tmtc/tmtc/payload/subsystem.py b/eive_tmtc/tmtc/payload/subsystem.py index 69010d6..529865e 100644 --- a/eive_tmtc/tmtc/payload/subsystem.py +++ b/eive_tmtc/tmtc/payload/subsystem.py @@ -1,14 +1,11 @@ import enum from typing import Dict, Tuple -from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.object_ids import PL_SUBSYSTEM_ID from spacepackets.ecss import PusTelecommand from eive_tmtc.tmtc.common import pack_mode_cmd_with_info -from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry -from tmtccmd.config.tmtc import tmtc_definitions_provider -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.config import CmdTreeNode +from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice @@ -36,12 +33,9 @@ HANDLER_LIST: Dict[str, Tuple[int, str]] = { } -@service_provider(CustomServiceList.PL_SS) -def build_acs_subsystem_cmd(p: ServiceProviderParams): - op_code = p.op_code - q = p.queue_helper +def build_acs_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str): info_prefix = "ACS Subsystem" - if op_code in OpCode.REPORT_ALL_MODES: + if cmd_str == OpCode.REPORT_ALL_MODES: q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") q.add_pus_tc( PusTelecommand( @@ -50,7 +44,7 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams): app_data=PL_SUBSYSTEM_ID, ) ) - mode_info_tup = HANDLER_LIST.get(op_code) + mode_info_tup = HANDLER_LIST.get(cmd_str) if mode_info_tup is None: return pack_mode_cmd_with_info( @@ -62,9 +56,8 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams): ) -@tmtc_definitions_provider -def add_payload_subsystem_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(OpCode.OFF, Info.OFF) - oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) - defs.add_service(CustomServiceList.PL_SS, "Payload Subsystem", oce) +def create_payload_subsystem_node() -> CmdTreeNode: + payload_node = CmdTreeNode("payload", "Payload Subsystem") + payload_node.add_child(CmdTreeNode(OpCode.OFF, Info.OFF)) + payload_node.add_child(CmdTreeNode(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)) + return payload_node diff --git a/eive_tmtc/tmtc/power/__init__.py b/eive_tmtc/tmtc/power/__init__.py index b1d859b..e69de29 100644 --- a/eive_tmtc/tmtc/power/__init__.py +++ b/eive_tmtc/tmtc/power/__init__.py @@ -1,2 +0,0 @@ -from .subsystem import add_eps_subsystem_cmds -from .pwr_ctrl import pwr_cmd_defs diff --git a/eive_tmtc/tmtc/power/bpx_batt.py b/eive_tmtc/tmtc/power/bpx_batt.py index 8fd387c..155cc10 100644 --- a/eive_tmtc/tmtc/power/bpx_batt.py +++ b/eive_tmtc/tmtc/power/bpx_batt.py @@ -1,23 +1,19 @@ import enum import struct -from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss import PusTelecommand - -from eive_tmtc.config.definitions import CustomServiceList -from eive_tmtc.config.object_ids import BPX_HANDLER_ID from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, + CmdTreeNode, ) -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams -from tmtccmd.pus.s8_fsfw_action import create_action_cmd -from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid -from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode -from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter +from tmtccmd.pus.s8_fsfw_action import create_action_cmd +from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data +from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices +from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid +from tmtccmd.tmtc import DefaultPusQueueHelper + +from eive_tmtc.config.object_ids import BPX_HANDLER_ID +from eive_tmtc.pus_tm.defs import PrintWrapper class BpxSetId(enum.IntEnum): @@ -54,38 +50,38 @@ class BpxOpCode: REBOOT = "reboot" -@tmtc_definitions_provider -def add_bpx_cmd_definitions(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(keys=BpxOpCode.ON, info="On command") - oce.add(keys=BpxOpCode.OFF, info="Off command") - oce.add(keys=BpxOpCode.HK, info="Request BPX HK") - oce.add(keys=BpxOpCode.RST_BOOT_CNT, info="Reset Boot Count") - oce.add(keys=BpxOpCode.RST_CFG, info="Reset Config to stored default settings") - oce.add(keys=BpxOpCode.SET_CFG, info="Set BPX configuration") - oce.add(keys=BpxOpCode.MAN_HEATER_ON, info="Manual heater on") - oce.add(keys=BpxOpCode.MAN_HEATER_OFF, info="Manual heater off") - oce.add(keys=BpxOpCode.REQUEST_CFG, info="Request Configuration Struct (Step 1)") - oce.add( - keys=BpxOpCode.REQUEST_CFG_HK, info="Request Configuration Struct HK (Step 2)" +def create_bpx_batt_node() -> CmdTreeNode: + node = CmdTreeNode("bat", "BPX battery device") + node.add_child(CmdTreeNode(BpxOpCode.ON, "ON command")) + node.add_child(CmdTreeNode(BpxOpCode.OFF, "OFF command")) + node.add_child(CmdTreeNode(BpxOpCode.HK, "HK command")) + node.add_child(CmdTreeNode(BpxOpCode.RST_BOOT_CNT, "Reset boot count")) + node.add_child( + CmdTreeNode(BpxOpCode.RST_CFG, "Reset Config to stored default settings") ) - oce.add(keys=BpxOpCode.REBOOT, info="Reboot Command") - defs.add_service( - name=CustomServiceList.BPX_BATTERY.value, - info="BPX Battery Handler", - op_code_entry=oce, + node.add_child(CmdTreeNode(BpxOpCode.SET_CFG, "Set BPX configuration")) + node.add_child(CmdTreeNode(BpxOpCode.MAN_HEATER_ON, "Manual heater on")) + node.add_child(CmdTreeNode(BpxOpCode.MAN_HEATER_OFF, "Manual heater off")) + node.add_child( + CmdTreeNode(BpxOpCode.REQUEST_CFG, "Request Configuration Struct (Step 1)") ) + node.add_child( + CmdTreeNode( + BpxOpCode.REQUEST_CFG_HK, "Request Configuration Struct HK (Step 2)" + ) + ) + node.add_child(CmdTreeNode(BpxOpCode.REBOOT, "Reboot Command")) + return node -@service_provider(CustomServiceList.BPX_BATTERY.value) -def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is okay here. - op_code = p.op_code - q = p.queue_helper - if op_code == BpxOpCode.HK: +def pack_bpx_commands( + q: DefaultPusQueueHelper, cmd_str: str +): # noqa C901: Complexity is okay here. + if cmd_str == BpxOpCode.HK: q.add_log_cmd("Requesting BPX battery HK set") sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_HK_SET) q.add_pus_tc(generate_one_hk_command(sid=sid)) - if op_code == BpxOpCode.OFF: + if cmd_str == BpxOpCode.OFF: q.add_log_cmd("Off mode") mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.OFF, 0) q.add_pus_tc( @@ -95,7 +91,7 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka app_data=mode_cmd, ) ) - if op_code == BpxOpCode.ON: + if cmd_str == BpxOpCode.ON: q.add_log_cmd("On mode") mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.ON, 0) q.add_pus_tc( @@ -105,21 +101,21 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka app_data=mode_cmd, ) ) - if op_code == BpxOpCode.RST_BOOT_CNT: + if cmd_str == BpxOpCode.RST_BOOT_CNT: q.add_log_cmd("Resetting reboot counters") q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.RESET_COUNTERS ) ) - if op_code == BpxOpCode.RST_CFG: + if cmd_str == BpxOpCode.RST_CFG: q.add_log_cmd("Reset BPX configuration") q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.CONFIG_CMD ) ) - if op_code == BpxOpCode.SET_CFG: + if cmd_str == BpxOpCode.SET_CFG: q.add_log_cmd("Setting BPX configuration") user_data = bytearray() batt_mode = BpxHeaterModeSelect( @@ -137,21 +133,21 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka user_data=user_data, ) ) - if op_code == BpxOpCode.REQUEST_CFG: + if cmd_str == BpxOpCode.REQUEST_CFG: q.add_log_cmd("Requesting configuration struct") q.add_pus_tc( create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.GET_CFG) ) - if op_code == BpxOpCode.REQUEST_CFG_HK: + if cmd_str == BpxOpCode.REQUEST_CFG_HK: q.add_log_cmd("Requesting configuration struct HK") sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_CFG_SET) q.add_pus_tc(generate_one_hk_command(sid=sid)) - if op_code == BpxOpCode.REBOOT: + if cmd_str == BpxOpCode.REBOOT: q.add_log_cmd("Rebooting BPX battery") q.add_pus_tc( create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.REBOOT) ) - if op_code == BpxOpCode.MAN_HEATER_ON: + if cmd_str == BpxOpCode.MAN_HEATER_ON: q.add_log_cmd("BPX manual heater on with seconds burntime") burn_time = int(input("BPX heater burn time in seconds [1-65535]: ")) if burn_time < 1 or burn_time > 65535: @@ -163,7 +159,7 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka user_data=struct.pack("!H", burn_time), ) ) - if op_code == BpxOpCode.MAN_HEATER_OFF: + if cmd_str == BpxOpCode.MAN_HEATER_OFF: q.add_log_cmd("BPX manual heater off") q.add_pus_tc( create_action_cmd( diff --git a/eive_tmtc/tmtc/power/pwr_ctrl.py b/eive_tmtc/tmtc/power/pwr_ctrl.py index ecab1dd..4488491 100644 --- a/eive_tmtc/tmtc/power/pwr_ctrl.py +++ b/eive_tmtc/tmtc/power/pwr_ctrl.py @@ -3,32 +3,26 @@ import enum import logging import struct -from eive_tmtc.config.definitions import CustomServiceList -from eive_tmtc.config.object_ids import PWR_CONTROLLER -from eive_tmtc.pus_tm.defs import PrintWrapper from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, -) -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.queue import DefaultPusQueueHelper -from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_command -from tmtccmd.tmtc.decorator import ServiceProviderParams -from tmtccmd.pus.tc.s3_fsfw_hk import ( - generate_one_hk_command, - make_sid, - enable_periodic_hk_command_with_interval, - disable_periodic_hk_command, + CmdTreeNode, ) from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter - from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd - from tmtccmd.pus.s20_fsfw_param_defs import ( - create_scalar_float_parameter, create_scalar_double_parameter, + create_scalar_float_parameter, ) +from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_command +from tmtccmd.pus.tc.s3_fsfw_hk import ( + disable_periodic_hk_command, + enable_periodic_hk_command_with_interval, + generate_one_hk_command, + make_sid, +) +from tmtccmd.tmtc.queue import DefaultPusQueueHelper + +from eive_tmtc.config.object_ids import PWR_CONTROLLER +from eive_tmtc.pus_tm.defs import PrintWrapper _LOGGER = logging.getLogger(__name__) @@ -51,17 +45,17 @@ class ParamId(enum.IntEnum): HIGHER_MODES_LIMIT = 6 -class OpCodes: - OFF = ["mode_off"] - ON = ["mode_on"] - NML = ["mode_normal"] - SET_PARAMETER = ["set_parameter"] - REQUEST_CORE_HK = ["core_hk"] - ENABLE_CORE_HK = ["core_enable_hk"] - DISABLE_CORE_HK = ["core_disable_hk"] - REQUEST_ENABLE_PL_HK = ["enable_pl_hk"] - ENABLE_ENABLE_PL_HK = ["enable_pl_enable_hk"] - DISABLE_ENABLE_PL_HK = ["enable_pl_disable_hk"] +class OpCode: + OFF = "off" + ON = "on" + NML = "normal" + SET_PARAMETER = "set_parameter" + REQUEST_CORE_HK = "core_hk" + ENABLE_CORE_HK = "core_enable_hk" + DISABLE_CORE_HK = "core_disable_hk" + REQUEST_ENABLE_PL_HK = "enable_pl_hk" + ENABLE_ENABLE_PL_HK = "enable_pl_enable_hk" + DISABLE_ENABLE_PL_HK = "enable_pl_disable_hk" class Info: @@ -77,46 +71,37 @@ class Info: DISABLE_ENABLE_PL_HK = "Disable Enable PL HK Data Generation" -@tmtc_definitions_provider -def pwr_cmd_defs(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(keys=OpCodes.OFF, info=Info.OFF) - oce.add(keys=OpCodes.ON, info=Info.ON) - oce.add(keys=OpCodes.NML, info=Info.NML) - oce.add(keys=OpCodes.SET_PARAMETER, info=Info.SET_PARAMETER) - oce.add(keys=OpCodes.REQUEST_CORE_HK, info=Info.REQUEST_CORE_HK) - oce.add(keys=OpCodes.ENABLE_CORE_HK, info=Info.ENABLE_CORE_HK) - oce.add(keys=OpCodes.DISABLE_CORE_HK, info=Info.DISABLE_CORE_HK) - oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK) - oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK) - oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK) - defs.add_service( - name=CustomServiceList.PWR_CTRL.value, info="PWR Controller", op_code_entry=oce - ) +def create_pwr_ctrl_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + node = CmdTreeNode("pwr_ctrl", "Power Controller") + for op_code, info in combined_dict.items(): + node.add_child(CmdTreeNode(op_code, info)) + return node -@service_provider(CustomServiceList.PWR_CTRL.value) -def pack_acs_ctrl_command(p: ServiceProviderParams): - op_code = p.op_code - q = p.queue_helper - if op_code in OpCodes.OFF: +def pack_acs_ctrl_command(q: DefaultPusQueueHelper, cmd_str: str): + if cmd_str == OpCode.OFF: q.add_log_cmd(f"{Info.OFF}") q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.OFF, 0)) - elif op_code in OpCodes.ON: + elif cmd_str == OpCode.ON: q.add_log_cmd(f"{Info.ON}") q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.ON, 0)) - elif op_code in OpCodes.NML: + elif cmd_str == OpCode.NML: q.add_log_cmd(f"{Info.NML}") q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.NORMAL, 0)) - elif op_code in OpCodes.SET_PARAMETER: + elif cmd_str in OpCode.SET_PARAMETER: q.add_log_cmd(f"{Info.SET_PARAMETER}") set_pwr_ctrl_param(q) - elif op_code in OpCodes.REQUEST_CORE_HK: + elif cmd_str == OpCode.REQUEST_CORE_HK: q.add_log_cmd(Info.REQUEST_CORE_HK) q.add_pus_tc( generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET)) ) - elif op_code in OpCodes.ENABLE_CORE_HK: + elif cmd_str == OpCode.ENABLE_CORE_HK: interval = float(input("Please specify interval in floating point seconds: ")) q.add_log_cmd(Info.ENABLE_CORE_HK) cmd_tuple = enable_periodic_hk_command_with_interval( @@ -124,19 +109,19 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): ) q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[1]) - elif op_code in OpCodes.DISABLE_CORE_HK: + elif cmd_str == OpCode.DISABLE_CORE_HK: q.add_log_cmd(Info.DISABLE_CORE_HK) q.add_pus_tc( disable_periodic_hk_command( False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET) ) ) - elif op_code in OpCodes.REQUEST_ENABLE_PL_HK: + elif cmd_str == OpCode.REQUEST_ENABLE_PL_HK: q.add_log_cmd(Info.REQUEST_ENABLE_PL_HK) q.add_pus_tc( generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET)) ) - elif op_code in OpCodes.ENABLE_ENABLE_PL_HK: + elif cmd_str == OpCode.ENABLE_ENABLE_PL_HK: interval = float(input("Please specify interval in floating point seconds: ")) q.add_log_cmd(Info.ENABLE_ENABLE_PL_HK) cmd_tuple = enable_periodic_hk_command_with_interval( @@ -144,7 +129,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): ) q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[1]) - elif op_code in OpCodes.DISABLE_ENABLE_PL_HK: + elif cmd_str == OpCode.DISABLE_ENABLE_PL_HK: q.add_log_cmd(Info.DISABLE_ENABLE_PL_HK) q.add_pus_tc( disable_periodic_hk_command( diff --git a/eive_tmtc/tmtc/power/subsystem.py b/eive_tmtc/tmtc/power/subsystem.py index 71c1443..6af172e 100644 --- a/eive_tmtc/tmtc/power/subsystem.py +++ b/eive_tmtc/tmtc/power/subsystem.py @@ -1,18 +1,16 @@ import enum -from typing import Tuple, Dict +from typing import Dict, Tuple from spacepackets.ecss import PusTelecommand -from eive_tmtc.tmtc.common import pack_mode_cmd_with_info -from eive_tmtc.config.object_ids import EPS_SUBSYSTEM_ID -from eive_tmtc.config.definitions import CustomServiceList from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, + CmdTreeNode, ) -from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices, Mode -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.pus.s200_fsfw_mode import Mode +from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices +from tmtccmd.tmtc import DefaultPusQueueHelper + +from eive_tmtc.config.object_ids import EPS_SUBSYSTEM_ID +from eive_tmtc.tmtc.common import pack_mode_cmd_with_info class OpCode(str, enum.Enum): @@ -33,12 +31,9 @@ HANDLER_LIST: Dict[str, Tuple[int, int, str]] = { } -@service_provider(CustomServiceList.EPS_SS.value) -def build_eps_subsystem_cmd(p: ServiceProviderParams): - op_code = p.op_code - q = p.queue_helper +def build_eps_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str): info_prefix = "EPS Subsystem" - if op_code in OpCode.REPORT_ALL_MODES: + if cmd_str in OpCode.REPORT_ALL_MODES: q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") q.add_pus_tc( PusTelecommand( @@ -47,7 +42,7 @@ def build_eps_subsystem_cmd(p: ServiceProviderParams): app_data=EPS_SUBSYSTEM_ID, ) ) - mode_info_tup = HANDLER_LIST.get(op_code) + mode_info_tup = HANDLER_LIST.get(cmd_str) if mode_info_tup is None: return pack_mode_cmd_with_info( @@ -59,10 +54,9 @@ def build_eps_subsystem_cmd(p: ServiceProviderParams): ) -@tmtc_definitions_provider -def add_eps_subsystem_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - for op_code, (_, _, info) in HANDLER_LIST.items(): - oce.add(op_code, info) - oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) - defs.add_service(CustomServiceList.EPS_SS, "EPS Subsystem", oce) +def create_eps_subsystem_node() -> CmdTreeNode: + eps_node = CmdTreeNode("eps", "EPS Subsystem") + for cmd_str, (_, _, info) in HANDLER_LIST.items(): + eps_node.add_child(CmdTreeNode(cmd_str, info)) + eps_node.add_child(CmdTreeNode(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)) + return eps_node diff --git a/eive_tmtc/tmtc/system.py b/eive_tmtc/tmtc/system.py index b040131..36a0c17 100644 --- a/eive_tmtc/tmtc/system.py +++ b/eive_tmtc/tmtc/system.py @@ -3,18 +3,18 @@ import enum from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.tmtc.acs.subsystem import AcsMode from tmtccmd.config.tmtc import ( + CmdTreeNode, tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry, ) -from tmtccmd.tmtc import service_provider +from tmtccmd.tmtc import DefaultPusQueueHelper from eive_tmtc.config.object_ids import EIVE_SYSTEM_ID from tmtccmd.pus.s200_fsfw_mode import ( create_mode_command, create_announce_mode_recursive_command, ) from tmtccmd.pus.s8_fsfw_action import create_action_cmd -from tmtccmd.tmtc.decorator import ServiceProviderParams class SystemMode: @@ -55,40 +55,49 @@ class Info: REBOOT_I2C = "Reboot I2C bus" -@service_provider(CustomServiceList.SYSTEM.value) -def build_system_cmds(p: ServiceProviderParams): - o = p.op_code - q = p.queue_helper +def build_system_cmds(q: DefaultPusQueueHelper, cmd_str: str): prefix = "EIVE System" - if o == OpCode.SAFE_MODE: + if cmd_str == OpCode.SAFE_MODE: q.add_log_cmd(f"{prefix}: {Info.SAFE_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.SAFE, 0)) - elif o == OpCode.IDLE_MODE: + elif cmd_str == OpCode.IDLE_MODE: q.add_log_cmd(f"{prefix}: {Info.IDLE_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.IDLE, 0)) - elif o == OpCode.NADIR_MODE: + elif cmd_str == OpCode.NADIR_MODE: q.add_log_cmd(f"{prefix}: {Info.NADIR_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_NADIR, 0)) - elif o == OpCode.TARGET_MODE: + elif cmd_str == OpCode.TARGET_MODE: q.add_log_cmd(f"{prefix}: {Info.TARGET_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_TARGET, 0)) - elif o == OpCode.TARGET_GS_MODE: + elif cmd_str == OpCode.TARGET_GS_MODE: q.add_log_cmd(f"{prefix}: {Info.TARGET_GS_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_TARGET_GS, 0)) - elif o == OpCode.INERTIAL_MODE: + elif cmd_str == OpCode.INERTIAL_MODE: q.add_log_cmd(f"{prefix}: {Info.INERTIAL_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_INERTIAL, 0)) - elif o == OpCode.ANNOUNCE_MODES: + elif cmd_str == OpCode.ANNOUNCE_MODES: q.add_log_cmd(f"{prefix}: {Info.ANNOUNCE_MODES}") q.add_pus_tc(create_announce_mode_recursive_command(EIVE_SYSTEM_ID)) - elif o == OpCode.BOOT_MODE: + elif cmd_str == OpCode.BOOT_MODE: q.add_log_cmd(f"{prefix}: {Info.BOOT_MODE}") q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.BOOT, 0)) - elif o == OpCode.REBOOT_I2C: + elif cmd_str == OpCode.REBOOT_I2C: q.add_log_cmd(f"{prefix}: {Info.REBOOT_I2C}") q.add_pus_tc(create_action_cmd(EIVE_SYSTEM_ID, ActionId.EXECUTE_I2C_REBOOT)) +def create_system_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + node = CmdTreeNode("system", "EIVE System") + for op_code, info in combined_dict.items(): + node.add_child(CmdTreeNode(op_code, info)) + return node + + @tmtc_definitions_provider def add_system_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() diff --git a/eive_tmtc/tmtc/test.py b/eive_tmtc/tmtc/test.py index c484a38..876713b 100644 --- a/eive_tmtc/tmtc/test.py +++ b/eive_tmtc/tmtc/test.py @@ -1,16 +1,10 @@ -from spacepackets.ecss import PusTelecommand, PusService -from tmtccmd.config import CoreServiceList -from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, - OpCodeEntry, -) +from spacepackets.ecss import PusService, PusTelecommand +from tmtccmd.config import CmdTreeNode from tmtccmd.pus.s17_test import create_service_17_ping_command -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.tmtc import DefaultPusQueueHelper -class OpCodes: +class OpCode: PING = "ping" TRIGGER_EVENT = "trig_event" PING_WITH_DATA = "ping_with_data" @@ -22,31 +16,22 @@ class Info: PING_WITH_DATA = "Ping with data. Size of sent data is sent back" -@tmtc_definitions_provider -def add_test_defs(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(keys=OpCodes.PING, info=Info.PING) - oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT) - oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA) - - defs.add_service( - name=CoreServiceList.SERVICE_17_ALT, - info="PUS 17 Test Service", - op_code_entry=oce, - ) +def create_test_node() -> CmdTreeNode: + node = CmdTreeNode("test", "Test Commands") + node.add_child(CmdTreeNode(OpCode.PING, Info.PING)) + node.add_child(CmdTreeNode(OpCode.TRIGGER_EVENT, Info.TRIGGER_EVENT)) + node.add_child(CmdTreeNode(OpCode.PING_WITH_DATA, Info.PING_WITH_DATA)) + return node -@service_provider(CoreServiceList.SERVICE_17_ALT) -def pack_test_command(p: ServiceProviderParams): - info = p.info - q = p.queue_helper - if info.op_code == OpCodes.PING: +def pack_test_command(q: DefaultPusQueueHelper, cmd_path: str): + if cmd_path == OpCode.PING: q.add_log_cmd("Sending PUS TC [17,1]") q.add_pus_tc(create_service_17_ping_command()) - if info.op_code == OpCodes.TRIGGER_EVENT: + if cmd_path == OpCode.TRIGGER_EVENT: q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]") q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128)) - if info.op_code == OpCodes.PING_WITH_DATA: + if cmd_path == OpCode.PING_WITH_DATA: q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]") while True: data_size = int(input("Please specify data size [0-1024]: ")) @@ -55,7 +40,7 @@ def pack_test_command(p: ServiceProviderParams): break dummy_data = bytearray() next_byte = True - for i in range(data_size): + for _ in range(data_size): dummy_data.append(int(next_byte)) next_byte = not next_byte q.add_pus_tc( diff --git a/eive_tmtc/tmtc/tm_store.py b/eive_tmtc/tmtc/tm_store.py index 3f4fc5f..6d5f9e8 100644 --- a/eive_tmtc/tmtc/tm_store.py +++ b/eive_tmtc/tmtc/tm_store.py @@ -3,26 +3,25 @@ import enum import logging import math import struct +from typing import Tuple +from dateutil.parser import parse +from spacepackets.ecss.pus_15_tm_storage import Subservice +from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper +from tmtccmd.config.tmtc import OpCodeEntry +from tmtccmd.tmtc.queue import DefaultPusQueueHelper +from tmtccmd.util import ObjectIdU32 + +from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.object_ids import ( + CFDP_TM_STORE, HK_TM_STORE, MISC_TM_STORE, - OK_TM_STORE, NOT_OK_TM_STORE, - CFDP_TM_STORE, + OK_TM_STORE, get_object_ids, ) -from eive_tmtc.config.definitions import CustomServiceList -from tmtccmd.config import TmtcDefinitionWrapper -from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams -from dateutil.parser import parse - -from spacepackets.ecss import PusService # noqa -from spacepackets.ecss.tc import PusTelecommand -from spacepackets.ecss.pus_15_tm_storage import Subservice -from tmtccmd.util import ObjectIdU32 class OpCode: @@ -38,11 +37,8 @@ class Info: _LOGGER = logging.getLogger(__name__) -@service_provider(CustomServiceList.TM_STORE) -def pack_tm_store_commands(p: ServiceProviderParams): - q = p.queue_helper - o = p.op_code - if o == OpCode.DELETE_UP_TO: +def pack_tm_store_commands(q: DefaultPusQueueHelper, cmd_path: str): + if cmd_path == OpCode.DELETE_UP_TO: obj_id, store_string = store_select_prompt() app_data = bytearray(obj_id.as_bytes) delete_up_to_time = time_prompt("Determining deletion end time") @@ -56,7 +52,7 @@ def pack_tm_store_commands(p: ServiceProviderParams): service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data ) ) - elif o == OpCode.RETRIEVAL_BY_TIME_RANGE: + elif cmd_path == OpCode.RETRIEVAL_BY_TIME_RANGE: q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE) obj_id, store_string = store_select_prompt() app_data = bytearray(obj_id.as_bytes) @@ -80,7 +76,15 @@ def pack_tm_store_commands(p: ServiceProviderParams): ) -@tmtc_definitions_provider +def create_persistent_tm_store_node() -> CmdTreeNode: + node = CmdTreeNode("tm_store", "Persistent TM Store") + node.add_child(CmdTreeNode(OpCode.DELETE_UP_TO, Info.DELETE_UP_TO)) + node.add_child( + CmdTreeNode(OpCode.RETRIEVAL_BY_TIME_RANGE, Info.RETRIEVAL_BY_TIME_RANGE) + ) + return node + + def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO) @@ -130,6 +134,7 @@ def time_prompt(info_str: str) -> datetime.datetime: return time_prompt_fully_manually() elif time_input_key == 2: return time_prompt_offset_from_now() + raise ValueError("can not determine datetime") def time_prompt_fully_manually() -> datetime.datetime: @@ -160,7 +165,7 @@ def time_prompt_offset_from_now() -> datetime.datetime: return time_now_with_offset -def store_select_prompt() -> (ObjectIdU32, str): +def store_select_prompt() -> Tuple[ObjectIdU32, str]: obj_id_dict = get_object_ids() print("Available TM stores:") for k, v in STORE_DICT.items():