"""Hook function which packs telecommands based on service and operation code string """ import logging from typing import cast from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd from eive_tmtc.tmtc.acs.acs_ctrl import pack_acs_ctrl_command from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd from eive_tmtc.tmtc.power.power import pack_power_commands from eive_tmtc.tmtc.tcs.ctrl import pack_tcs_ctrl_commands from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands from eive_tmtc.tmtc.payload.scex import pack_scex_cmds from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands from eive_tmtc.tmtc.time import pack_time_management_cmd from tmtccmd import DefaultProcedureInfo, TcHandlerBase from tmtccmd.tmtc import DefaultPusQueueHelper from eive_tmtc.tmtc.power.p60dock import pack_p60dock_cmds from eive_tmtc.tmtc.power.pdu2 import pack_pdu2_commands from eive_tmtc.tmtc.power.pdu1 import pack_pdu1_commands from eive_tmtc.tmtc.power.acu import pack_acu_commands from eive_tmtc.tmtc.acs.imtq import create_imtq_command from eive_tmtc.tmtc.tcs.heater import pack_heater_cmds from eive_tmtc.tmtc.acs.reaction_wheels import ( create_single_rw_cmd, pack_rw_ass_cmds, ) from eive_tmtc.tmtc.com.ccsds_handler import pack_ccsds_handler_command from eive_tmtc.tmtc.core import pack_core_commands from eive_tmtc.tmtc.acs.star_tracker import pack_star_tracker_commands from eive_tmtc.tmtc.com.syrlinks_handler import pack_syrlinks_command from eive_tmtc.tmtc.com.pdec_handler import pack_pdec_handler_commands from eive_tmtc.tmtc.wdt import pack_wdt_commands from eive_tmtc.tmtc.acs.acs_board import pack_acs_command from eive_tmtc.config.object_ids import ( P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, TMP1075_HANDLER_TCS_BRD_0_ID, TMP1075_HANDLER_TCS_BRD_1_ID, TMP1075_HANDLER_PLPCDU_0_ID, TMP1075_HANDLER_IF_BRD_ID, HEATER_CONTROLLER_ID, IMTQ_HANDLER_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, STAR_TRACKER_ID, CCSDS_HANDLER_ID, PDEC_HANDLER_ID, STR_IMG_HELPER_ID, SYRLINKS_HANDLER_ID, RW_ASSEMBLY, get_object_ids, ) from eive_tmtc.tmtc.tcs.tmp1075 import pack_tmp1075_test_into from eive_tmtc.tmtc.acs.gps import pack_gps_command from eive_tmtc.tmtc.payload.rad_sensor import create_rad_sensor_cmd from eive_tmtc.tmtc.payload.plpcdu import pack_pl_pcdu_commands from eive_tmtc.tmtc.acs.str_img_helper import pack_str_img_helper_command import eive_tmtc.config.object_ids as oids from tmtccmd.util import ObjectIdU32 from eive_tmtc.utility.input_helper import InputHelper def handle_default_procedure( # noqa C901: Complexity okay here. tc_base: TcHandlerBase, info: DefaultProcedureInfo, queue_helper: DefaultPusQueueHelper, ): cmd_path = info.cmd_path assert cmd_path is not None cmd_path_list = cmd_path.split("/") if cmd_path_list[0] == "/": cmd_path_list = cmd_path_list[1:] if len(cmd_path_list) == 0: raise ValueError( "command path list empty. Full command path {cmd_path} might have invalid format" ) obj_id_man = get_object_ids() if cmd_path_list == ["eps", "p60_dock"]: object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER)) assert len(cmd_path_list) >= 2 return pack_p60dock_cmds( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["tcs", "rtd_devs"]: assert len(cmd_path_list) >= 2 return pack_rtd_commands( object_id=None, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["eps", "pdu1"]: object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID)) return pack_pdu1_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["eps", "pdu2"]: object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID)) return pack_pdu2_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["eps", "acu"]: object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID)) return pack_acu_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["tcs"]: assert len(cmd_path_list) >= 1 return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1]) if cmd_path_list == ["tcs", "tcs_ctrl"]: assert len(cmd_path_list) >= 2 return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2]) if cmd_path_list == ["tcs", "tmp1075_devs"]: menu_dict = { "0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID), "1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID), "2": ("TMP1075 PL PCDU 0", TMP1075_HANDLER_PLPCDU_0_ID), "3": ("TMP1075 PL PCDU 1", oids.TMP1075_HANDLER_PLPCDU_1_ID), "4": ("TMP1075 IF Board", TMP1075_HANDLER_IF_BRD_ID), } input_helper = InputHelper(menu_dict) tmp_select = input_helper.get_key() assert len(cmd_path_list) >= 2 object_id = obj_id_man.get(menu_dict[tmp_select][1]) assert object_id is not None return pack_tmp1075_test_into( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["tcs", "heaters"]: object_id = HEATER_CONTROLLER_ID return pack_heater_cmds( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["acs", "mgt"]: object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID)) return create_imtq_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["acs", "rw_assy", "rw_1"]: assert len(cmd_path_list) >= 4 return create_single_rw_cmd( object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3] ) if cmd_path_list == ["acs", "rw_assy", "rw_2"]: assert len(cmd_path_list) >= 4 return create_single_rw_cmd( object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3] ) if cmd_path_list == ["acs", "rw_assy", "rw_3"]: return create_single_rw_cmd( object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3] ) if cmd_path_list == ["acs", "rw_assy", "rw_4"]: return create_single_rw_cmd( object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3] ) if cmd_path_list == ["acs", "acs_brd_assy", "mgm_devs"]: assert len(cmd_path_list) >= 4 return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3]) if cmd_path_list == ["payload", "rad_sensor"]: assert len(cmd_path_list) >= 3 object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID)) return create_rad_sensor_cmd( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["acs", "str"]: object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID)) assert len(cmd_path_list) >= 3 return pack_star_tracker_commands( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["acs", "str_img_helper"]: assert len(cmd_path_list) >= 3 object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID)) return pack_str_img_helper_command( object_id=object_id, q=queue_helper, op_code=cmd_path_list[2] ) if cmd_path_list == ["acs", "acs_ctrl"]: assert len(cmd_path_list) >= 3 return pack_acs_ctrl_command(queue_helper, cmd_path_list[2]) if cmd_path_list == ["obdh", "core"]: assert len(cmd_path_list) >= 3 return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2]) if cmd_path_list == ["eps"]: assert len(cmd_path_list) >= 2 return pack_power_commands(queue_helper, cmd_path_list[2]) if cmd_path_list == ["acs"]: assert len(cmd_path_list) >= 2 return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2]) if cmd_path_list == ["acs", "gnss_devs"]: assert len(cmd_path_list) >= 3 return pack_gps_command( object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["com", "ccsds"]: object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID)) assert len(cmd_path_list) >= 3 return pack_ccsds_handler_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["com", "pdec"]: assert len(cmd_path_list) >= 3 return pack_pdec_handler_commands( object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["com", "syrlinks"]: object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID)) assert len(cmd_path_list) >= 3 return pack_syrlinks_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) if cmd_path_list == ["acs", "gyro_devs"]: assert len(cmd_path_list) >= 3 return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2]) if cmd_path_list == ["payload", "pl_pcdu"]: assert len(cmd_path_list) >= 3 return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2]) if cmd_path_list == ["tcs", "tcs_brd_assy"]: assert len(cmd_path_list) >= 3 return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2]) if cmd_path_list == ["acs", "rws", "rw_assy"]: assert len(cmd_path_list) >= 4 return pack_rw_ass_cmds( q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3] ) if cmd_path_list == ["payload", "scex"]: assert len(cmd_path_list) >= 3 return pack_scex_cmds( cmd_str=cmd_path_list[2], q=queue_helper, ) if cmd_path_list == ["core", "xiphos_wdt"]: return pack_wdt_commands(queue_helper, cmd_path_list[2]) if "time" in cmd_path_list: return pack_time_management_cmd(queue_helper, cmd_path_list[1]) logging.getLogger(__name__).warning( f"invalid or unimplemented command path {cmd_path}" )