295 lines
12 KiB
Python
295 lines
12 KiB
Python
"""Hook function which packs telecommands based on service and operation code string
|
|
"""
|
|
import logging
|
|
from typing import List, cast
|
|
|
|
from tmtccmd import DefaultProcedureInfo
|
|
from tmtccmd.tmtc import DefaultPusQueueHelper
|
|
from tmtccmd.util import ObjectIdU32
|
|
|
|
import eive_tmtc.config.object_ids as oids
|
|
from eive_tmtc.config.object_ids import (
|
|
ACU_HANDLER_ID,
|
|
CCSDS_HANDLER_ID,
|
|
HEATER_CONTROLLER_ID,
|
|
IMTQ_HANDLER_ID,
|
|
P60_DOCK_HANDLER,
|
|
PDEC_HANDLER_ID,
|
|
PDU_1_HANDLER_ID,
|
|
PDU_2_HANDLER_ID,
|
|
RAD_SENSOR_ID,
|
|
RW1_ID,
|
|
RW2_ID,
|
|
RW3_ID,
|
|
RW4_ID,
|
|
RW_ASSEMBLY,
|
|
STAR_TRACKER_ID,
|
|
STR_IMG_HELPER_ID,
|
|
SYRLINKS_HANDLER_ID,
|
|
TMP1075_HANDLER_IF_BRD_ID,
|
|
TMP1075_HANDLER_PLPCDU_0_ID,
|
|
TMP1075_HANDLER_TCS_BRD_0_ID,
|
|
TMP1075_HANDLER_TCS_BRD_1_ID,
|
|
get_object_ids,
|
|
)
|
|
from eive_tmtc.tmtc.acs.acs_board import pack_acs_command
|
|
from eive_tmtc.tmtc.acs.acs_ctrl import pack_acs_ctrl_command
|
|
from eive_tmtc.tmtc.acs.gps import pack_gps_command
|
|
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
|
|
from eive_tmtc.tmtc.acs.imtq import create_imtq_command
|
|
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
|
|
from eive_tmtc.tmtc.acs.reaction_wheels import (
|
|
create_single_rw_cmd,
|
|
pack_rw_ass_cmds,
|
|
)
|
|
from eive_tmtc.tmtc.acs.star_tracker import pack_star_tracker_commands
|
|
from eive_tmtc.tmtc.acs.str_img_helper import pack_str_img_helper_command
|
|
from eive_tmtc.tmtc.com.ccsds_handler import pack_ccsds_handler_command
|
|
from eive_tmtc.tmtc.com.pdec_handler import pack_pdec_handler_commands
|
|
from eive_tmtc.tmtc.com.subsystem import build_com_subsystem_procedure
|
|
from eive_tmtc.tmtc.com.syrlinks_handler import pack_syrlinks_command
|
|
from eive_tmtc.tmtc.core import pack_core_commands
|
|
from eive_tmtc.tmtc.health import build_health_cmds
|
|
from eive_tmtc.tmtc.payload.plpcdu import pack_pl_pcdu_commands
|
|
from eive_tmtc.tmtc.payload.rad_sensor import create_rad_sensor_cmd
|
|
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
|
|
from eive_tmtc.tmtc.power.acu import pack_acu_commands
|
|
from eive_tmtc.tmtc.power.p60dock import pack_p60dock_cmds
|
|
from eive_tmtc.tmtc.power.pdu1 import pack_pdu1_commands
|
|
from eive_tmtc.tmtc.power.pdu2 import pack_pdu2_commands
|
|
from eive_tmtc.tmtc.power.power import pack_power_commands
|
|
from eive_tmtc.tmtc.system import build_system_cmds
|
|
from eive_tmtc.tmtc.tcs.ctrl import pack_tcs_ctrl_commands
|
|
from eive_tmtc.tmtc.tcs.heater import pack_heater_cmds
|
|
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
|
|
from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands
|
|
from eive_tmtc.tmtc.tcs.tmp1075 import pack_tmp1075_test_into
|
|
from eive_tmtc.tmtc.test import build_test_commands
|
|
from eive_tmtc.tmtc.time import pack_time_management_cmd
|
|
from eive_tmtc.tmtc.wdt import pack_wdt_commands
|
|
from eive_tmtc.utility.input_helper import InputHelper
|
|
|
|
|
|
def handle_pus_procedure( # noqa C901: Complexity okay here.
|
|
info: DefaultProcedureInfo,
|
|
queue_helper: DefaultPusQueueHelper,
|
|
):
|
|
cmd_path = info.cmd_path
|
|
assert cmd_path is not None
|
|
cmd_path_list = cmd_path.split("/")
|
|
if cmd_path_list[0] == "":
|
|
cmd_path_list = cmd_path_list[1:]
|
|
if len(cmd_path_list) == 0:
|
|
raise ValueError(
|
|
"command path list empty. Full command path {cmd_path} might have invalid format"
|
|
)
|
|
if cmd_path_list[0] == "system":
|
|
assert len(cmd_path_list) >= 1
|
|
return build_system_cmds(queue_helper, cmd_path_list[1])
|
|
if cmd_path_list[0] == "health":
|
|
assert len(cmd_path_list) >= 1
|
|
return build_health_cmds(queue_helper, cmd_path_list[1])
|
|
if cmd_path_list[0] == "eps":
|
|
return handle_eps_procedure(queue_helper, cmd_path_list[1:])
|
|
if cmd_path_list[0] == "tcs":
|
|
return handle_tcs_procedure(queue_helper, cmd_path_list[1:])
|
|
if cmd_path_list[0] == "acs":
|
|
return handle_acs_procedure(queue_helper, cmd_path_list[1:])
|
|
if cmd_path_list[0] == "payload":
|
|
return handle_payload_procedure(queue_helper, cmd_path_list[1:])
|
|
if cmd_path_list[0] == "obdh":
|
|
return handle_obdh_procedure(queue_helper, cmd_path_list[1:])
|
|
if cmd_path_list[0] == "test":
|
|
assert len(cmd_path_list) >= 1
|
|
return build_test_commands(queue_helper, cmd_path_list[1])
|
|
if cmd_path_list[0] == "com":
|
|
return handle_com_procedure(queue_helper, cmd_path_list[1:])
|
|
logging.getLogger(__name__).warning(
|
|
f"invalid or unimplemented command path {cmd_path}"
|
|
)
|
|
|
|
|
|
def handle_eps_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
|
|
obj_id_man = get_object_ids()
|
|
if len(cmd_path_list) == 1:
|
|
return pack_power_commands(queue_helper, cmd_path_list[0])
|
|
assert len(cmd_path_list) >= 2
|
|
if cmd_path_list[0] == "p60_dock":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER))
|
|
assert len(cmd_path_list) >= 2
|
|
return pack_p60dock_cmds(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "pdu1":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(PDU_1_HANDLER_ID))
|
|
return pack_pdu1_commands(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "pdu2":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(PDU_2_HANDLER_ID))
|
|
return pack_pdu2_commands(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "acu":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
|
|
return pack_acu_commands(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
|
|
|
|
def handle_tcs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
|
|
obj_id_man = get_object_ids()
|
|
if len(cmd_path_list) == 1:
|
|
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[0])
|
|
assert len(cmd_path_list) >= 2
|
|
if cmd_path_list[0] == "tcs_brd_assy":
|
|
return pack_tcs_sys_commands(q=queue_helper, cmd_str=cmd_path_list[1])
|
|
if cmd_path_list[0] == "rtd_devs":
|
|
assert len(cmd_path_list) >= 2
|
|
return pack_rtd_commands(
|
|
object_id=None, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "tcs_ctrl":
|
|
assert len(cmd_path_list) >= 2
|
|
return pack_tcs_ctrl_commands(q=queue_helper, cmd_str=cmd_path_list[2])
|
|
if cmd_path_list[0] == "tmp1075_devs":
|
|
menu_dict = {
|
|
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),
|
|
"1": ("TMP1075 TCS Board 1", TMP1075_HANDLER_TCS_BRD_1_ID),
|
|
"2": ("TMP1075 PL PCDU 0", TMP1075_HANDLER_PLPCDU_0_ID),
|
|
"3": ("TMP1075 PL PCDU 1", oids.TMP1075_HANDLER_PLPCDU_1_ID),
|
|
"4": ("TMP1075 IF Board", TMP1075_HANDLER_IF_BRD_ID),
|
|
}
|
|
input_helper = InputHelper(menu_dict)
|
|
tmp_select = input_helper.get_key()
|
|
assert len(cmd_path_list) >= 2
|
|
object_id = obj_id_man.get(menu_dict[tmp_select][1])
|
|
assert object_id is not None
|
|
return pack_tmp1075_test_into(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "heaters":
|
|
object_id = HEATER_CONTROLLER_ID
|
|
return pack_heater_cmds(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
|
|
|
|
def handle_acs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
|
|
obj_id_man = get_object_ids()
|
|
if len(cmd_path_list) == 1:
|
|
return pack_acs_command(q=queue_helper, cmd_str=cmd_path_list[0])
|
|
assert len(cmd_path_list) >= 2
|
|
if cmd_path_list[0] == "mgt":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(IMTQ_HANDLER_ID))
|
|
return create_imtq_command(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "str":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(STAR_TRACKER_ID))
|
|
return pack_star_tracker_commands(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
|
|
if cmd_path_list[0] == "rws":
|
|
assert len(cmd_path_list) >= 3
|
|
if cmd_path_list[1] == "rw_assy":
|
|
assert len(cmd_path_list) >= 4
|
|
return pack_rw_ass_cmds(
|
|
q=queue_helper, object_id=RW_ASSEMBLY, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[1] == "rw_1":
|
|
assert len(cmd_path_list) >= 4
|
|
return create_single_rw_cmd(
|
|
object_id=RW1_ID, rw_idx=1, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[1] == "rw_2":
|
|
assert len(cmd_path_list) >= 4
|
|
return create_single_rw_cmd(
|
|
object_id=RW2_ID, rw_idx=2, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[1] == "rw_3":
|
|
assert len(cmd_path_list) >= 4
|
|
return create_single_rw_cmd(
|
|
object_id=RW3_ID, rw_idx=3, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[1] == "rw_4":
|
|
assert len(cmd_path_list) >= 4
|
|
return create_single_rw_cmd(
|
|
object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
|
|
if cmd_path_list[0] == "gnss_devs":
|
|
return pack_gps_command(
|
|
object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "acs_brd_assy":
|
|
assert len(cmd_path_list) >= 3
|
|
if cmd_path_list[1] == "mgm_devs":
|
|
return handle_mgm_cmd(q=queue_helper, cmd_str=cmd_path_list[2])
|
|
if cmd_path_list[1] == "gyro_devs":
|
|
assert len(cmd_path_list) >= 3
|
|
return handle_gyr_cmd(q=queue_helper, cmd_str=cmd_path_list[2])
|
|
if cmd_path_list[0] == "str_img_helper":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(STR_IMG_HELPER_ID))
|
|
return pack_str_img_helper_command(
|
|
object_id=object_id, q=queue_helper, op_code=cmd_path_list[1]
|
|
)
|
|
if cmd_path_list[0] == "acs_ctrl":
|
|
return pack_acs_ctrl_command(queue_helper, cmd_path_list[1])
|
|
|
|
|
|
def handle_payload_procedure(
|
|
queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]
|
|
):
|
|
obj_id_man = get_object_ids()
|
|
assert len(cmd_path_list) >= 2
|
|
if cmd_path_list[0] == "rad_sensor":
|
|
assert len(cmd_path_list) >= 3
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID))
|
|
return create_rad_sensor_cmd(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[0] == "pl_pcdu":
|
|
assert len(cmd_path_list) >= 3
|
|
return pack_pl_pcdu_commands(q=queue_helper, cmd_str=cmd_path_list[2])
|
|
if cmd_path_list[0] == "scex":
|
|
assert len(cmd_path_list) >= 3
|
|
return pack_scex_cmds(
|
|
cmd_str=cmd_path_list[2],
|
|
q=queue_helper,
|
|
)
|
|
|
|
|
|
def handle_obdh_procedure(
|
|
queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]
|
|
):
|
|
assert len(cmd_path_list) >= 2
|
|
if cmd_path_list[0] == "core":
|
|
return pack_core_commands(q=queue_helper, cmd_str=cmd_path_list[1])
|
|
if cmd_path_list[0] == "xiphos_wdt":
|
|
return pack_wdt_commands(queue_helper, cmd_path_list[1])
|
|
if cmd_path_list[0] == "time":
|
|
return pack_time_management_cmd(queue_helper, cmd_path_list[1])
|
|
|
|
|
|
def handle_com_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: List[str]):
|
|
obj_id_man = get_object_ids()
|
|
if len(cmd_path_list) == 1:
|
|
return build_com_subsystem_procedure(queue_helper, cmd_path_list[0])
|
|
assert len(cmd_path_list) >= 2
|
|
if cmd_path_list[0] == "ccsds":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID))
|
|
return pack_ccsds_handler_command(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[0] == "pdec":
|
|
return pack_pdec_handler_commands(
|
|
object_id=PDEC_HANDLER_ID, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|
|
if cmd_path_list[0] == "syrlinks":
|
|
object_id = cast(ObjectIdU32, obj_id_man.get(SYRLINKS_HANDLER_ID))
|
|
return pack_syrlinks_command(
|
|
object_id=object_id, q=queue_helper, cmd_str=cmd_path_list[2]
|
|
)
|