add object prompt to test assy
This commit is contained in:
parent
82352bc5fa
commit
7f872c92c4
@ -11,11 +11,13 @@ from tmtccmd.tc.decorator import ServiceProviderParams
|
|||||||
|
|
||||||
|
|
||||||
class OpCode:
|
class OpCode:
|
||||||
|
SET_HEALTH = "set_health"
|
||||||
ANNOUNCE_HEALTH_ALL = "read_health_all"
|
ANNOUNCE_HEALTH_ALL = "read_health_all"
|
||||||
ANNOUNCE_HEALTH = "read_health"
|
ANNOUNCE_HEALTH = "read_health"
|
||||||
|
|
||||||
|
|
||||||
class Info:
|
class Info:
|
||||||
|
SET_HEALTH = "Set health of specific object"
|
||||||
ANNOUNCE_HEALTH_ALL = "Read all health states"
|
ANNOUNCE_HEALTH_ALL = "Read all health states"
|
||||||
ANNOUNCE_HEALTH = "Read health state of one object"
|
ANNOUNCE_HEALTH = "Read health state of one object"
|
||||||
|
|
||||||
@ -24,8 +26,10 @@ class Info:
|
|||||||
def pack_test_command(p: ServiceProviderParams):
|
def pack_test_command(p: ServiceProviderParams):
|
||||||
o = p.op_code
|
o = p.op_code
|
||||||
q = p.queue_helper
|
q = p.queue_helper
|
||||||
|
if o == OpCode.SET_HEALTH:
|
||||||
|
raise NotImplementedError
|
||||||
if o == OpCode.ANNOUNCE_HEALTH:
|
if o == OpCode.ANNOUNCE_HEALTH:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError
|
||||||
elif o == OpCode.ANNOUNCE_HEALTH_ALL:
|
elif o == OpCode.ANNOUNCE_HEALTH_ALL:
|
||||||
q.add_log_cmd(Info.ANNOUNCE_HEALTH_ALL)
|
q.add_log_cmd(Info.ANNOUNCE_HEALTH_ALL)
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
@ -40,4 +44,5 @@ def add_health_cmd_defs(defs: TmtcDefinitionWrapper):
|
|||||||
oce = OpCodeEntry()
|
oce = OpCodeEntry()
|
||||||
oce.add(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL)
|
oce.add(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL)
|
||||||
oce.add(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH)
|
oce.add(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH)
|
||||||
|
oce.add(OpCode.SET_HEALTH, Info.SET_HEALTH)
|
||||||
defs.add_service(CustomServiceList.HEALTH, info="Health Service", op_code_entry=oce)
|
defs.add_service(CustomServiceList.HEALTH, info="Health Service", op_code_entry=oce)
|
||||||
|
38
eive_tmtc/tmtc/obj_prompt.py
Normal file
38
eive_tmtc/tmtc/obj_prompt.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
from eive_tmtc.config.object_ids import (
|
||||||
|
ACS_SUBSYSTEM_ID,
|
||||||
|
ACS_CONTROLLER,
|
||||||
|
IMTQ_HANDLER_ID,
|
||||||
|
ACS_BOARD_ASS_ID,
|
||||||
|
RW_ASSEMBLY,
|
||||||
|
SUS_BOARD_ASS_ID
|
||||||
|
)
|
||||||
|
|
||||||
|
SUBSYSTEM_DICT = {
|
||||||
|
0: "acs",
|
||||||
|
1: "tcs",
|
||||||
|
2: "com",
|
||||||
|
}
|
||||||
|
|
||||||
|
ACS_OBJ_DICT = {
|
||||||
|
0: ("Subsystem", ACS_SUBSYSTEM_ID),
|
||||||
|
1: ("SUS Assembly", SUS_BOARD_ASS_ID),
|
||||||
|
2: ("ACS Board Assembly", ACS_BOARD_ASS_ID),
|
||||||
|
3: ("RW Assembly", RW_ASSEMBLY),
|
||||||
|
4: ("iMTQ MGT", IMTQ_HANDLER_ID),
|
||||||
|
}
|
||||||
|
|
||||||
|
def prompt_object() -> bytes:
|
||||||
|
for k, v in SUBSYSTEM_DICT:
|
||||||
|
print(f"{k}: {v}")
|
||||||
|
subsystem_key = int(input("Please specify target subsystem by key: "))
|
||||||
|
subsystem = SUBSYSTEM_DICT[subsystem_key]
|
||||||
|
if subsystem is None:
|
||||||
|
raise ValueError("invalid key")
|
||||||
|
if subsystem == "acs":
|
||||||
|
for k, v in ACS_OBJ_DICT:
|
||||||
|
print(f"{k}: {v[0]}")
|
||||||
|
obj_key = int(input("Please specify target object by key: "))
|
||||||
|
acs_obj = ACS_OBJ_DICT[obj_key]
|
||||||
|
if acs_obj is None:
|
||||||
|
raise ValueError("invalid key")
|
||||||
|
return acs_obj[1]
|
Loading…
Reference in New Issue
Block a user