From 688602e4862bf6bbf5574c9bbbb98556b6541e71 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 22 Nov 2023 19:13:22 +0100 Subject: [PATCH] cleaning up --- .../{procedure_packer.py => cmd_demux.py} | 253 +++++++++++------- eive_tmtc/pus_tc/tc_handler.py | 4 +- 2 files changed, 151 insertions(+), 106 deletions(-) rename eive_tmtc/pus_tc/{procedure_packer.py => cmd_demux.py} (66%) diff --git a/eive_tmtc/pus_tc/procedure_packer.py b/eive_tmtc/pus_tc/cmd_demux.py similarity index 66% rename from eive_tmtc/pus_tc/procedure_packer.py rename to eive_tmtc/pus_tc/cmd_demux.py index 52e3daa..1d7d286 100644 --- a/eive_tmtc/pus_tc/procedure_packer.py +++ b/eive_tmtc/pus_tc/cmd_demux.py @@ -1,7 +1,7 @@ """Hook function which packs telecommands based on service and operation code string """ import logging -from typing import cast +from typing import cast, List from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd @@ -70,53 +70,48 @@ from tmtccmd.util import ObjectIdU32 from eive_tmtc.utility.input_helper import InputHelper -def handle_default_procedure( # noqa C901: Complexity okay here. - info: DefaultProcedureInfo, - queue_helper: DefaultPusQueueHelper, -): - cmd_path = info.cmd_path - assert cmd_path is not None - cmd_path_list = cmd_path.split("/") - if cmd_path_list[0] == "/": - cmd_path_list = cmd_path_list[1:] - if len(cmd_path_list) == 0: - raise ValueError( - "command path list empty. Full command path {cmd_path} might have invalid format" - ) +def handle_eps_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): obj_id_man = get_object_ids() - if cmd_path_list == ["eps", "p60_dock"]: + assert len(cmd_path_list) >= 1 + if cmd_path_list[1] == "p60_dock": object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER)) assert len(cmd_path_list) >= 2 return pack_p60dock_cmds( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["tcs", "rtd_devs"]: - assert len(cmd_path_list) >= 2 - return pack_rtd_commands( - object_id=None, q=queue_helper, cmd_str=cmd_path_list[2] - ) - if cmd_path_list == ["eps", "pdu1"]: + if cmd_path_list[1] == "pdu1": object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID)) return pack_pdu1_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["eps", "pdu2"]: + if cmd_path_list[1] == "pdu2": object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID)) return pack_pdu2_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["eps", "acu"]: + if cmd_path_list[1] == "acu": object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID)) return pack_acu_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["tcs"]: - assert len(cmd_path_list) >= 1 - return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1]) - if cmd_path_list == ["tcs", "tcs_ctrl"]: + assert len(cmd_path_list) >= 2 + return pack_power_commands(queue_helper, cmd_path_list[1]) + + +def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): + obj_id_man = get_object_ids() + if cmd_path_list[1] == "tcs_brd_assy": + assert len(cmd_path_list) >= 3 + return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2]) + if cmd_path_list[1] == "rtd_devs": + assert len(cmd_path_list) >= 2 + return pack_rtd_commands( + object_id=None, q=queue_helper, cmd_str=cmd_path_list[2] + ) + if cmd_path_list[1] == "tcs_ctrl": assert len(cmd_path_list) >= 2 return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list == ["tcs", "tmp1075_devs"]: + if cmd_path_list[1] == "tmp1075_devs": menu_dict = { "0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID), "1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID), @@ -132,113 +127,163 @@ def handle_default_procedure( # noqa C901: Complexity okay here. return pack_tmp1075_test_into( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["tcs", "heaters"]: + if cmd_path_list[1] == "heaters": object_id = HEATER_CONTROLLER_ID return pack_heater_cmds( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["acs", "mgt"]: + return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1]) + + +def handle_acs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): + obj_id_man = get_object_ids() + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "mgt": object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID)) return create_imtq_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["acs", "rw_assy", "rw_1"]: - assert len(cmd_path_list) >= 4 - return create_single_rw_cmd( - object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3] - ) - if cmd_path_list == ["acs", "rw_assy", "rw_2"]: - assert len(cmd_path_list) >= 4 - return create_single_rw_cmd( - object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3] - ) - if cmd_path_list == ["acs", "rw_assy", "rw_3"]: - return create_single_rw_cmd( - object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3] - ) - if cmd_path_list == ["acs", "rw_assy", "rw_4"]: - return create_single_rw_cmd( - object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3] - ) - if cmd_path_list == ["acs", "acs_brd_assy", "mgm_devs"]: - assert len(cmd_path_list) >= 4 - return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3]) - if cmd_path_list == ["payload", "rad_sensor"]: - assert len(cmd_path_list) >= 3 - object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID)) - return create_rad_sensor_cmd( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] - ) - if cmd_path_list == ["acs", "str"]: + if cmd_path_list[1] == "str": object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID)) assert len(cmd_path_list) >= 3 return pack_star_tracker_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list == ["acs", "str_img_helper"]: + + if cmd_path_list[1] == "rws": + assert len(cmd_path_list) >= 3 + if cmd_path_list[2] == "rw_assy": + assert len(cmd_path_list) >= 4 + return pack_rw_ass_cmds( + q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3] + ) + if cmd_path_list[2] == "rw_1": + assert len(cmd_path_list) >= 4 + return create_single_rw_cmd( + object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3] + ) + if cmd_path_list[2] == "rw_2": + assert len(cmd_path_list) >= 4 + return create_single_rw_cmd( + object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3] + ) + if cmd_path_list[2] == "rw_3": + assert len(cmd_path_list) >= 4 + return create_single_rw_cmd( + object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3] + ) + if cmd_path_list[2] == "rw_4": + assert len(cmd_path_list) >= 4 + return create_single_rw_cmd( + object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3] + ) + + if cmd_path_list[1] == "gnss_devs": + assert len(cmd_path_list) >= 3 + return pack_gps_command( + object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2] + ) + if cmd_path_list[1:] == ["acs_brd_assy", "mgm_devs"]: + assert len(cmd_path_list) >= 4 + return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3]) + if cmd_path_list[1] == "str_img_helper": assert len(cmd_path_list) >= 3 object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID)) return pack_str_img_helper_command( object_id=object_id, q=queue_helper, op_code=cmd_path_list[2] ) - if cmd_path_list == ["acs", "acs_ctrl"]: + if cmd_path_list[1] == "acs_ctrl": assert len(cmd_path_list) >= 3 return pack_acs_ctrl_command(queue_helper, cmd_path_list[2]) - if cmd_path_list == ["obdh", "core"]: - assert len(cmd_path_list) >= 3 - return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list == ["eps"]: - assert len(cmd_path_list) >= 2 - return pack_power_commands(queue_helper, cmd_path_list[2]) - if cmd_path_list == ["acs"]: - assert len(cmd_path_list) >= 2 - return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list == ["acs", "gnss_devs"]: - assert len(cmd_path_list) >= 3 - return pack_gps_command( - object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2] - ) - if cmd_path_list == ["com", "ccsds"]: - object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID)) - assert len(cmd_path_list) >= 3 - return pack_ccsds_handler_command( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] - ) - if cmd_path_list == ["com", "pdec"]: - assert len(cmd_path_list) >= 3 - return pack_pdec_handler_commands( - object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2] - ) - if cmd_path_list == ["com", "syrlinks"]: - object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID)) - assert len(cmd_path_list) >= 3 - return pack_syrlinks_command( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] - ) - if cmd_path_list == ["acs", "gyro_devs"]: + if cmd_path_list[1] == "gyro_devs": assert len(cmd_path_list) >= 3 return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list == ["payload", "pl_pcdu"]: + return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2]) + + +def handle_payload_procedure( + queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str] +): + obj_id_man = get_object_ids() + assert len(cmd_path_list) >= 2 + if cmd_path_list == "rad_sensor": + assert len(cmd_path_list) >= 3 + object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID)) + return create_rad_sensor_cmd( + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + ) + if cmd_path_list[1] == "pl_pcdu": assert len(cmd_path_list) >= 3 return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list == ["tcs", "tcs_brd_assy"]: - assert len(cmd_path_list) >= 3 - return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list == ["acs", "rws", "rw_assy"]: - assert len(cmd_path_list) >= 4 - return pack_rw_ass_cmds( - q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3] - ) - if cmd_path_list == ["payload", "scex"]: + if cmd_path_list[1] == "scex": assert len(cmd_path_list) >= 3 return pack_scex_cmds( cmd_str=cmd_path_list[2], q=queue_helper, ) - if cmd_path_list == ["core", "xiphos_wdt"]: + + +def handle_obdh_procedure( + queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str] +): + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "core": + assert len(cmd_path_list) >= 3 + return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2]) + if cmd_path_list[1] == "xiphos_wdt": return pack_wdt_commands(queue_helper, cmd_path_list[2]) - if "time" in cmd_path_list: + if cmd_path_list[1] == "time": return pack_time_management_cmd(queue_helper, cmd_path_list[1]) + + +def handle_com_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): + obj_id_man = get_object_ids() + assert len(cmd_path_list) >= 2 + + if cmd_path_list[1] == "ccsds": + object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID)) + assert len(cmd_path_list) >= 3 + return pack_ccsds_handler_command( + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + ) + if cmd_path_list[1] == "pdec": + assert len(cmd_path_list) >= 3 + return pack_pdec_handler_commands( + object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2] + ) + if cmd_path_list[1] == "syrlinks": + object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID)) + assert len(cmd_path_list) >= 3 + return pack_syrlinks_command( + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + ) + + +def handle_default_procedure( # noqa C901: Complexity okay here. + info: DefaultProcedureInfo, + queue_helper: DefaultPusQueueHelper, +): + cmd_path = info.cmd_path + assert cmd_path is not None + cmd_path_list = cmd_path.split("/") + if cmd_path_list[0] == "/": + cmd_path_list = cmd_path_list[1:] + if len(cmd_path_list) == 0: + raise ValueError( + "command path list empty. Full command path {cmd_path} might have invalid format" + ) + if cmd_path_list[0] == "eps": + return handle_eps_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "tcs": + return handle_tcs_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "acs": + return handle_acs_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "payload": + return handle_payload_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "obdh": + return handle_obdh_procedure(queue_helper, cmd_path_list[1:]) + if cmd_path_list[0] == "com": + return handle_com_procedure(queue_helper, cmd_path_list[1:]) logging.getLogger(__name__).warning( f"invalid or unimplemented command path {cmd_path}" ) diff --git a/eive_tmtc/pus_tc/tc_handler.py b/eive_tmtc/pus_tc/tc_handler.py index 85dbf42..371c7fb 100644 --- a/eive_tmtc/pus_tc/tc_handler.py +++ b/eive_tmtc/pus_tc/tc_handler.py @@ -6,7 +6,7 @@ from eive_tmtc.config.definitions import ( PUS_APID, CFDP_LOCAL_ENTITY_ID, ) -from eive_tmtc.pus_tc.procedure_packer import handle_default_procedure +from eive_tmtc.pus_tc.cmd_demux import handle_default_procedure from eive_tmtc.cfdp.handler import CfdpInCcsdsHandler from tmtccmd import TcHandlerBase, ProcedureWrapper from tmtccmd.cfdp.defs import CfdpRequestType @@ -69,7 +69,7 @@ class TcHandler(TcHandlerBase): def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): self.queue_helper.queue_wrapper = wrapper.queue_wrapper if info.proc_type == TcProcedureType.DEFAULT: - handle_default_procedure(self, info.to_def_procedure(), self.queue_helper) + handle_default_procedure(info.to_def_procedure(), self.queue_helper) elif info.proc_type == TcProcedureType.CFDP: self.handle_cfdp_procedure(info)