"""HK Handling for EIVE OBSW""" import struct from pus_tm.devs.rad_sensor import handle_rad_sensor_data from pus_tm.devs.sus import handle_sus_hk from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.tm.pus_3_fsfw_hk import ( Service3Base, HkContentType, Service3FsfwTm, ) from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT from tmtccmd.logging import get_console_logger from pus_tm.devs.bpx_bat import handle_bpx_hk_data from pus_tm.devs.gps import handle_gps_data from pus_tm.devs.gyros import handle_gyros_hk_data from pus_tm.devs.imtq_mgt import ( handle_self_test_data, handle_eng_set, handle_calibrated_mtm_measurement, handle_raw_mtm_measurement, ) from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data from pus_tm.devs.syrlinks import handle_syrlinks_hk_data from pus_tc.devs.imtq import ImtqSetIds from pus_tm.devs.reaction_wheels import handle_rw_hk_data from pus_tm.defs import FsfwTmTcPrinter, log_to_both from pus_tm.system.core import handle_core_hk_data from pus_tm.devs.mgms import handle_mgm_hk_data import config.object_ids as obj_ids LOGGER = get_console_logger() def handle_hk_packet( raw_tm: bytes, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, ): tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) if named_obj_id is None: named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] TM_TCP_SERVER.report_raw_hk_data( object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data ) printer.generic_hk_tm_print( content_type=HkContentType.HK, object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data, ) try: handle_regular_hk_print( printer=printer, object_id=named_obj_id, hk_packet=tm_packet, hk_data=hk_data, ) except ValueError as e: LOGGER.exception( f"{e} error when parsing HK data coming from {named_obj_id}" ) if tm_packet.subservice == 10 or tm_packet.subservice == 12: LOGGER.warning("HK definitions printout not implemented yet") def handle_regular_hk_print( printer: FsfwTmTcPrinter, object_id: ObjectId, hk_packet: Service3Base, hk_data: bytes, ): objb = object_id.as_bytes set_id = hk_packet.set_id """This function is called when a Service 3 Housekeeping packet is received.""" if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: handle_rw_hk_data(printer, object_id, set_id, hk_data) elif objb == obj_ids.SYRLINKS_HANDLER_ID: handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): return handle_self_test_data(printer, hk_data) elif set_id == ImtqSetIds.ENG_HK_SET: return handle_eng_set(printer, hk_data) elif set_id == ImtqSetIds.CAL_MTM_SET: return handle_calibrated_mtm_measurement(printer, hk_data) elif set_id == ImtqSetIds.RAW_MTM_SET: return handle_raw_mtm_measurement(printer, hk_data) else: LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id") elif objb == obj_ids.GPS_CONTROLLER: return handle_gps_data(printer=printer, hk_data=hk_data) if objb == obj_ids.BPX_HANDLER_ID: handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) if objb == obj_ids.CORE_CONTROLLER_ID: return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data ) elif objb == obj_ids.PDU_2_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) elif objb == obj_ids.ACU_HANDLER_ID: return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.RAD_SENSOR_ID: return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id) elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data ) if objb in [ obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF, obj_ids.SUS_1_N_LOC_XBYFZM_PT_XB, obj_ids.SUS_2_N_LOC_XFYBZB_PT_YB, obj_ids.SUS_3_N_LOC_XFYBZF_PT_YF, obj_ids.SUS_4_N_LOC_XMYFZF_PT_ZF, obj_ids.SUS_5_N_LOC_XFYMZB_PT_ZB, obj_ids.SUS_6_R_LOC_XFYBZM_PT_XF, obj_ids.SUS_7_R_LOC_XBYBZM_PT_XB, obj_ids.SUS_8_R_LOC_XBYBZB_PT_YB, obj_ids.SUS_9_R_LOC_XBYBZB_PT_YF, obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF, obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB, ]: handle_sus_hk( object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id ) if objb == obj_ids.P60_DOCK_HANDLER: handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) elif objb in [ obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID, obj_ids.GYRO_3_L3G_HANDLER_ID, ]: handle_gyros_hk_data( object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id ) elif objb in [ obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID, obj_ids.MGM_3_RM3100_HANDLER_ID, ]: handle_mgm_hk_data( object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id ) elif objb == obj_ids.PL_PCDU_ID: log_to_both(printer, "Received PL PCDU HK data") elif objb == obj_ids.THERMAL_CONTROLLER_ID: handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data ) else: LOGGER.info( f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} " f"has not been implemented." )