149 lines
5.8 KiB
Python
149 lines
5.8 KiB
Python
"""HK Handling for EIVE OBSW"""
|
|
import struct
|
|
|
|
from pus_tm.devs.rad_sensor import handle_rad_sensor_data
|
|
from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER
|
|
from tmtccmd.config.definitions import HkReplyUnpacked
|
|
from tmtccmd.tm.pus_3_fsfw_hk import (
|
|
Service3Base,
|
|
HkContentType,
|
|
Service3FsfwTm,
|
|
)
|
|
from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT
|
|
from tmtccmd.logging import get_console_logger
|
|
|
|
from pus_tm.devs.bpx_bat import handle_bpx_hk_data
|
|
from pus_tm.devs.gps import handle_gps_data
|
|
from pus_tm.devs.gyros import handle_gyros_hk_data
|
|
from pus_tm.devs.imtq_mgt import (
|
|
handle_self_test_data,
|
|
handle_eng_set,
|
|
handle_calibrated_mtm_measurement,
|
|
handle_raw_mtm_measurement,
|
|
)
|
|
from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data
|
|
from pus_tm.devs.syrlinks import handle_syrlinks_hk_data
|
|
from pus_tc.devs.imtq import ImtqSetIds
|
|
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
|
|
from pus_tm.defs import FsfwTmTcPrinter, log_to_both
|
|
from pus_tm.system.core import handle_core_hk_data
|
|
from pus_tm.devs.mgms import handle_mgm_hk_data
|
|
import config.object_ids as obj_ids
|
|
|
|
|
|
LOGGER = get_console_logger()
|
|
|
|
|
|
def handle_hk_packet(
|
|
raw_tm: bytes,
|
|
obj_id_dict: ObjectIdDictT,
|
|
printer: FsfwTmTcPrinter,
|
|
):
|
|
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
|
|
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
|
|
if named_obj_id is None:
|
|
named_obj_id = tm_packet.object_id
|
|
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
|
hk_data = tm_packet.tm_data[8:]
|
|
TM_TCP_SERVER.report_raw_hk_data(
|
|
object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data
|
|
)
|
|
printer.generic_hk_tm_print(
|
|
content_type=HkContentType.HK,
|
|
object_id=named_obj_id,
|
|
set_id=tm_packet.set_id,
|
|
hk_data=hk_data,
|
|
)
|
|
try:
|
|
handle_regular_hk_print(
|
|
printer=printer,
|
|
object_id=named_obj_id,
|
|
hk_packet=tm_packet,
|
|
hk_data=hk_data,
|
|
)
|
|
except ValueError as e:
|
|
LOGGER.exception(
|
|
f"{e} error when parsing HK data coming from {named_obj_id}"
|
|
)
|
|
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
|
|
LOGGER.warning("HK definitions printout not implemented yet")
|
|
|
|
|
|
def handle_regular_hk_print(
|
|
printer: FsfwTmTcPrinter,
|
|
object_id: ObjectId,
|
|
hk_packet: Service3Base,
|
|
hk_data: bytes,
|
|
):
|
|
objb = object_id.as_bytes
|
|
set_id = hk_packet.set_id
|
|
"""This function is called when a Service 3 Housekeeping packet is received."""
|
|
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
|
handle_rw_hk_data(printer, object_id, set_id, hk_data)
|
|
if objb == obj_ids.SYRLINKS_HANDLER_ID:
|
|
handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
|
if objb == obj_ids.IMTQ_HANDLER_ID:
|
|
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
|
|
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
|
|
):
|
|
return handle_self_test_data(printer, hk_data)
|
|
elif set_id == ImtqSetIds.ENG_HK_SET:
|
|
return handle_eng_set(printer, hk_data)
|
|
elif set_id == ImtqSetIds.CAL_MTM_SET:
|
|
return handle_calibrated_mtm_measurement(printer, hk_data)
|
|
elif set_id == ImtqSetIds.RAW_MTM_SET:
|
|
return handle_raw_mtm_measurement(printer, hk_data)
|
|
else:
|
|
LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id")
|
|
if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID:
|
|
handle_gps_data(printer=printer, hk_data=hk_data)
|
|
if objb == obj_ids.BPX_HANDLER_ID:
|
|
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
|
|
if objb == obj_ids.CORE_CONTROLLER_ID:
|
|
return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
|
if objb == obj_ids.PDU_1_HANDLER_ID:
|
|
return handle_pdu_data(
|
|
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb == obj_ids.PDU_2_HANDLER_ID:
|
|
return handle_pdu_data(
|
|
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb == obj_ids.ACU_HANDLER_ID:
|
|
return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
|
if objb == obj_ids.RAD_SENSOR_ID:
|
|
return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
|
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
|
return handle_rw_hk_data(
|
|
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb == obj_ids.P60_DOCK_HANDLER:
|
|
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
|
|
if objb in [
|
|
obj_ids.GYRO_0_ADIS_HANDLER_ID,
|
|
obj_ids.GYRO_1_L3G_HANDLER_ID,
|
|
obj_ids.GYRO_2_ADIS_HANDLER_ID,
|
|
obj_ids.GYRO_3_L3G_HANDLER_ID,
|
|
]:
|
|
handle_gyros_hk_data(
|
|
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
|
|
)
|
|
if objb in [
|
|
obj_ids.MGM_0_LIS3_HANDLER_ID,
|
|
obj_ids.MGM_1_RM3100_HANDLER_ID,
|
|
obj_ids.MGM_2_LIS3_HANDLER_ID,
|
|
obj_ids.MGM_3_RM3100_HANDLER_ID,
|
|
]:
|
|
handle_mgm_hk_data(
|
|
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
|
|
)
|
|
if objb == obj_ids.PL_PCDU_ID:
|
|
log_to_both(printer, "Received PL PCDU HK data")
|
|
if objb == obj_ids.THERMAL_CONTROLLER_ID:
|
|
handle_thermal_controller_hk_data(
|
|
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
|
|
)
|
|
else:
|
|
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
|
|
return HkReplyUnpacked()
|