that should be it
EIVE/-/pipeline/pr-main This commit looks good Details

This commit is contained in:
Robin Müller 2023-11-22 19:38:04 +01:00
parent 2bb917eab7
commit 756944cae3
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 70 additions and 73 deletions

View File

@ -6,6 +6,7 @@ from typing import cast, List
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
from eive_tmtc.tmtc.acs.acs_ctrl import pack_acs_ctrl_command
from eive_tmtc.tmtc.com.subsystem import build_com_subsystem_procedure
from eive_tmtc.tmtc.test import pack_test_command
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
from eive_tmtc.tmtc.power.power import pack_power_commands
@ -106,46 +107,48 @@ def handle_pus_procedure( # noqa C901: Complexity okay here.
def handle_eps_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
assert len(cmd_path_list) >= 1
if cmd_path_list[1] == "p60_dock":
if len(cmd_path_list) == 1:
return pack_power_commands(queue_helper, cmd_path_list[0])
assert len(cmd_path_list) >= 2
if cmd_path_list[0] == "p60_dock":
object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER))
assert len(cmd_path_list) >= 2
return pack_p60dock_cmds(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "pdu1":
if cmd_path_list[0] == "pdu1":
object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID))
return pack_pdu1_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "pdu2":
if cmd_path_list[0] == "pdu2":
object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID))
return pack_pdu2_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "acu":
if cmd_path_list[0] == "acu":
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
return pack_acu_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
assert len(cmd_path_list) >= 2
return pack_power_commands(queue_helper, cmd_path_list[1])
def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
if cmd_path_list[1] == "tcs_brd_assy":
assert len(cmd_path_list) >= 3
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "rtd_devs":
if len(cmd_path_list) == 1:
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[0])
assert len(cmd_path_list) >= 2
if cmd_path_list[0] == "tcs_brd_assy":
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1])
if cmd_path_list[0] == "rtd_devs":
assert len(cmd_path_list) >= 2
return pack_rtd_commands(
object_id=None, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=None, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "tcs_ctrl":
if cmd_path_list[0] == "tcs_ctrl":
assert len(cmd_path_list) >= 2
return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "tmp1075_devs":
if cmd_path_list[0] == "tmp1075_devs":
menu_dict = {
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),
"1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID),
@ -159,80 +162,77 @@ def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: Lis
object_id = obj_id_man.get(menu_dict[tmp_select][1])
assert object_id is not None
return pack_tmp1075_test_into(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "heaters":
if cmd_path_list[0] == "heaters":
object_id = HEATER_CONTROLLER_ID
return pack_heater_cmds(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1])
def handle_acs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
if len(cmd_path_list) == 1:
return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[0])
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "mgt":
if cmd_path_list[0] == "mgt":
object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID))
return create_imtq_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "str":
if cmd_path_list[0] == "str":
object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID))
assert len(cmd_path_list) >= 3
return pack_star_tracker_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1] == "rws":
if cmd_path_list[0] == "rws":
assert len(cmd_path_list) >= 3
if cmd_path_list[2] == "rw_assy":
if cmd_path_list[1] == "rw_assy":
assert len(cmd_path_list) >= 4
return pack_rw_ass_cmds(
q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3]
q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[2]
)
if cmd_path_list[2] == "rw_1":
if cmd_path_list[1] == "rw_1":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3]
object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[2] == "rw_2":
if cmd_path_list[1] == "rw_2":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3]
object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[2] == "rw_3":
if cmd_path_list[1] == "rw_3":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3]
object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[2] == "rw_4":
if cmd_path_list[1] == "rw_4":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3]
object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "gnss_devs":
assert len(cmd_path_list) >= 3
if cmd_path_list[0] == "gnss_devs":
return pack_gps_command(
object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2]
object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[1]
)
if cmd_path_list[1:] == ["acs_brd_assy", "mgm_devs"]:
assert len(cmd_path_list) >= 4
return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3])
if cmd_path_list[1] == "str_img_helper":
if cmd_path_list[0] == "acs_brd_assy":
assert len(cmd_path_list) >= 3
if cmd_path_list[1] == "mgm_devs":
return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "gyro_devs":
assert len(cmd_path_list) >= 3
return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[0] == "str_img_helper":
object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID))
return pack_str_img_helper_command(
object_id=object_id, q=queue_helper, op_code=cmd_path_list[2]
object_id=object_id, q=queue_helper, op_code=cmd_path_list[1]
)
if cmd_path_list[1] == "acs_ctrl":
assert len(cmd_path_list) >= 3
return pack_acs_ctrl_command(queue_helper, cmd_path_list[2])
if cmd_path_list[1] == "gyro_devs":
assert len(cmd_path_list) >= 3
return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2])
return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[0] == "acs_ctrl":
return pack_acs_ctrl_command(queue_helper, cmd_path_list[1])
def handle_payload_procedure(
@ -240,16 +240,16 @@ def handle_payload_procedure(
):
obj_id_man = get_object_ids()
assert len(cmd_path_list) >= 2
if cmd_path_list == "rad_sensor":
if cmd_path_list[0] == "rad_sensor":
assert len(cmd_path_list) >= 3
object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID))
return create_rad_sensor_cmd(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "pl_pcdu":
if cmd_path_list[0] == "pl_pcdu":
assert len(cmd_path_list) >= 3
return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "scex":
if cmd_path_list[0] == "scex":
assert len(cmd_path_list) >= 3
return pack_scex_cmds(
cmd_str=cmd_path_list[2],
@ -261,33 +261,30 @@ def handle_obdh_procedure(
queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]
):
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "core":
assert len(cmd_path_list) >= 3
return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "xiphos_wdt":
return pack_wdt_commands(queue_helper, cmd_path_list[2])
if cmd_path_list[1] == "time":
if cmd_path_list[0] == "core":
return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[1])
if cmd_path_list[0] == "xiphos_wdt":
return pack_wdt_commands(queue_helper, cmd_path_list[1])
if cmd_path_list[0] == "time":
return pack_time_management_cmd(queue_helper, cmd_path_list[1])
def handle_com_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
if len(cmd_path_list) == 1:
return build_com_subsystem_procedure(queue_helper, cmd_path_list[0])
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "ccsds":
if cmd_path_list[0] == "ccsds":
object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID))
assert len(cmd_path_list) >= 3
return pack_ccsds_handler_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "pdec":
assert len(cmd_path_list) >= 3
if cmd_path_list[0] == "pdec":
return pack_pdec_handler_commands(
object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "syrlinks":
if cmd_path_list[0] == "syrlinks":
object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID))
assert len(cmd_path_list) >= 3
return pack_syrlinks_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)

View File

@ -55,23 +55,23 @@ class Info:
ANNOUNCE_MODE_RECURSIVE = "Announce mode recursively"
def build_com_subsystem_cmd(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
def build_com_subsystem_procedure(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901
prefix = "COM Subsystem"
if cmd_str == OpCode.RX_ONLY:
q.add_log_cmd(Info.RX_ONLY)
q.add_pus_tc(create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_ONLY, 0))
elif cmd_str == OpCode.TX_AND_RX_DEF_RATE:
q.add_log_cmd(Info.TX_AND_RX_DEF_DATARATE)
q.add_log_cmd(Info.TX_AND_RX_DEF_RATE)
q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_DEF_DATARATE, 0)
)
elif cmd_str == OpCode.TX_AND_RX_LOW_RATE:
q.add_log_cmd(Info.TX_AND_RX_LOW_DATARATE)
q.add_log_cmd(Info.TX_AND_RX_LOW_RATE)
q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_LOW_DATARATE, 0)
)
elif cmd_str == OpCode.TX_AND_RX_HIGH_RATE:
q.add_log_cmd(Info.TX_AND_RX_HIGH_DATARATE)
q.add_log_cmd(Info.TX_AND_RX_HIGH_RATE)
q.add_pus_tc(
create_mode_command(COM_SUBSYSTEM_ID, ComMode.RX_AND_TX_HIGH_DATARATE, 0)
)