From f756df43a24f77db1badfb52de6174cb8f256848 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 22 Nov 2023 19:22:51 +0100 Subject: [PATCH] last tests --- eive_tmtc/pus_tc/cmd_demux.py | 64 ++++++++++++++++++---------------- eive_tmtc/pus_tc/tc_handler.py | 4 +-- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/eive_tmtc/pus_tc/cmd_demux.py b/eive_tmtc/pus_tc/cmd_demux.py index 1d7d286..81b8213 100644 --- a/eive_tmtc/pus_tc/cmd_demux.py +++ b/eive_tmtc/pus_tc/cmd_demux.py @@ -6,6 +6,7 @@ from typing import cast, List from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd from eive_tmtc.tmtc.acs.acs_ctrl import pack_acs_ctrl_command +from eive_tmtc.tmtc.test import pack_test_command from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd from eive_tmtc.tmtc.power.power import pack_power_commands from eive_tmtc.tmtc.tcs.ctrl import pack_tcs_ctrl_commands @@ -70,6 +71,39 @@ from tmtccmd.util import ObjectIdU32 from eive_tmtc.utility.input_helper import InputHelper +def handle_pus_procedure( # noqa C901: Complexity okay here. + info: DefaultProcedureInfo, + queue_helper: DefaultPusQueueHelper, +): + cmd_path = info.cmd_path + assert cmd_path is not None + cmd_path_list = cmd_path.split("/") + if cmd_path_list[0] == "/": + cmd_path_list = cmd_path_list[1:] + if len(cmd_path_list) == 0: + raise ValueError( + "command path list empty. Full command path {cmd_path} might have invalid format" + ) + if cmd_path_list[0] == "eps": + return handle_eps_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "tcs": + return handle_tcs_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "acs": + return handle_acs_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "payload": + return handle_payload_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "obdh": + return handle_obdh_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "test": + assert len(cmd_path_list) >= 1 + return pack_test_command(queue_helper, cmd_path_list[1]) + if cmd_path_list[0] == "com": + return handle_com_procedure(queue_helper, cmd_path_list[1:]) + logging.getLogger(__name__).warning( + f"invalid or unimplemented command path {cmd_path}" + ) + + def handle_eps_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): obj_id_man = get_object_ids() assert len(cmd_path_list) >= 1 @@ -257,33 +291,3 @@ def handle_com_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: Lis return pack_syrlinks_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - - -def handle_default_procedure( # noqa C901: Complexity okay here. - info: DefaultProcedureInfo, - queue_helper: DefaultPusQueueHelper, -): - cmd_path = info.cmd_path - assert cmd_path is not None - cmd_path_list = cmd_path.split("/") - if cmd_path_list[0] == "/": - cmd_path_list = cmd_path_list[1:] - if len(cmd_path_list) == 0: - raise ValueError( - "command path list empty. Full command path {cmd_path} might have invalid format" - ) - if cmd_path_list[0] == "eps": - return handle_eps_procedure(queue_helper, cmd_path_list[1:]) - if cmd_path_list[0] == "tcs": - return handle_tcs_procedure(queue_helper, cmd_path_list[1:]) - if cmd_path_list[0] == "acs": - return handle_acs_procedure(queue_helper, cmd_path_list[1:]) - if cmd_path_list[0] == "payload": - return handle_payload_procedure(queue_helper, cmd_path_list[1:]) - if cmd_path_list[0] == "obdh": - return handle_obdh_procedure(queue_helper, cmd_path_list[1:]) - if cmd_path_list[0] == "com": - return handle_com_procedure(queue_helper, cmd_path_list[1:]) - logging.getLogger(__name__).warning( - f"invalid or unimplemented command path {cmd_path}" - ) diff --git a/eive_tmtc/pus_tc/tc_handler.py b/eive_tmtc/pus_tc/tc_handler.py index 371c7fb..68fad09 100644 --- a/eive_tmtc/pus_tc/tc_handler.py +++ b/eive_tmtc/pus_tc/tc_handler.py @@ -6,7 +6,7 @@ from eive_tmtc.config.definitions import ( PUS_APID, CFDP_LOCAL_ENTITY_ID, ) -from eive_tmtc.pus_tc.cmd_demux import handle_default_procedure +from eive_tmtc.pus_tc.cmd_demux import handle_pus_procedure from eive_tmtc.cfdp.handler import CfdpInCcsdsHandler from tmtccmd import TcHandlerBase, ProcedureWrapper from tmtccmd.cfdp.defs import CfdpRequestType @@ -69,7 +69,7 @@ class TcHandler(TcHandlerBase): def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): self.queue_helper.queue_wrapper = wrapper.queue_wrapper if info.proc_type == TcProcedureType.DEFAULT: - handle_default_procedure(info.to_def_procedure(), self.queue_helper) + handle_pus_procedure(info.to_def_procedure(), self.queue_helper) elif info.proc_type == TcProcedureType.CFDP: self.handle_cfdp_procedure(info)