diff --git a/eive_tmtc/pus_tc/cmd_demux.py b/eive_tmtc/pus_tc/cmd_demux.py index 7e45679..ec87f79 100644 --- a/eive_tmtc/pus_tc/cmd_demux.py +++ b/eive_tmtc/pus_tc/cmd_demux.py @@ -6,6 +6,7 @@ from typing import cast, List from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd from eive_tmtc.tmtc.acs.acs_ctrl import pack_acs_ctrl_command +from eive_tmtc.tmtc.com.subsystem import build_com_subsystem_procedure from eive_tmtc.tmtc.test import pack_test_command from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd from eive_tmtc.tmtc.power.power import pack_power_commands @@ -106,46 +107,48 @@ def handle_pus_procedure( # noqa C901: Complexity okay here. def handle_eps_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): obj_id_man = get_object_ids() - assert len(cmd_path_list) >= 1 - if cmd_path_list[1] == "p60_dock": + if len(cmd_path_list) == 1: + return pack_power_commands(queue_helper, cmd_path_list[0]) + assert len(cmd_path_list) >= 2 + if cmd_path_list[0] == "p60_dock": object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER)) assert len(cmd_path_list) >= 2 return pack_p60dock_cmds( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "pdu1": + if cmd_path_list[0] == "pdu1": object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID)) return pack_pdu1_commands( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "pdu2": + if cmd_path_list[0] == "pdu2": object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID)) return pack_pdu2_commands( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "acu": + if cmd_path_list[0] == "acu": object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID)) return pack_acu_commands( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - assert len(cmd_path_list) >= 2 - return pack_power_commands(queue_helper, cmd_path_list[1]) def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): obj_id_man = get_object_ids() - if cmd_path_list[1] == "tcs_brd_assy": - assert len(cmd_path_list) >= 3 - return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list[1] == "rtd_devs": + if len(cmd_path_list) == 1: + return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[0]) + assert len(cmd_path_list) >= 2 + if cmd_path_list[0] == "tcs_brd_assy": + return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1]) + if cmd_path_list[0] == "rtd_devs": assert len(cmd_path_list) >= 2 return pack_rtd_commands( - object_id=None, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=None, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "tcs_ctrl": + if cmd_path_list[0] == "tcs_ctrl": assert len(cmd_path_list) >= 2 return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list[1] == "tmp1075_devs": + if cmd_path_list[0] == "tmp1075_devs": menu_dict = { "0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID), "1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID), @@ -159,80 +162,77 @@ def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: Lis object_id = obj_id_man.get(menu_dict[tmp_select][1]) assert object_id is not None return pack_tmp1075_test_into( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "heaters": + if cmd_path_list[0] == "heaters": object_id = HEATER_CONTROLLER_ID return pack_heater_cmds( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1]) def handle_acs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): obj_id_man = get_object_ids() + if len(cmd_path_list) == 1: + return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[0]) assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "mgt": + if cmd_path_list[0] == "mgt": object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID)) return create_imtq_command( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "str": + if cmd_path_list[0] == "str": object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID)) - assert len(cmd_path_list) >= 3 return pack_star_tracker_commands( - object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1] == "rws": + if cmd_path_list[0] == "rws": assert len(cmd_path_list) >= 3 - if cmd_path_list[2] == "rw_assy": + if cmd_path_list[1] == "rw_assy": assert len(cmd_path_list) >= 4 return pack_rw_ass_cmds( - q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3] + q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[2] ) - if cmd_path_list[2] == "rw_1": + if cmd_path_list[1] == "rw_1": assert len(cmd_path_list) >= 4 return create_single_rw_cmd( - object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3] + object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[2] == "rw_2": + if cmd_path_list[1] == "rw_2": assert len(cmd_path_list) >= 4 return create_single_rw_cmd( - object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3] + object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[2] == "rw_3": + if cmd_path_list[1] == "rw_3": assert len(cmd_path_list) >= 4 return create_single_rw_cmd( - object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3] + object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[2] == "rw_4": + if cmd_path_list[1] == "rw_4": assert len(cmd_path_list) >= 4 return create_single_rw_cmd( - object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3] + object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[1] == "gnss_devs": - assert len(cmd_path_list) >= 3 + if cmd_path_list[0] == "gnss_devs": return pack_gps_command( - object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2] + object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[1] ) - if cmd_path_list[1:] == ["acs_brd_assy", "mgm_devs"]: - assert len(cmd_path_list) >= 4 - return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3]) - if cmd_path_list[1] == "str_img_helper": + if cmd_path_list[0] == "acs_brd_assy": assert len(cmd_path_list) >= 3 + if cmd_path_list[1] == "mgm_devs": + return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[2]) + if cmd_path_list[1] == "gyro_devs": + assert len(cmd_path_list) >= 3 + return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2]) + if cmd_path_list[0] == "str_img_helper": object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID)) return pack_str_img_helper_command( - object_id=object_id, q=queue_helper, op_code=cmd_path_list[2] + object_id=object_id, q=queue_helper, op_code=cmd_path_list[1] ) - if cmd_path_list[1] == "acs_ctrl": - assert len(cmd_path_list) >= 3 - return pack_acs_ctrl_command(queue_helper, cmd_path_list[2]) - if cmd_path_list[1] == "gyro_devs": - assert len(cmd_path_list) >= 3 - return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2]) - return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2]) + if cmd_path_list[0] == "acs_ctrl": + return pack_acs_ctrl_command(queue_helper, cmd_path_list[1]) def handle_payload_procedure( @@ -240,16 +240,16 @@ def handle_payload_procedure( ): obj_id_man = get_object_ids() assert len(cmd_path_list) >= 2 - if cmd_path_list == "rad_sensor": + if cmd_path_list[0] == "rad_sensor": assert len(cmd_path_list) >= 3 object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID)) return create_rad_sensor_cmd( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[1] == "pl_pcdu": + if cmd_path_list[0] == "pl_pcdu": assert len(cmd_path_list) >= 3 return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list[1] == "scex": + if cmd_path_list[0] == "scex": assert len(cmd_path_list) >= 3 return pack_scex_cmds( cmd_str=cmd_path_list[2], @@ -261,33 +261,30 @@ def handle_obdh_procedure( queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str] ): assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "core": - assert len(cmd_path_list) >= 3 - return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2]) - if cmd_path_list[1] == "xiphos_wdt": - return pack_wdt_commands(queue_helper, cmd_path_list[2]) - if cmd_path_list[1] == "time": + if cmd_path_list[0] == "core": + return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[1]) + if cmd_path_list[0] == "xiphos_wdt": + return pack_wdt_commands(queue_helper, cmd_path_list[1]) + if cmd_path_list[0] == "time": return pack_time_management_cmd(queue_helper, cmd_path_list[1]) def handle_com_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]): obj_id_man = get_object_ids() + if len(cmd_path_list) == 1: + return build_com_subsystem_procedure(queue_helper, cmd_path_list[0]) assert len(cmd_path_list) >= 2 - - if cmd_path_list[1] == "ccsds": + if cmd_path_list[0] == "ccsds": object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID)) - assert len(cmd_path_list) >= 3 return pack_ccsds_handler_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[1] == "pdec": - assert len(cmd_path_list) >= 3 + if cmd_path_list[0] == "pdec": return pack_pdec_handler_commands( object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2] ) - if cmd_path_list[1] == "syrlinks": + if cmd_path_list[0] == "syrlinks": object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID)) - assert len(cmd_path_list) >= 3 return pack_syrlinks_command( object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] ) diff --git a/eive_tmtc/tmtc/com/subsystem.py b/eive_tmtc/tmtc/com/subsystem.py index 7e784a9..d8630ed 100644 --- a/eive_tmtc/tmtc/com/subsystem.py +++ b/eive_tmtc/tmtc/com/subsystem.py @@ -55,23 +55,23 @@ class Info: ANNOUNCE_MODE_RECURSIVE = "Announce mode recursively" -def build_com_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 +def build_com_subsystem_procedure(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 prefix = "COM Subsystem" if cmd_str == OpCode.RX_ONLY: q.add_log_cmd(Info.RX_ONLY) q.add_pus_tc(create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_ONLY, 0)) elif cmd_str == OpCode.TX_AND_RX_DEF_RATE: - q.add_log_cmd(Info.TX_AND_RX_DEF_DATARATE) + q.add_log_cmd(Info.TX_AND_RX_DEF_RATE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_DEF_DATARATE, 0) ) elif cmd_str == OpCode.TX_AND_RX_LOW_RATE: - q.add_log_cmd(Info.TX_AND_RX_LOW_DATARATE) + q.add_log_cmd(Info.TX_AND_RX_LOW_RATE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_LOW_DATARATE, 0) ) elif cmd_str == OpCode.TX_AND_RX_HIGH_RATE: - q.add_log_cmd(Info.TX_AND_RX_HIGH_DATARATE) + q.add_log_cmd(Info.TX_AND_RX_HIGH_RATE) q.add_pus_tc( create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_HIGH_DATARATE, 0) )