cleaning up
EIVE/-/pipeline/pr-main This commit looks good Details

This commit is contained in:
Robin Müller 2023-11-22 19:13:22 +01:00
parent 21a38ae76a
commit 688602e486
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 151 additions and 106 deletions

View File

@ -1,7 +1,7 @@
"""Hook function which packs telecommands based on service and operation code string """Hook function which packs telecommands based on service and operation code string
""" """
import logging import logging
from typing import cast from typing import cast, List
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
@ -70,53 +70,48 @@ from tmtccmd.util import ObjectIdU32
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
def handle_default_procedure( # noqa C901: Complexity okay here. def handle_eps_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
info: DefaultProcedureInfo,
queue_helper: DefaultPusQueueHelper,
):
cmd_path = info.cmd_path
assert cmd_path is not None
cmd_path_list = cmd_path.split("/")
if cmd_path_list[0] == "/":
cmd_path_list = cmd_path_list[1:]
if len(cmd_path_list) == 0:
raise ValueError(
"command path list empty. Full command path {cmd_path} might have invalid format"
)
obj_id_man = get_object_ids() obj_id_man = get_object_ids()
if cmd_path_list == ["eps", "p60_dock"]: assert len(cmd_path_list) >= 1
if cmd_path_list[1] == "p60_dock":
object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER)) object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER))
assert len(cmd_path_list) >= 2 assert len(cmd_path_list) >= 2
return pack_p60dock_cmds( return pack_p60dock_cmds(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["tcs", "rtd_devs"]: if cmd_path_list[1] == "pdu1":
assert len(cmd_path_list) >= 2
return pack_rtd_commands(
object_id=None, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list == ["eps", "pdu1"]:
object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID))
return pack_pdu1_commands( return pack_pdu1_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["eps", "pdu2"]: if cmd_path_list[1] == "pdu2":
object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID))
return pack_pdu2_commands( return pack_pdu2_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["eps", "acu"]: if cmd_path_list[1] == "acu":
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
return pack_acu_commands( return pack_acu_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["tcs"]: assert len(cmd_path_list) >= 2
assert len(cmd_path_list) >= 1 return pack_power_commands(queue_helper, cmd_path_list[1])
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1])
if cmd_path_list == ["tcs", "tcs_ctrl"]:
def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
if cmd_path_list[1] == "tcs_brd_assy":
assert len(cmd_path_list) >= 3
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "rtd_devs":
assert len(cmd_path_list) >= 2
return pack_rtd_commands(
object_id=None, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "tcs_ctrl":
assert len(cmd_path_list) >= 2 assert len(cmd_path_list) >= 2
return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2]) return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list == ["tcs", "tmp1075_devs"]: if cmd_path_list[1] == "tmp1075_devs":
menu_dict = { menu_dict = {
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID), "0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),
"1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID), "1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID),
@ -132,113 +127,163 @@ def handle_default_procedure( # noqa C901: Complexity okay here.
return pack_tmp1075_test_into( return pack_tmp1075_test_into(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["tcs", "heaters"]: if cmd_path_list[1] == "heaters":
object_id = HEATER_CONTROLLER_ID object_id = HEATER_CONTROLLER_ID
return pack_heater_cmds( return pack_heater_cmds(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["acs", "mgt"]: return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1])
def handle_acs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "mgt":
object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID))
return create_imtq_command( return create_imtq_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["acs", "rw_assy", "rw_1"]: if cmd_path_list[1] == "str":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list == ["acs", "rw_assy", "rw_2"]:
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list == ["acs", "rw_assy", "rw_3"]:
return create_single_rw_cmd(
object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list == ["acs", "rw_assy", "rw_4"]:
return create_single_rw_cmd(
object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list == ["acs", "acs_brd_assy", "mgm_devs"]:
assert len(cmd_path_list) >= 4
return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3])
if cmd_path_list == ["payload", "rad_sensor"]:
assert len(cmd_path_list) >= 3
object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID))
return create_rad_sensor_cmd(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list == ["acs", "str"]:
object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID))
assert len(cmd_path_list) >= 3 assert len(cmd_path_list) >= 3
return pack_star_tracker_commands( return pack_star_tracker_commands(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2] object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list == ["acs", "str_img_helper"]:
if cmd_path_list[1] == "rws":
assert len(cmd_path_list) >= 3
if cmd_path_list[2] == "rw_assy":
assert len(cmd_path_list) >= 4
return pack_rw_ass_cmds(
q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3]
)
if cmd_path_list[2] == "rw_1":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list[2] == "rw_2":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list[2] == "rw_3":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list[2] == "rw_4":
assert len(cmd_path_list) >= 4
return create_single_rw_cmd(
object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[3]
)
if cmd_path_list[1] == "gnss_devs":
assert len(cmd_path_list) >= 3
return pack_gps_command(
object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1:] == ["acs_brd_assy", "mgm_devs"]:
assert len(cmd_path_list) >= 4
return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[3])
if cmd_path_list[1] == "str_img_helper":
assert len(cmd_path_list) >= 3 assert len(cmd_path_list) >= 3
object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID))
return pack_str_img_helper_command( return pack_str_img_helper_command(
object_id=object_id, q=queue_helper, op_code=cmd_path_list[2] object_id=object_id, q=queue_helper, op_code=cmd_path_list[2]
) )
if cmd_path_list == ["acs", "acs_ctrl"]: if cmd_path_list[1] == "acs_ctrl":
assert len(cmd_path_list) >= 3 assert len(cmd_path_list) >= 3
return pack_acs_ctrl_command(queue_helper, cmd_path_list[2]) return pack_acs_ctrl_command(queue_helper, cmd_path_list[2])
if cmd_path_list == ["obdh", "core"]: if cmd_path_list[1] == "gyro_devs":
assert len(cmd_path_list) >= 3
return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list == ["eps"]:
assert len(cmd_path_list) >= 2
return pack_power_commands(queue_helper, cmd_path_list[2])
if cmd_path_list == ["acs"]:
assert len(cmd_path_list) >= 2
return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list == ["acs", "gnss_devs"]:
assert len(cmd_path_list) >= 3
return pack_gps_command(
object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list == ["com", "ccsds"]:
object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID))
assert len(cmd_path_list) >= 3
return pack_ccsds_handler_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list == ["com", "pdec"]:
assert len(cmd_path_list) >= 3
return pack_pdec_handler_commands(
object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list == ["com", "syrlinks"]:
object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID))
assert len(cmd_path_list) >= 3
return pack_syrlinks_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list == ["acs", "gyro_devs"]:
assert len(cmd_path_list) >= 3 assert len(cmd_path_list) >= 3
return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2]) return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list == ["payload", "pl_pcdu"]: return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[2])
def handle_payload_procedure(
queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]
):
obj_id_man = get_object_ids()
assert len(cmd_path_list) >= 2
if cmd_path_list == "rad_sensor":
assert len(cmd_path_list) >= 3
object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID))
return create_rad_sensor_cmd(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "pl_pcdu":
assert len(cmd_path_list) >= 3 assert len(cmd_path_list) >= 3
return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2]) return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list == ["tcs", "tcs_brd_assy"]: if cmd_path_list[1] == "scex":
assert len(cmd_path_list) >= 3
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list == ["acs", "rws", "rw_assy"]:
assert len(cmd_path_list) >= 4
return pack_rw_ass_cmds(
q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[3]
)
if cmd_path_list == ["payload", "scex"]:
assert len(cmd_path_list) >= 3 assert len(cmd_path_list) >= 3
return pack_scex_cmds( return pack_scex_cmds(
cmd_str=cmd_path_list[2], cmd_str=cmd_path_list[2],
q=queue_helper, q=queue_helper,
) )
if cmd_path_list == ["core", "xiphos_wdt"]:
def handle_obdh_procedure(
queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]
):
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "core":
assert len(cmd_path_list) >= 3
return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[2])
if cmd_path_list[1] == "xiphos_wdt":
return pack_wdt_commands(queue_helper, cmd_path_list[2]) return pack_wdt_commands(queue_helper, cmd_path_list[2])
if "time" in cmd_path_list: if cmd_path_list[1] == "time":
return pack_time_management_cmd(queue_helper, cmd_path_list[1]) return pack_time_management_cmd(queue_helper, cmd_path_list[1])
def handle_com_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
obj_id_man = get_object_ids()
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "ccsds":
object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID))
assert len(cmd_path_list) >= 3
return pack_ccsds_handler_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "pdec":
assert len(cmd_path_list) >= 3
return pack_pdec_handler_commands(
object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2]
)
if cmd_path_list[1] == "syrlinks":
object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID))
assert len(cmd_path_list) >= 3
return pack_syrlinks_command(
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
)
def handle_default_procedure( # noqa C901: Complexity okay here.
info: DefaultProcedureInfo,
queue_helper: DefaultPusQueueHelper,
):
cmd_path = info.cmd_path
assert cmd_path is not None
cmd_path_list = cmd_path.split("/")
if cmd_path_list[0] == "/":
cmd_path_list = cmd_path_list[1:]
if len(cmd_path_list) == 0:
raise ValueError(
"command path list empty. Full command path {cmd_path} might have invalid format"
)
if cmd_path_list[0] == "eps":
return handle_eps_procedure(queue_helper, cmd_path_list[1:])
if cmd_path_list[0] == "tcs":
return handle_tcs_procedure(queue_helper, cmd_path_list[1:])
if cmd_path_list[0] == "acs":
return handle_acs_procedure(queue_helper, cmd_path_list[1:])
if cmd_path_list[0] == "payload":
return handle_payload_procedure(queue_helper, cmd_path_list[1:])
if cmd_path_list[0] == "obdh":
return handle_obdh_procedure(queue_helper, cmd_path_list[1:])
if cmd_path_list[0] == "com":
return handle_com_procedure(queue_helper, cmd_path_list[1:])
logging.getLogger(__name__).warning( logging.getLogger(__name__).warning(
f"invalid or unimplemented command path {cmd_path}" f"invalid or unimplemented command path {cmd_path}"
) )

View File

@ -6,7 +6,7 @@ from eive_tmtc.config.definitions import (
PUS_APID, PUS_APID,
CFDP_LOCAL_ENTITY_ID, CFDP_LOCAL_ENTITY_ID,
) )
from eive_tmtc.pus_tc.procedure_packer import handle_default_procedure from eive_tmtc.pus_tc.cmd_demux import handle_default_procedure
from eive_tmtc.cfdp.handler import CfdpInCcsdsHandler from eive_tmtc.cfdp.handler import CfdpInCcsdsHandler
from tmtccmd import TcHandlerBase, ProcedureWrapper from tmtccmd import TcHandlerBase, ProcedureWrapper
from tmtccmd.cfdp.defs import CfdpRequestType from tmtccmd.cfdp.defs import CfdpRequestType
@ -69,7 +69,7 @@ class TcHandler(TcHandlerBase):
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT: if info.proc_type == TcProcedureType.DEFAULT:
handle_default_procedure(self, info.to_def_procedure(), self.queue_helper) handle_default_procedure(info.to_def_procedure(), self.queue_helper)
elif info.proc_type == TcProcedureType.CFDP: elif info.proc_type == TcProcedureType.CFDP:
self.handle_cfdp_procedure(info) self.handle_cfdp_procedure(info)