OOF3
EIVE/-/pipeline/head This commit looks good Details

This commit is contained in:
Robin Müller 2023-11-22 13:51:33 +01:00
parent 02d9d6adfc
commit d640d547bd
Signed by: muellerr
GPG Key ID: A649FB78196E3849
15 changed files with 300 additions and 347 deletions

View File

@ -1,17 +1,27 @@
from typing import Optional from typing import Optional
from eive_tmtc.config.definitions import SPACE_PACKET_IDS from tmtccmd import CcsdsTmtcBackend, HookBase
from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node
from eive_tmtc.tmtc.payload.scex import create_scex_node
from eive_tmtc.tmtc.time import create_time_node
from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node
from tmtccmd import HookBase, CcsdsTmtcBackend
from tmtccmd.com import ComInterface from tmtccmd.com import ComInterface
from tmtccmd.config import CmdTreeNode from tmtccmd.config import CmdTreeNode
from eive_tmtc.config.retvals import get_retval_dict
from tmtccmd.util import ObjectIdDictT, RetvalDictT from tmtccmd.util import ObjectIdDictT, RetvalDictT
from eive_tmtc.config.definitions import SPACE_PACKET_IDS
from eive_tmtc.config.retvals import get_retval_dict
from eive_tmtc.tmtc.com.subsystem import create_com_subsystem_node
from eive_tmtc.tmtc.health import create_global_health_node
from eive_tmtc.tmtc.acs.subsystem import create_acs_subsystem_node
from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node
from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node
from eive_tmtc.tmtc.payload.scex import create_scex_node
from eive_tmtc.tmtc.payload.subsystem import create_payload_subsystem_node
from eive_tmtc.tmtc.power.bpx_batt import create_bpx_batt_node
from eive_tmtc.tmtc.power.pwr_ctrl import create_pwr_ctrl_node
from eive_tmtc.tmtc.power.subsystem import create_eps_subsystem_node
from eive_tmtc.tmtc.system import create_system_node
from eive_tmtc.tmtc.test import create_test_node
from eive_tmtc.tmtc.time import create_time_node
from eive_tmtc.tmtc.tm_store import create_persistent_tm_store_node
class EiveHookObject(HookBase): class EiveHookObject(HookBase):
def __init__(self, json_cfg_path: str): def __init__(self, json_cfg_path: str):
@ -34,9 +44,7 @@ class EiveHookObject(HookBase):
assy_node = CmdTreeNode("assy", "Assembly Commands") assy_node = CmdTreeNode("assy", "Assembly Commands")
assy_node.add_child(mode_node) assy_node.add_child(mode_node)
system_node = CmdTreeNode("sys", "EIVE System") acs_node = create_acs_subsystem_node()
acs_node = CmdTreeNode("acs", "ACS Subsystem")
acs_brd_assy_node = CmdTreeNode("acs_brd_assy", "ACS Board Assembly") acs_brd_assy_node = CmdTreeNode("acs_brd_assy", "ACS Board Assembly")
acs_brd_assy_node.add_child(mode_node) acs_brd_assy_node.add_child(mode_node)
mgm_devs = CmdTreeNode("mgm_devs", "MGM Devices") mgm_devs = CmdTreeNode("mgm_devs", "MGM Devices")
@ -88,7 +96,7 @@ class EiveHookObject(HookBase):
tcs_brd_assy = CmdTreeNode("tcs_brd_assy", "TCS Board Assembly") tcs_brd_assy = CmdTreeNode("tcs_brd_assy", "TCS Board Assembly")
tcs_node.add_child(tcs_brd_assy) tcs_node.add_child(tcs_brd_assy)
com_node = CmdTreeNode("com", "COM Subsystem") com_node = create_com_subsystem_node()
syrlinks_node = CmdTreeNode("syrlinks", "Syrlinks") syrlinks_node = CmdTreeNode("syrlinks", "Syrlinks")
syrlinks_node.add_child(dev_node) syrlinks_node.add_child(dev_node)
syrlinks_node.add_child(assy_node) syrlinks_node.add_child(assy_node)
@ -97,19 +105,19 @@ class EiveHookObject(HookBase):
ccsds_node.add_child(action_node) ccsds_node.add_child(action_node)
com_node.add_child(syrlinks_node) com_node.add_child(syrlinks_node)
eps_node = CmdTreeNode("eps", "EPS Subsystem") eps_node = create_eps_subsystem_node()
acu_node = CmdTreeNode("acu", "PCDU ACU component") acu_node = CmdTreeNode("acu", "PCDU ACU component")
pdu_1_node = CmdTreeNode("pdu1", "PCDU PDU 1 component") pdu_1_node = CmdTreeNode("pdu1", "PCDU PDU 1 component")
pdu_2_node = CmdTreeNode("pdu2", "PCDU PDU 2 component") pdu_2_node = CmdTreeNode("pdu2", "PCDU PDU 2 component")
p60_dock_node = CmdTreeNode("p60_dock", "PCDU P60 Dock component") p60_dock_node = CmdTreeNode("p60_dock", "PCDU P60 Dock component")
bat_node = CmdTreeNode("bat", "Battery Component") eps_node.add_child(create_pwr_ctrl_node())
eps_node.add_child(acu_node) eps_node.add_child(acu_node)
eps_node.add_child(pdu_1_node) eps_node.add_child(pdu_1_node)
eps_node.add_child(pdu_2_node) eps_node.add_child(pdu_2_node)
eps_node.add_child(p60_dock_node) eps_node.add_child(p60_dock_node)
eps_node.add_child(bat_node) eps_node.add_child(create_bpx_batt_node())
payload_node = CmdTreeNode("payload", "Payload Subsystem") payload_node = create_payload_subsystem_node()
pl_pcdu = CmdTreeNode("pl_pcdu", "Payload PCDU") pl_pcdu = CmdTreeNode("pl_pcdu", "Payload PCDU")
payload_node.add_child(pl_pcdu) payload_node.add_child(pl_pcdu)
payload_node.add_child(create_scex_node()) payload_node.add_child(create_scex_node())
@ -122,8 +130,11 @@ class EiveHookObject(HookBase):
obdh_node.add_child(xiphos_wdt) obdh_node.add_child(xiphos_wdt)
obdh_node.add_child(core_ctrl) obdh_node.add_child(core_ctrl)
obdh_node.add_child(create_time_node()) obdh_node.add_child(create_time_node())
obdh_node.add_child(create_persistent_tm_store_node())
root_node.add_child(system_node) root_node.add_child(create_test_node())
root_node.add_child(create_system_node())
root_node.add_child(create_global_health_node())
root_node.add_child(acs_node) root_node.add_child(acs_node)
root_node.add_child(tcs_node) root_node.add_child(tcs_node)
root_node.add_child(com_node) root_node.add_child(com_node)
@ -134,8 +145,8 @@ class EiveHookObject(HookBase):
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import ( from tmtccmd.config.com import (
create_com_interface_default,
create_com_interface_cfg_default, create_com_interface_cfg_default,
create_com_interface_default,
) )
assert self.cfg_path is not None assert self.cfg_path is not None

View File

@ -1,7 +1,4 @@
from .payload.subsystem import add_payload_subsystem_cmds
from .solar_array_deployment import add_sa_depl_cmds from .solar_array_deployment import add_sa_depl_cmds
from .test import add_test_defs
from .health import add_health_cmd_defs
from .system import add_system_cmd_defs from .system import add_system_cmd_defs
from .tm_store import add_persistent_tm_store_cmd_defs from .tm_store import add_persistent_tm_store_cmd_defs
from .tcs import add_tmp_sens_cmds from .tcs import add_tmp_sens_cmds

View File

@ -1,22 +1,19 @@
import enum import enum
from typing import Tuple, Dict from typing import Dict, Tuple
from eive_tmtc.tmtc.acs.defs import AcsMode, SafeSubmode
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from eive_tmtc.config.object_ids import ACS_SUBSYSTEM_ID
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, CmdTreeNode,
TmtcDefinitionWrapper,
OpCodeEntry,
) )
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices
from tmtccmd.tmtc import service_provider from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc.decorator import ServiceProviderParams
from eive_tmtc.config.object_ids import ACS_SUBSYSTEM_ID
from eive_tmtc.tmtc.acs.defs import AcsMode, SafeSubmode
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
class OpCode(str, enum.Enum): class CmdStr(str, enum.Enum):
OFF = "off" OFF = "off"
SAFE = "safe" SAFE = "safe"
DETUMBLE = "detumble" DETUMBLE = "detumble"
@ -41,23 +38,20 @@ class Info(str, enum.Enum):
HANDLER_LIST: Dict[str, Tuple[int, int, str]] = { HANDLER_LIST: Dict[str, Tuple[int, int, str]] = {
OpCode.OFF: (AcsMode.OFF, 0, Info.OFF), CmdStr.OFF: (AcsMode.OFF, 0, Info.OFF),
OpCode.SAFE: (AcsMode.SAFE, SafeSubmode.DEFAULT, Info.SAFE), CmdStr.SAFE: (AcsMode.SAFE, SafeSubmode.DEFAULT, Info.SAFE),
OpCode.DETUMBLE: (AcsMode.SAFE, SafeSubmode.DETUMBLE, Info.DETUMBLE), CmdStr.DETUMBLE: (AcsMode.SAFE, SafeSubmode.DETUMBLE, Info.DETUMBLE),
OpCode.IDLE: (AcsMode.IDLE, 0, Info.IDLE), CmdStr.IDLE: (AcsMode.IDLE, 0, Info.IDLE),
OpCode.PTG_TARGET: (AcsMode.PTG_TARGET, 0, Info.PTG_TARGET), CmdStr.PTG_TARGET: (AcsMode.PTG_TARGET, 0, Info.PTG_TARGET),
OpCode.PTG_TARGET_GS: (AcsMode.PTG_TARGET_GS, 0, Info.PTG_TARGET_GS), CmdStr.PTG_TARGET_GS: (AcsMode.PTG_TARGET_GS, 0, Info.PTG_TARGET_GS),
OpCode.PTG_TARGET_NADIR: (AcsMode.PTG_NADIR, 0, Info.PTG_TARGET_NADIR), CmdStr.PTG_TARGET_NADIR: (AcsMode.PTG_NADIR, 0, Info.PTG_TARGET_NADIR),
OpCode.PTG_TARGET_INERTIAL: (AcsMode.PTG_INERTIAL, 0, Info.PTG_TARGET_INERTIAL), CmdStr.PTG_TARGET_INERTIAL: (AcsMode.PTG_INERTIAL, 0, Info.PTG_TARGET_INERTIAL),
} }
@service_provider(CustomServiceList.ACS_SS.value) def build_acs_subsystem_cmd(q: DefaultPusQueueHelper, cmd_path: str):
def build_acs_subsystem_cmd(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
info_prefix = "ACS Subsystem" info_prefix = "ACS Subsystem"
if op_code in OpCode.REPORT_ALL_MODES: if cmd_path in CmdStr.REPORT_ALL_MODES:
q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}")
q.add_pus_tc( q.add_pus_tc(
PusTelecommand( PusTelecommand(
@ -66,7 +60,7 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams):
app_data=ACS_SUBSYSTEM_ID, app_data=ACS_SUBSYSTEM_ID,
) )
) )
mode_info_tup = HANDLER_LIST.get(op_code) mode_info_tup = HANDLER_LIST.get(cmd_path)
if mode_info_tup is None: if mode_info_tup is None:
return return
pack_mode_cmd_with_info( pack_mode_cmd_with_info(
@ -78,10 +72,9 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams):
) )
@tmtc_definitions_provider def create_acs_subsystem_node() -> CmdTreeNode:
def add_acs_subsystem_cmds(defs: TmtcDefinitionWrapper): node = CmdTreeNode("acs", "ACS Subsystem")
oce = OpCodeEntry() for cmd_str, (_, _, info) in HANDLER_LIST.items():
for op_code, (_, _, info) in HANDLER_LIST.items(): node.add_child(CmdTreeNode(cmd_str, info))
oce.add(op_code, info) node.add_child(CmdTreeNode(CmdStr.REPORT_ALL_MODES, Info.REPORT_ALL_MODES))
oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) return node
defs.add_service(CustomServiceList.ACS_SS, "ACS Subsystem", oce)

View File

@ -1 +0,0 @@
from .subsystem import add_com_subsystem_cmds

View File

@ -1,30 +1,25 @@
import enum import enum
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import COM_SUBSYSTEM_ID
from eive_tmtc.tmtc.com.syrlinks_handler import Datarate
from tmtccmd.pus.s20_fsfw_param_defs import create_scalar_u8_parameter
from .defs import Mode as ComMode
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, CmdTreeNode,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams
from tmtccmd.pus.s200_fsfw_mode import (
create_mode_command,
create_read_mode_command,
create_announce_mode_command,
create_announce_mode_recursive_command,
) )
from tmtccmd.pus.s20_fsfw_param import ( from tmtccmd.pus.s20_fsfw_param import (
create_load_param_cmd, create_load_param_cmd,
create_scalar_u32_parameter,
) )
from tmtccmd.pus.s20_fsfw_param_defs import create_scalar_u8_parameter
from tmtccmd.pus.s200_fsfw_mode import (
create_announce_mode_command,
create_announce_mode_recursive_command,
create_mode_command,
create_read_mode_command,
)
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s20_fsfw_param import create_scalar_u32_parameter from eive_tmtc.config.object_ids import COM_SUBSYSTEM_ID
from eive_tmtc.tmtc.com.syrlinks_handler import Datarate
from .defs import Mode as ComMode
class ParameterId(enum.IntEnum): class ParameterId(enum.IntEnum):
@ -48,9 +43,9 @@ class OpCode:
class Info: class Info:
RX_ONLY = "Syrlinks RX Only" RX_ONLY = "Syrlinks RX Only"
TX_AND_RX_DEF_DATARATE = "Syrlinks with TX default datarate" TX_AND_RX_DEF_RATE = "Syrlinks with TX default datarate"
TX_AND_RX_LOW_DATARATE = "Syrlinks with TX low datarate (BPSK modulation)" TX_AND_RX_LOW_RATE = "Syrlinks with TX low datarate (BPSK modulation)"
TX_AND_RX_HIGH_DATARATE = "Syrlinks with TX high datarate (0QPSK modulation)" TX_AND_RX_HIGH_RATE = "Syrlinks with TX high datarate (0QPSK modulation)"
TX_AND_RX_CARRIER_WAVE = "Syrlinks with TX carrier wave" TX_AND_RX_CARRIER_WAVE = "Syrlinks with TX carrier wave"
UPDATE_DEFAULT_DATARATE_LOW = "Configure default low datarate (BPSK modulation)" UPDATE_DEFAULT_DATARATE_LOW = "Configure default low datarate (BPSK modulation)"
UPDATE_DEFAULT_DATARATE_HIGH = "Configure default high datarate (0QPSK modulation)" UPDATE_DEFAULT_DATARATE_HIGH = "Configure default high datarate (0QPSK modulation)"
@ -60,30 +55,27 @@ class Info:
ANNOUNCE_MODE_RECURSIVE = "Announce mode recursively" ANNOUNCE_MODE_RECURSIVE = "Announce mode recursively"
@service_provider(CustomServiceList.COM_SS) def build_com_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901
q = p.queue_helper
o = p.op_code
prefix = "COM Subsystem" prefix = "COM Subsystem"
if o == OpCode.RX_ONLY: if cmd_str == OpCode.RX_ONLY:
q.add_log_cmd(Info.RX_ONLY) q.add_log_cmd(Info.RX_ONLY)
q.add_pus_tc(create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_ONLY, 0)) q.add_pus_tc(create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_ONLY, 0))
elif o == OpCode.TX_AND_RX_DEF_RATE: elif cmd_str == OpCode.TX_AND_RX_DEF_RATE:
q.add_log_cmd(Info.TX_AND_RX_DEF_DATARATE) q.add_log_cmd(Info.TX_AND_RX_DEF_DATARATE)
q.add_pus_tc( q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_DEF_DATARATE, 0) create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_DEF_DATARATE, 0)
) )
elif o == OpCode.TX_AND_RX_LOW_RATE: elif cmd_str == OpCode.TX_AND_RX_LOW_RATE:
q.add_log_cmd(Info.TX_AND_RX_LOW_DATARATE) q.add_log_cmd(Info.TX_AND_RX_LOW_DATARATE)
q.add_pus_tc( q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_LOW_DATARATE, 0) create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_LOW_DATARATE, 0)
) )
elif o == OpCode.TX_AND_RX_HIGH_RATE: elif cmd_str == OpCode.TX_AND_RX_HIGH_RATE:
q.add_log_cmd(Info.TX_AND_RX_HIGH_DATARATE) q.add_log_cmd(Info.TX_AND_RX_HIGH_DATARATE)
q.add_pus_tc( q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_HIGH_DATARATE, 0) create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_HIGH_DATARATE, 0)
) )
if o == OpCode.UPDATE_DEFAULT_DATARATE_LOW: if cmd_str == OpCode.UPDATE_DEFAULT_DATARATE_LOW:
q.add_log_cmd(f"{prefix}: {Info.UPDATE_DEFAULT_DATARATE_LOW}") q.add_log_cmd(f"{prefix}: {Info.UPDATE_DEFAULT_DATARATE_LOW}")
q.add_pus_tc( q.add_pus_tc(
create_load_param_cmd( create_load_param_cmd(
@ -95,7 +87,7 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901
) )
) )
) )
if o == OpCode.UPDATE_DEFAULT_DATARATE_HIGH: if cmd_str == OpCode.UPDATE_DEFAULT_DATARATE_HIGH:
q.add_log_cmd(f"{prefix}: {Info.UPDATE_DEFAULT_DATARATE_HIGH}") q.add_log_cmd(f"{prefix}: {Info.UPDATE_DEFAULT_DATARATE_HIGH}")
q.add_pus_tc( q.add_pus_tc(
create_load_param_cmd( create_load_param_cmd(
@ -107,12 +99,12 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901
) )
) )
) )
elif o == OpCode.TX_AND_RX_CARRIER_WAVE: elif cmd_str == OpCode.TX_AND_RX_CARRIER_WAVE:
q.add_log_cmd(Info.TX_AND_RX_CARRIER_WAVE) q.add_log_cmd(Info.TX_AND_RX_CARRIER_WAVE)
q.add_pus_tc( q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_CARRIER_WAVE, 0) create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_CARRIER_WAVE, 0)
) )
elif o == OpCode.CHANGE_TRANSMITTER_TIMEOUT: elif cmd_str == OpCode.CHANGE_TRANSMITTER_TIMEOUT:
timeout = int(input("Specify timeout to set [ms]: ")) timeout = int(input("Specify timeout to set [ms]: "))
q.add_log_cmd(Info.CHANGE_TRANSMITTER_TIMEOUT) q.add_log_cmd(Info.CHANGE_TRANSMITTER_TIMEOUT)
q.add_pus_tc( q.add_pus_tc(
@ -125,28 +117,24 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901
) )
) )
) )
elif o == OpCode.READ_MODE: elif cmd_str == OpCode.READ_MODE:
q.add_log_cmd(Info.READ_MODE) q.add_log_cmd(Info.READ_MODE)
q.add_pus_tc(create_read_mode_command(COM_SUBSYSTEM_ID)) q.add_pus_tc(create_read_mode_command(COM_SUBSYSTEM_ID))
elif o == OpCode.ANNOUNCE_MODE: elif cmd_str == OpCode.ANNOUNCE_MODE:
q.add_log_cmd(Info.ANNOUNCE_MODE) q.add_log_cmd(Info.ANNOUNCE_MODE)
q.add_pus_tc(create_announce_mode_command(COM_SUBSYSTEM_ID)) q.add_pus_tc(create_announce_mode_command(COM_SUBSYSTEM_ID))
elif o == OpCode.ANNOUNCE_MODE_RECURSIVE: elif cmd_str == OpCode.ANNOUNCE_MODE_RECURSIVE:
q.add_log_cmd(Info.ANNOUNCE_MODE_RECURSIVE) q.add_log_cmd(Info.ANNOUNCE_MODE_RECURSIVE)
q.add_pus_tc(create_announce_mode_recursive_command(COM_SUBSYSTEM_ID)) q.add_pus_tc(create_announce_mode_recursive_command(COM_SUBSYSTEM_ID))
@tmtc_definitions_provider def create_com_subsystem_node() -> CmdTreeNode:
def add_com_subsystem_cmds(defs: TmtcDefinitionWrapper): op_code_strs = [
oce = OpCodeEntry() getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
oce.add(OpCode.RX_ONLY, Info.RX_ONLY) ]
oce.add(OpCode.TX_AND_RX_LOW_RATE, Info.TX_AND_RX_LOW_DATARATE) info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
oce.add(OpCode.TX_AND_RX_HIGH_RATE, Info.TX_AND_RX_HIGH_DATARATE) combined_dict = dict(zip(op_code_strs, info_strs))
oce.add(OpCode.TX_AND_RX_DEF_RATE, Info.TX_AND_RX_DEF_DATARATE) node = CmdTreeNode("com", "COM Subsystem")
oce.add(OpCode.UPDATE_DEFAULT_DATARATE_LOW, Info.UPDATE_DEFAULT_DATARATE_LOW) for op_code, info in combined_dict.items():
oce.add(OpCode.UPDATE_DEFAULT_DATARATE_HIGH, Info.UPDATE_DEFAULT_DATARATE_HIGH) node.add_child(CmdTreeNode(op_code, info))
oce.add(OpCode.CHANGE_TRANSMITTER_TIMEOUT, Info.CHANGE_TRANSMITTER_TIMEOUT) return node
oce.add(OpCode.READ_MODE, Info.READ_MODE)
oce.add(OpCode.ANNOUNCE_MODE, Info.ANNOUNCE_MODE)
oce.add(OpCode.ANNOUNCE_MODE_RECURSIVE, Info.ANNOUNCE_MODE_RECURSIVE)
defs.add_service(CustomServiceList.COM_SS, "COM Subsystem", oce)

View File

@ -1,14 +1,12 @@
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.tmtc.obj_prompt import prompt_object
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, CmdTreeNode,
TmtcDefinitionWrapper,
OpCodeEntry,
) )
from tmtccmd.tmtc import service_provider from tmtccmd.pus.s201_fsfw_health import FsfwHealth, Subservice
from tmtccmd.pus.s201_fsfw_health import Subservice, FsfwHealth from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc.decorator import ServiceProviderParams
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.tmtc.obj_prompt import prompt_object
class OpCode: class OpCode:
@ -30,11 +28,8 @@ def prompt_health() -> FsfwHealth:
return FsfwHealth(health_idx) return FsfwHealth(health_idx)
@service_provider(CustomServiceList.HEALTH) def pack_health_cmd(q: DefaultPusQueueHelper, cmd_str: str):
def pack_test_command(p: ServiceProviderParams): if cmd_str == OpCode.SET_HEALTH:
o = p.op_code
q = p.queue_helper
if o == OpCode.SET_HEALTH:
app_data = bytearray(prompt_object()) app_data = bytearray(prompt_object())
health = prompt_health() health = prompt_health()
app_data.append(health) app_data.append(health)
@ -44,7 +39,7 @@ def pack_test_command(p: ServiceProviderParams):
service=201, subservice=Subservice.TC_SET_HEALTH, app_data=app_data service=201, subservice=Subservice.TC_SET_HEALTH, app_data=app_data
) )
) )
elif o == OpCode.ANNOUNCE_HEALTH: elif cmd_str == OpCode.ANNOUNCE_HEALTH:
app_data = bytearray(prompt_object()) app_data = bytearray(prompt_object())
q.add_log_cmd(Info.ANNOUNCE_HEALTH) q.add_log_cmd(Info.ANNOUNCE_HEALTH)
q.add_pus_tc( q.add_pus_tc(
@ -52,19 +47,22 @@ def pack_test_command(p: ServiceProviderParams):
service=201, subservice=Subservice.TC_ANNOUNCE_HEALTH, app_data=app_data service=201, subservice=Subservice.TC_ANNOUNCE_HEALTH, app_data=app_data
) )
) )
elif o == OpCode.ANNOUNCE_HEALTH_ALL: elif cmd_str == OpCode.ANNOUNCE_HEALTH_ALL:
q.add_log_cmd(Info.ANNOUNCE_HEALTH_ALL) q.add_log_cmd(Info.ANNOUNCE_HEALTH_ALL)
q.add_pus_tc( q.add_pus_tc(
PusTelecommand(service=201, subservice=Subservice.TC_ANNOUNCE_HEALTH_ALL) PusTelecommand(service=201, subservice=Subservice.TC_ANNOUNCE_HEALTH_ALL)
) )
else: else:
raise ValueError(f"unknown op code {o} for service {CustomServiceList.HEALTH}") raise ValueError(
f"unknown command string {cmd_str} for service {CustomServiceList.HEALTH}"
)
@tmtc_definitions_provider def create_global_health_node() -> CmdTreeNode:
def add_health_cmd_defs(defs: TmtcDefinitionWrapper): health_node = CmdTreeNode("health", "Health Commands")
oce = OpCodeEntry() health_node.add_child(
oce.add(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL) CmdTreeNode(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL)
oce.add(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH) )
oce.add(OpCode.SET_HEALTH, Info.SET_HEALTH) health_node.add_child(CmdTreeNode(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH))
defs.add_service(CustomServiceList.HEALTH, info="Health Service", op_code_entry=oce) health_node.add_child(CmdTreeNode(OpCode.SET_HEALTH, Info.SET_HEALTH))
return health_node

View File

@ -151,10 +151,10 @@ def create_ploc_mpsoc_node() -> CmdTreeNode:
] ]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs)) combined_dict = dict(zip(op_code_strs, info_strs))
ploc_mpsoc = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC") node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC")
for op_code, info in combined_dict.items(): for op_code, info in combined_dict.items():
ploc_mpsoc.add_child(CmdTreeNode(op_code, info)) node.add_child(CmdTreeNode(op_code, info))
return ploc_mpsoc return node
@tmtc_definitions_provider @tmtc_definitions_provider
@ -679,7 +679,9 @@ def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytea
current_idx = 0 current_idx = 0
dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8") dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8")
current_idx += 12 current_idx += 12
num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[0] num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[
0
]
elem_names = [] elem_names = []
elem_attrs = [] elem_attrs = []
elem_sizes = [] elem_sizes = []

View File

@ -1,14 +1,11 @@
import enum import enum
from typing import Dict, Tuple from typing import Dict, Tuple
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import PL_SUBSYSTEM_ID from eive_tmtc.config.object_ids import PL_SUBSYSTEM_ID
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config import CmdTreeNode
from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
@ -36,12 +33,9 @@ HANDLER_LIST: Dict[str, Tuple[int, str]] = {
} }
@service_provider(CustomServiceList.PL_SS) def build_acs_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str):
def build_acs_subsystem_cmd(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
info_prefix = "ACS Subsystem" info_prefix = "ACS Subsystem"
if op_code in OpCode.REPORT_ALL_MODES: if cmd_str == OpCode.REPORT_ALL_MODES:
q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}")
q.add_pus_tc( q.add_pus_tc(
PusTelecommand( PusTelecommand(
@ -50,7 +44,7 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams):
app_data=PL_SUBSYSTEM_ID, app_data=PL_SUBSYSTEM_ID,
) )
) )
mode_info_tup = HANDLER_LIST.get(op_code) mode_info_tup = HANDLER_LIST.get(cmd_str)
if mode_info_tup is None: if mode_info_tup is None:
return return
pack_mode_cmd_with_info( pack_mode_cmd_with_info(
@ -62,9 +56,8 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams):
) )
@tmtc_definitions_provider def create_payload_subsystem_node() -> CmdTreeNode:
def add_payload_subsystem_cmds(defs: TmtcDefinitionWrapper): payload_node = CmdTreeNode("payload", "Payload Subsystem")
oce = OpCodeEntry() payload_node.add_child(CmdTreeNode(OpCode.OFF, Info.OFF))
oce.add(OpCode.OFF, Info.OFF) payload_node.add_child(CmdTreeNode(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES))
oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) return payload_node
defs.add_service(CustomServiceList.PL_SS, "Payload Subsystem", oce)

View File

@ -1,2 +0,0 @@
from .subsystem import add_eps_subsystem_cmds
from .pwr_ctrl import pwr_cmd_defs

View File

@ -1,23 +1,19 @@
import enum import enum
import struct import struct
from eive_tmtc.pus_tm.defs import PrintWrapper
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import BPX_HANDLER_ID
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, CmdTreeNode,
TmtcDefinitionWrapper,
OpCodeEntry,
) )
from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.config.object_ids import BPX_HANDLER_ID
from eive_tmtc.pus_tm.defs import PrintWrapper
class BpxSetId(enum.IntEnum): class BpxSetId(enum.IntEnum):
@ -54,38 +50,38 @@ class BpxOpCode:
REBOOT = "reboot" REBOOT = "reboot"
@tmtc_definitions_provider def create_bpx_batt_node() -> CmdTreeNode:
def add_bpx_cmd_definitions(defs: TmtcDefinitionWrapper): node = CmdTreeNode("bat", "BPX battery device")
oce = OpCodeEntry() node.add_child(CmdTreeNode(BpxOpCode.ON, "ON command"))
oce.add(keys=BpxOpCode.ON, info="On command") node.add_child(CmdTreeNode(BpxOpCode.OFF, "OFF command"))
oce.add(keys=BpxOpCode.OFF, info="Off command") node.add_child(CmdTreeNode(BpxOpCode.HK, "HK command"))
oce.add(keys=BpxOpCode.HK, info="Request BPX HK") node.add_child(CmdTreeNode(BpxOpCode.RST_BOOT_CNT, "Reset boot count"))
oce.add(keys=BpxOpCode.RST_BOOT_CNT, info="Reset Boot Count") node.add_child(
oce.add(keys=BpxOpCode.RST_CFG, info="Reset Config to stored default settings") CmdTreeNode(BpxOpCode.RST_CFG, "Reset Config to stored default settings")
oce.add(keys=BpxOpCode.SET_CFG, info="Set BPX configuration")
oce.add(keys=BpxOpCode.MAN_HEATER_ON, info="Manual heater on")
oce.add(keys=BpxOpCode.MAN_HEATER_OFF, info="Manual heater off")
oce.add(keys=BpxOpCode.REQUEST_CFG, info="Request Configuration Struct (Step 1)")
oce.add(
keys=BpxOpCode.REQUEST_CFG_HK, info="Request Configuration Struct HK (Step 2)"
) )
oce.add(keys=BpxOpCode.REBOOT, info="Reboot Command") node.add_child(CmdTreeNode(BpxOpCode.SET_CFG, "Set BPX configuration"))
defs.add_service( node.add_child(CmdTreeNode(BpxOpCode.MAN_HEATER_ON, "Manual heater on"))
name=CustomServiceList.BPX_BATTERY.value, node.add_child(CmdTreeNode(BpxOpCode.MAN_HEATER_OFF, "Manual heater off"))
info="BPX Battery Handler", node.add_child(
op_code_entry=oce, CmdTreeNode(BpxOpCode.REQUEST_CFG, "Request Configuration Struct (Step 1)")
) )
node.add_child(
CmdTreeNode(
BpxOpCode.REQUEST_CFG_HK, "Request Configuration Struct HK (Step 2)"
)
)
node.add_child(CmdTreeNode(BpxOpCode.REBOOT, "Reboot Command"))
return node
@service_provider(CustomServiceList.BPX_BATTERY.value) def pack_bpx_commands(
def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is okay here. q: DefaultPusQueueHelper, cmd_str: str
op_code = p.op_code ): # noqa C901: Complexity is okay here.
q = p.queue_helper if cmd_str == BpxOpCode.HK:
if op_code == BpxOpCode.HK:
q.add_log_cmd("Requesting BPX battery HK set") q.add_log_cmd("Requesting BPX battery HK set")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_HK_SET) sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_HK_SET)
q.add_pus_tc(generate_one_hk_command(sid=sid)) q.add_pus_tc(generate_one_hk_command(sid=sid))
if op_code == BpxOpCode.OFF: if cmd_str == BpxOpCode.OFF:
q.add_log_cmd("Off mode") q.add_log_cmd("Off mode")
mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.OFF, 0) mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.OFF, 0)
q.add_pus_tc( q.add_pus_tc(
@ -95,7 +91,7 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka
app_data=mode_cmd, app_data=mode_cmd,
) )
) )
if op_code == BpxOpCode.ON: if cmd_str == BpxOpCode.ON:
q.add_log_cmd("On mode") q.add_log_cmd("On mode")
mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.ON, 0) mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.ON, 0)
q.add_pus_tc( q.add_pus_tc(
@ -105,21 +101,21 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka
app_data=mode_cmd, app_data=mode_cmd,
) )
) )
if op_code == BpxOpCode.RST_BOOT_CNT: if cmd_str == BpxOpCode.RST_BOOT_CNT:
q.add_log_cmd("Resetting reboot counters") q.add_log_cmd("Resetting reboot counters")
q.add_pus_tc( q.add_pus_tc(
create_action_cmd( create_action_cmd(
object_id=BPX_HANDLER_ID, action_id=BpxActionId.RESET_COUNTERS object_id=BPX_HANDLER_ID, action_id=BpxActionId.RESET_COUNTERS
) )
) )
if op_code == BpxOpCode.RST_CFG: if cmd_str == BpxOpCode.RST_CFG:
q.add_log_cmd("Reset BPX configuration") q.add_log_cmd("Reset BPX configuration")
q.add_pus_tc( q.add_pus_tc(
create_action_cmd( create_action_cmd(
object_id=BPX_HANDLER_ID, action_id=BpxActionId.CONFIG_CMD object_id=BPX_HANDLER_ID, action_id=BpxActionId.CONFIG_CMD
) )
) )
if op_code == BpxOpCode.SET_CFG: if cmd_str == BpxOpCode.SET_CFG:
q.add_log_cmd("Setting BPX configuration") q.add_log_cmd("Setting BPX configuration")
user_data = bytearray() user_data = bytearray()
batt_mode = BpxHeaterModeSelect( batt_mode = BpxHeaterModeSelect(
@ -137,21 +133,21 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka
user_data=user_data, user_data=user_data,
) )
) )
if op_code == BpxOpCode.REQUEST_CFG: if cmd_str == BpxOpCode.REQUEST_CFG:
q.add_log_cmd("Requesting configuration struct") q.add_log_cmd("Requesting configuration struct")
q.add_pus_tc( q.add_pus_tc(
create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.GET_CFG) create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.GET_CFG)
) )
if op_code == BpxOpCode.REQUEST_CFG_HK: if cmd_str == BpxOpCode.REQUEST_CFG_HK:
q.add_log_cmd("Requesting configuration struct HK") q.add_log_cmd("Requesting configuration struct HK")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_CFG_SET) sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_CFG_SET)
q.add_pus_tc(generate_one_hk_command(sid=sid)) q.add_pus_tc(generate_one_hk_command(sid=sid))
if op_code == BpxOpCode.REBOOT: if cmd_str == BpxOpCode.REBOOT:
q.add_log_cmd("Rebooting BPX battery") q.add_log_cmd("Rebooting BPX battery")
q.add_pus_tc( q.add_pus_tc(
create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.REBOOT) create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.REBOOT)
) )
if op_code == BpxOpCode.MAN_HEATER_ON: if cmd_str == BpxOpCode.MAN_HEATER_ON:
q.add_log_cmd("BPX manual heater on with seconds burntime") q.add_log_cmd("BPX manual heater on with seconds burntime")
burn_time = int(input("BPX heater burn time in seconds [1-65535]: ")) burn_time = int(input("BPX heater burn time in seconds [1-65535]: "))
if burn_time < 1 or burn_time > 65535: if burn_time < 1 or burn_time > 65535:
@ -163,7 +159,7 @@ def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is oka
user_data=struct.pack("!H", burn_time), user_data=struct.pack("!H", burn_time),
) )
) )
if op_code == BpxOpCode.MAN_HEATER_OFF: if cmd_str == BpxOpCode.MAN_HEATER_OFF:
q.add_log_cmd("BPX manual heater off") q.add_log_cmd("BPX manual heater off")
q.add_pus_tc( q.add_pus_tc(
create_action_cmd( create_action_cmd(

View File

@ -3,32 +3,26 @@ import enum
import logging import logging
import struct import struct
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import PWR_CONTROLLER
from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, CmdTreeNode,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.queue import DefaultPusQueueHelper
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_command
from tmtccmd.tmtc.decorator import ServiceProviderParams
from tmtccmd.pus.tc.s3_fsfw_hk import (
generate_one_hk_command,
make_sid,
enable_periodic_hk_command_with_interval,
disable_periodic_hk_command,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd
from tmtccmd.pus.s20_fsfw_param_defs import ( from tmtccmd.pus.s20_fsfw_param_defs import (
create_scalar_float_parameter,
create_scalar_double_parameter, create_scalar_double_parameter,
create_scalar_float_parameter,
) )
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_command
from tmtccmd.pus.tc.s3_fsfw_hk import (
disable_periodic_hk_command,
enable_periodic_hk_command_with_interval,
generate_one_hk_command,
make_sid,
)
from tmtccmd.tmtc.queue import DefaultPusQueueHelper
from eive_tmtc.config.object_ids import PWR_CONTROLLER
from eive_tmtc.pus_tm.defs import PrintWrapper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -51,17 +45,17 @@ class ParamId(enum.IntEnum):
HIGHER_MODES_LIMIT = 6 HIGHER_MODES_LIMIT = 6
class OpCodes: class OpCode:
OFF = ["mode_off"] OFF = "off"
ON = ["mode_on"] ON = "on"
NML = ["mode_normal"] NML = "normal"
SET_PARAMETER = ["set_parameter"] SET_PARAMETER = "set_parameter"
REQUEST_CORE_HK = ["core_hk"] REQUEST_CORE_HK = "core_hk"
ENABLE_CORE_HK = ["core_enable_hk"] ENABLE_CORE_HK = "core_enable_hk"
DISABLE_CORE_HK = ["core_disable_hk"] DISABLE_CORE_HK = "core_disable_hk"
REQUEST_ENABLE_PL_HK = ["enable_pl_hk"] REQUEST_ENABLE_PL_HK = "enable_pl_hk"
ENABLE_ENABLE_PL_HK = ["enable_pl_enable_hk"] ENABLE_ENABLE_PL_HK = "enable_pl_enable_hk"
DISABLE_ENABLE_PL_HK = ["enable_pl_disable_hk"] DISABLE_ENABLE_PL_HK = "enable_pl_disable_hk"
class Info: class Info:
@ -77,46 +71,37 @@ class Info:
DISABLE_ENABLE_PL_HK = "Disable Enable PL HK Data Generation" DISABLE_ENABLE_PL_HK = "Disable Enable PL HK Data Generation"
@tmtc_definitions_provider def create_pwr_ctrl_node() -> CmdTreeNode:
def pwr_cmd_defs(defs: TmtcDefinitionWrapper): op_code_strs = [
oce = OpCodeEntry() getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
oce.add(keys=OpCodes.OFF, info=Info.OFF) ]
oce.add(keys=OpCodes.ON, info=Info.ON) info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
oce.add(keys=OpCodes.NML, info=Info.NML) combined_dict = dict(zip(op_code_strs, info_strs))
oce.add(keys=OpCodes.SET_PARAMETER, info=Info.SET_PARAMETER) node = CmdTreeNode("pwr_ctrl", "Power Controller")
oce.add(keys=OpCodes.REQUEST_CORE_HK, info=Info.REQUEST_CORE_HK) for op_code, info in combined_dict.items():
oce.add(keys=OpCodes.ENABLE_CORE_HK, info=Info.ENABLE_CORE_HK) node.add_child(CmdTreeNode(op_code, info))
oce.add(keys=OpCodes.DISABLE_CORE_HK, info=Info.DISABLE_CORE_HK) return node
oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK)
oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK)
oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK)
defs.add_service(
name=CustomServiceList.PWR_CTRL.value, info="PWR Controller", op_code_entry=oce
)
@service_provider(CustomServiceList.PWR_CTRL.value) def pack_acs_ctrl_command(q: DefaultPusQueueHelper, cmd_str: str):
def pack_acs_ctrl_command(p: ServiceProviderParams): if cmd_str == OpCode.OFF:
op_code = p.op_code
q = p.queue_helper
if op_code in OpCodes.OFF:
q.add_log_cmd(f"{Info.OFF}") q.add_log_cmd(f"{Info.OFF}")
q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.OFF, 0)) q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.OFF, 0))
elif op_code in OpCodes.ON: elif cmd_str == OpCode.ON:
q.add_log_cmd(f"{Info.ON}") q.add_log_cmd(f"{Info.ON}")
q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.ON, 0)) q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.ON, 0))
elif op_code in OpCodes.NML: elif cmd_str == OpCode.NML:
q.add_log_cmd(f"{Info.NML}") q.add_log_cmd(f"{Info.NML}")
q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.NORMAL, 0)) q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.NORMAL, 0))
elif op_code in OpCodes.SET_PARAMETER: elif cmd_str in OpCode.SET_PARAMETER:
q.add_log_cmd(f"{Info.SET_PARAMETER}") q.add_log_cmd(f"{Info.SET_PARAMETER}")
set_pwr_ctrl_param(q) set_pwr_ctrl_param(q)
elif op_code in OpCodes.REQUEST_CORE_HK: elif cmd_str == OpCode.REQUEST_CORE_HK:
q.add_log_cmd(Info.REQUEST_CORE_HK) q.add_log_cmd(Info.REQUEST_CORE_HK)
q.add_pus_tc( q.add_pus_tc(
generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET)) generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET))
) )
elif op_code in OpCodes.ENABLE_CORE_HK: elif cmd_str == OpCode.ENABLE_CORE_HK:
interval = float(input("Please specify interval in floating point seconds: ")) interval = float(input("Please specify interval in floating point seconds: "))
q.add_log_cmd(Info.ENABLE_CORE_HK) q.add_log_cmd(Info.ENABLE_CORE_HK)
cmd_tuple = enable_periodic_hk_command_with_interval( cmd_tuple = enable_periodic_hk_command_with_interval(
@ -124,19 +109,19 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
) )
q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1]) q.add_pus_tc(cmd_tuple[1])
elif op_code in OpCodes.DISABLE_CORE_HK: elif cmd_str == OpCode.DISABLE_CORE_HK:
q.add_log_cmd(Info.DISABLE_CORE_HK) q.add_log_cmd(Info.DISABLE_CORE_HK)
q.add_pus_tc( q.add_pus_tc(
disable_periodic_hk_command( disable_periodic_hk_command(
False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET) False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET)
) )
) )
elif op_code in OpCodes.REQUEST_ENABLE_PL_HK: elif cmd_str == OpCode.REQUEST_ENABLE_PL_HK:
q.add_log_cmd(Info.REQUEST_ENABLE_PL_HK) q.add_log_cmd(Info.REQUEST_ENABLE_PL_HK)
q.add_pus_tc( q.add_pus_tc(
generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET)) generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET))
) )
elif op_code in OpCodes.ENABLE_ENABLE_PL_HK: elif cmd_str == OpCode.ENABLE_ENABLE_PL_HK:
interval = float(input("Please specify interval in floating point seconds: ")) interval = float(input("Please specify interval in floating point seconds: "))
q.add_log_cmd(Info.ENABLE_ENABLE_PL_HK) q.add_log_cmd(Info.ENABLE_ENABLE_PL_HK)
cmd_tuple = enable_periodic_hk_command_with_interval( cmd_tuple = enable_periodic_hk_command_with_interval(
@ -144,7 +129,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
) )
q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1]) q.add_pus_tc(cmd_tuple[1])
elif op_code in OpCodes.DISABLE_ENABLE_PL_HK: elif cmd_str == OpCode.DISABLE_ENABLE_PL_HK:
q.add_log_cmd(Info.DISABLE_ENABLE_PL_HK) q.add_log_cmd(Info.DISABLE_ENABLE_PL_HK)
q.add_pus_tc( q.add_pus_tc(
disable_periodic_hk_command( disable_periodic_hk_command(

View File

@ -1,18 +1,16 @@
import enum import enum
from typing import Tuple, Dict from typing import Dict, Tuple
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from eive_tmtc.config.object_ids import EPS_SUBSYSTEM_ID
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider, CmdTreeNode,
TmtcDefinitionWrapper,
OpCodeEntry,
) )
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices, Mode from tmtccmd.pus.s200_fsfw_mode import Mode
from tmtccmd.tmtc import service_provider from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices
from tmtccmd.tmtc.decorator import ServiceProviderParams from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.config.object_ids import EPS_SUBSYSTEM_ID
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
class OpCode(str, enum.Enum): class OpCode(str, enum.Enum):
@ -33,12 +31,9 @@ HANDLER_LIST: Dict[str, Tuple[int, int, str]] = {
} }
@service_provider(CustomServiceList.EPS_SS.value) def build_eps_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str):
def build_eps_subsystem_cmd(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
info_prefix = "EPS Subsystem" info_prefix = "EPS Subsystem"
if op_code in OpCode.REPORT_ALL_MODES: if cmd_str in OpCode.REPORT_ALL_MODES:
q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}")
q.add_pus_tc( q.add_pus_tc(
PusTelecommand( PusTelecommand(
@ -47,7 +42,7 @@ def build_eps_subsystem_cmd(p: ServiceProviderParams):
app_data=EPS_SUBSYSTEM_ID, app_data=EPS_SUBSYSTEM_ID,
) )
) )
mode_info_tup = HANDLER_LIST.get(op_code) mode_info_tup = HANDLER_LIST.get(cmd_str)
if mode_info_tup is None: if mode_info_tup is None:
return return
pack_mode_cmd_with_info( pack_mode_cmd_with_info(
@ -59,10 +54,9 @@ def build_eps_subsystem_cmd(p: ServiceProviderParams):
) )
@tmtc_definitions_provider def create_eps_subsystem_node() -> CmdTreeNode:
def add_eps_subsystem_cmds(defs: TmtcDefinitionWrapper): eps_node = CmdTreeNode("eps", "EPS Subsystem")
oce = OpCodeEntry() for cmd_str, (_, _, info) in HANDLER_LIST.items():
for op_code, (_, _, info) in HANDLER_LIST.items(): eps_node.add_child(CmdTreeNode(cmd_str, info))
oce.add(op_code, info) eps_node.add_child(CmdTreeNode(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES))
oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) return eps_node
defs.add_service(CustomServiceList.EPS_SS, "EPS Subsystem", oce)

View File

@ -3,18 +3,18 @@ import enum
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.tmtc.acs.subsystem import AcsMode from eive_tmtc.tmtc.acs.subsystem import AcsMode
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
CmdTreeNode,
tmtc_definitions_provider, tmtc_definitions_provider,
TmtcDefinitionWrapper, TmtcDefinitionWrapper,
OpCodeEntry, OpCodeEntry,
) )
from tmtccmd.tmtc import service_provider from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.config.object_ids import EIVE_SYSTEM_ID from eive_tmtc.config.object_ids import EIVE_SYSTEM_ID
from tmtccmd.pus.s200_fsfw_mode import ( from tmtccmd.pus.s200_fsfw_mode import (
create_mode_command, create_mode_command,
create_announce_mode_recursive_command, create_announce_mode_recursive_command,
) )
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.tmtc.decorator import ServiceProviderParams
class SystemMode: class SystemMode:
@ -55,40 +55,49 @@ class Info:
REBOOT_I2C = "Reboot I2C bus" REBOOT_I2C = "Reboot I2C bus"
@service_provider(CustomServiceList.SYSTEM.value) def build_system_cmds(q: DefaultPusQueueHelper, cmd_str: str):
def build_system_cmds(p: ServiceProviderParams):
o = p.op_code
q = p.queue_helper
prefix = "EIVE System" prefix = "EIVE System"
if o == OpCode.SAFE_MODE: if cmd_str == OpCode.SAFE_MODE:
q.add_log_cmd(f"{prefix}: {Info.SAFE_MODE}") q.add_log_cmd(f"{prefix}: {Info.SAFE_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.SAFE, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.SAFE, 0))
elif o == OpCode.IDLE_MODE: elif cmd_str == OpCode.IDLE_MODE:
q.add_log_cmd(f"{prefix}: {Info.IDLE_MODE}") q.add_log_cmd(f"{prefix}: {Info.IDLE_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.IDLE, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.IDLE, 0))
elif o == OpCode.NADIR_MODE: elif cmd_str == OpCode.NADIR_MODE:
q.add_log_cmd(f"{prefix}: {Info.NADIR_MODE}") q.add_log_cmd(f"{prefix}: {Info.NADIR_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_NADIR, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_NADIR, 0))
elif o == OpCode.TARGET_MODE: elif cmd_str == OpCode.TARGET_MODE:
q.add_log_cmd(f"{prefix}: {Info.TARGET_MODE}") q.add_log_cmd(f"{prefix}: {Info.TARGET_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_TARGET, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_TARGET, 0))
elif o == OpCode.TARGET_GS_MODE: elif cmd_str == OpCode.TARGET_GS_MODE:
q.add_log_cmd(f"{prefix}: {Info.TARGET_GS_MODE}") q.add_log_cmd(f"{prefix}: {Info.TARGET_GS_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_TARGET_GS, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_TARGET_GS, 0))
elif o == OpCode.INERTIAL_MODE: elif cmd_str == OpCode.INERTIAL_MODE:
q.add_log_cmd(f"{prefix}: {Info.INERTIAL_MODE}") q.add_log_cmd(f"{prefix}: {Info.INERTIAL_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_INERTIAL, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.PTG_INERTIAL, 0))
elif o == OpCode.ANNOUNCE_MODES: elif cmd_str == OpCode.ANNOUNCE_MODES:
q.add_log_cmd(f"{prefix}: {Info.ANNOUNCE_MODES}") q.add_log_cmd(f"{prefix}: {Info.ANNOUNCE_MODES}")
q.add_pus_tc(create_announce_mode_recursive_command(EIVE_SYSTEM_ID)) q.add_pus_tc(create_announce_mode_recursive_command(EIVE_SYSTEM_ID))
elif o == OpCode.BOOT_MODE: elif cmd_str == OpCode.BOOT_MODE:
q.add_log_cmd(f"{prefix}: {Info.BOOT_MODE}") q.add_log_cmd(f"{prefix}: {Info.BOOT_MODE}")
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.BOOT, 0)) q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, SystemMode.BOOT, 0))
elif o == OpCode.REBOOT_I2C: elif cmd_str == OpCode.REBOOT_I2C:
q.add_log_cmd(f"{prefix}: {Info.REBOOT_I2C}") q.add_log_cmd(f"{prefix}: {Info.REBOOT_I2C}")
q.add_pus_tc(create_action_cmd(EIVE_SYSTEM_ID, ActionId.EXECUTE_I2C_REBOOT)) q.add_pus_tc(create_action_cmd(EIVE_SYSTEM_ID, ActionId.EXECUTE_I2C_REBOOT))
def create_system_node() -> CmdTreeNode:
op_code_strs = [
getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("system", "EIVE System")
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node
@tmtc_definitions_provider @tmtc_definitions_provider
def add_system_cmd_defs(defs: TmtcDefinitionWrapper): def add_system_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()

View File

@ -1,16 +1,10 @@
from spacepackets.ecss import PusTelecommand, PusService from spacepackets.ecss import PusService, PusTelecommand
from tmtccmd.config import CoreServiceList from tmtccmd.config import CmdTreeNode
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.pus.s17_test import create_service_17_ping_command from tmtccmd.pus.s17_test import create_service_17_ping_command
from tmtccmd.tmtc import service_provider from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc.decorator import ServiceProviderParams
class OpCodes: class OpCode:
PING = "ping" PING = "ping"
TRIGGER_EVENT = "trig_event" TRIGGER_EVENT = "trig_event"
PING_WITH_DATA = "ping_with_data" PING_WITH_DATA = "ping_with_data"
@ -22,31 +16,22 @@ class Info:
PING_WITH_DATA = "Ping with data. Size of sent data is sent back" PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
@tmtc_definitions_provider def create_test_node() -> CmdTreeNode:
def add_test_defs(defs: TmtcDefinitionWrapper): node = CmdTreeNode("test", "Test Commands")
oce = OpCodeEntry() node.add_child(CmdTreeNode(OpCode.PING, Info.PING))
oce.add(keys=OpCodes.PING, info=Info.PING) node.add_child(CmdTreeNode(OpCode.TRIGGER_EVENT, Info.TRIGGER_EVENT))
oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT) node.add_child(CmdTreeNode(OpCode.PING_WITH_DATA, Info.PING_WITH_DATA))
oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA) return node
defs.add_service(
name=CoreServiceList.SERVICE_17_ALT,
info="PUS 17 Test Service",
op_code_entry=oce,
)
@service_provider(CoreServiceList.SERVICE_17_ALT) def pack_test_command(q: DefaultPusQueueHelper, cmd_path: str):
def pack_test_command(p: ServiceProviderParams): if cmd_path == OpCode.PING:
info = p.info
q = p.queue_helper
if info.op_code == OpCodes.PING:
q.add_log_cmd("Sending PUS TC [17,1]") q.add_log_cmd("Sending PUS TC [17,1]")
q.add_pus_tc(create_service_17_ping_command()) q.add_pus_tc(create_service_17_ping_command())
if info.op_code == OpCodes.TRIGGER_EVENT: if cmd_path == OpCode.TRIGGER_EVENT:
q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]") q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128)) q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128))
if info.op_code == OpCodes.PING_WITH_DATA: if cmd_path == OpCode.PING_WITH_DATA:
q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]") q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
while True: while True:
data_size = int(input("Please specify data size [0-1024]: ")) data_size = int(input("Please specify data size [0-1024]: "))
@ -55,7 +40,7 @@ def pack_test_command(p: ServiceProviderParams):
break break
dummy_data = bytearray() dummy_data = bytearray()
next_byte = True next_byte = True
for i in range(data_size): for _ in range(data_size):
dummy_data.append(int(next_byte)) dummy_data.append(int(next_byte))
next_byte = not next_byte next_byte = not next_byte
q.add_pus_tc( q.add_pus_tc(

View File

@ -3,26 +3,25 @@ import enum
import logging import logging
import math import math
import struct import struct
from typing import Tuple
from dateutil.parser import parse
from spacepackets.ecss.pus_15_tm_storage import Subservice
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper
from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.tmtc.queue import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
CFDP_TM_STORE,
HK_TM_STORE, HK_TM_STORE,
MISC_TM_STORE, MISC_TM_STORE,
OK_TM_STORE,
NOT_OK_TM_STORE, NOT_OK_TM_STORE,
CFDP_TM_STORE, OK_TM_STORE,
get_object_ids, get_object_ids,
) )
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry
from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams
from dateutil.parser import parse
from spacepackets.ecss import PusService # noqa
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.pus_15_tm_storage import Subservice
from tmtccmd.util import ObjectIdU32
class OpCode: class OpCode:
@ -38,11 +37,8 @@ class Info:
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@service_provider(CustomServiceList.TM_STORE) def pack_tm_store_commands(q: DefaultPusQueueHelper, cmd_path: str):
def pack_tm_store_commands(p: ServiceProviderParams): if cmd_path == OpCode.DELETE_UP_TO:
q = p.queue_helper
o = p.op_code
if o == OpCode.DELETE_UP_TO:
obj_id, store_string = store_select_prompt() obj_id, store_string = store_select_prompt()
app_data = bytearray(obj_id.as_bytes) app_data = bytearray(obj_id.as_bytes)
delete_up_to_time = time_prompt("Determining deletion end time") delete_up_to_time = time_prompt("Determining deletion end time")
@ -56,7 +52,7 @@ def pack_tm_store_commands(p: ServiceProviderParams):
service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
) )
) )
elif o == OpCode.RETRIEVAL_BY_TIME_RANGE: elif cmd_path == OpCode.RETRIEVAL_BY_TIME_RANGE:
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE) q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
obj_id, store_string = store_select_prompt() obj_id, store_string = store_select_prompt()
app_data = bytearray(obj_id.as_bytes) app_data = bytearray(obj_id.as_bytes)
@ -80,7 +76,15 @@ def pack_tm_store_commands(p: ServiceProviderParams):
) )
@tmtc_definitions_provider def create_persistent_tm_store_node() -> CmdTreeNode:
node = CmdTreeNode("tm_store", "Persistent TM Store")
node.add_child(CmdTreeNode(OpCode.DELETE_UP_TO, Info.DELETE_UP_TO))
node.add_child(
CmdTreeNode(OpCode.RETRIEVAL_BY_TIME_RANGE, Info.RETRIEVAL_BY_TIME_RANGE)
)
return node
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper): def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO) oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
@ -130,6 +134,7 @@ def time_prompt(info_str: str) -> datetime.datetime:
return time_prompt_fully_manually() return time_prompt_fully_manually()
elif time_input_key == 2: elif time_input_key == 2:
return time_prompt_offset_from_now() return time_prompt_offset_from_now()
raise ValueError("can not determine datetime")
def time_prompt_fully_manually() -> datetime.datetime: def time_prompt_fully_manually() -> datetime.datetime:
@ -160,7 +165,7 @@ def time_prompt_offset_from_now() -> datetime.datetime:
return time_now_with_offset return time_now_with_offset
def store_select_prompt() -> (ObjectIdU32, str): def store_select_prompt() -> Tuple[ObjectIdU32, str]:
obj_id_dict = get_object_ids() obj_id_dict = get_object_ids()
print("Available TM stores:") print("Available TM stores:")
for k, v in STORE_DICT.items(): for k, v in STORE_DICT.items():