From 7f872c92c44be0de76eaedbdd435e27e6467a678 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 6 Mar 2023 12:03:51 +0100 Subject: [PATCH] add object prompt to test assy --- eive_tmtc/tmtc/health.py | 7 ++++++- eive_tmtc/tmtc/obj_prompt.py | 38 ++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 eive_tmtc/tmtc/obj_prompt.py diff --git a/eive_tmtc/tmtc/health.py b/eive_tmtc/tmtc/health.py index 4195004..8ba4daa 100644 --- a/eive_tmtc/tmtc/health.py +++ b/eive_tmtc/tmtc/health.py @@ -11,11 +11,13 @@ from tmtccmd.tc.decorator import ServiceProviderParams class OpCode: + SET_HEALTH = "set_health" ANNOUNCE_HEALTH_ALL = "read_health_all" ANNOUNCE_HEALTH = "read_health" class Info: + SET_HEALTH = "Set health of specific object" ANNOUNCE_HEALTH_ALL = "Read all health states" ANNOUNCE_HEALTH = "Read health state of one object" @@ -24,8 +26,10 @@ class Info: def pack_test_command(p: ServiceProviderParams): o = p.op_code q = p.queue_helper + if o == OpCode.SET_HEALTH: + raise NotImplementedError if o == OpCode.ANNOUNCE_HEALTH: - raise NotImplementedError() + raise NotImplementedError elif o == OpCode.ANNOUNCE_HEALTH_ALL: q.add_log_cmd(Info.ANNOUNCE_HEALTH_ALL) q.add_pus_tc( @@ -40,4 +44,5 @@ def add_health_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(OpCode.ANNOUNCE_HEALTH_ALL, Info.ANNOUNCE_HEALTH_ALL) oce.add(OpCode.ANNOUNCE_HEALTH, Info.ANNOUNCE_HEALTH) + oce.add(OpCode.SET_HEALTH, Info.SET_HEALTH) defs.add_service(CustomServiceList.HEALTH, info="Health Service", op_code_entry=oce) diff --git a/eive_tmtc/tmtc/obj_prompt.py b/eive_tmtc/tmtc/obj_prompt.py new file mode 100644 index 0000000..db7d6bd --- /dev/null +++ b/eive_tmtc/tmtc/obj_prompt.py @@ -0,0 +1,38 @@ +from eive_tmtc.config.object_ids import ( + ACS_SUBSYSTEM_ID, + ACS_CONTROLLER, + IMTQ_HANDLER_ID, + ACS_BOARD_ASS_ID, + RW_ASSEMBLY, + SUS_BOARD_ASS_ID +) + +SUBSYSTEM_DICT = { + 0: "acs", + 1: "tcs", + 2: "com", +} + +ACS_OBJ_DICT = { + 0: ("Subsystem", ACS_SUBSYSTEM_ID), + 1: ("SUS Assembly", SUS_BOARD_ASS_ID), + 2: ("ACS Board Assembly", ACS_BOARD_ASS_ID), + 3: ("RW Assembly", RW_ASSEMBLY), + 4: ("iMTQ MGT", IMTQ_HANDLER_ID), +} + +def prompt_object() -> bytes: + for k, v in SUBSYSTEM_DICT: + print(f"{k}: {v}") + subsystem_key = int(input("Please specify target subsystem by key: ")) + subsystem = SUBSYSTEM_DICT[subsystem_key] + if subsystem is None: + raise ValueError("invalid key") + if subsystem == "acs": + for k, v in ACS_OBJ_DICT: + print(f"{k}: {v[0]}") + obj_key = int(input("Please specify target object by key: ")) + acs_obj = ACS_OBJ_DICT[obj_key] + if acs_obj is None: + raise ValueError("invalid key") + return acs_obj[1]