"""HK Handling for EIVE OBSW""" import dataclasses import logging import sqlite3 from typing import List, cast from uuid import UUID from eive_tmtc.pus_tm.hk import HkTmInfo from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data from eive_tmtc.tmtc.payload.ploc_mpsoc import handle_ploc_mpsoc_hk_data from eive_tmtc.tmtc.tcs.rtd import RTD_NAMES, handle_rtd_hk from eive_tmtc.tmtc.acs.star_tracker import handle_str_hk_data from eive_tmtc.tmtc.payload.plpcdu import handle_plpcdu_hk from eive_tmtc.tmtc.payload.rad_sensor import handle_rad_sensor_data from eive_tmtc.tmtc.acs.sus import handle_sus_hk from eive_tmtc.tmtc.payload.ploc_supervisor import handle_supv_hk_data from eive_tmtc.tmtc.acs.reaction_wheels import handle_rw_hk_data from eive_tmtc.tmtc.power.pwr_ctrl import handle_pwr_ctrl_hk_data from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data from tmtccmd.pus.tm.s3_fsfw_hk import ( Service3FsfwTm, ) from tmtccmd.pus.tm.s3_hk_base import HkContentType from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT from eive_tmtc.tmtc.power.bpx_batt import handle_bpx_hk_data from eive_tmtc.tmtc.acs.gps import handle_gps_data from eive_tmtc.tmtc.acs.gyros import handle_gyros_hk_data from eive_tmtc.tmtc.power.tm import ( handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data, handle_pcdu_hk, ) from eive_tmtc.tmtc.acs.imtq import ( handle_imtq_hk, ) from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter, PrintWrapper from eive_tmtc.tmtc.core import handle_core_hk_data from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data import eive_tmtc.config.object_ids as obj_ids _LOGGER = logging.getLogger(__name__) FORWARD_SENSOR_TEMPS = False @dataclasses.dataclass class HkFilter: object_ids: List[ObjectIdU32] set_ids: List[int] def handle_hk_packet( raw_tm: bytes, packet_uuid: UUID, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, hk_filter: HkFilter, hk_level: int, db_con: sqlite3.Connection, ): tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) named_obj_id = cast(ObjectIdU32, obj_id_dict.get(tm_packet.object_id.as_bytes)) if named_obj_id is None: named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] if named_obj_id in hk_filter.object_ids: handle_regular_hk_print( printer=printer, packet_uuid=packet_uuid, object_id=named_obj_id, hk_packet=tm_packet, hk_data=hk_data, db=db_con, ) return try: if hk_level >= 1: printer.generic_hk_tm_print( HkContentType.HK, named_obj_id, tm_packet.set_id, hk_data ) if hk_level >= 1: handle_regular_hk_print( printer=printer, packet_uuid=packet_uuid, object_id=named_obj_id, hk_packet=tm_packet, hk_data=hk_data, db=db_con, ) except ValueError as e: _LOGGER.exception( f"{e} error when parsing HK data coming from {named_obj_id}" ) if tm_packet.subservice == 10 or tm_packet.subservice == 12: _LOGGER.warning("HK definitions printout not implemented yet") def handle_regular_hk_print( # noqa C901: Complexity okay here hk_packet: Service3FsfwTm, packet_uuid: UUID, hk_data: bytes, db: sqlite3.Connection, object_id: ObjectIdU32, printer: FsfwTmTcPrinter, ): objb = object_id.as_bytes set_id = hk_packet.set_id hk_info = HkTmInfo( packet_uuid=packet_uuid, hk_packet=hk_packet, db_con=db, hk_data=hk_data ) assert hk_packet.pus_tm.time_provider is not None packet_dt = hk_packet.pus_tm.time_provider.as_date_time() pw = PrintWrapper(printer.file_logger) """This function is called when a Service 3 Housekeeping packet is received.""" if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data(pw, object_id, set_id, hk_data) elif objb == obj_ids.SYRLINKS_HANDLER_ID: return handle_syrlinks_hk_data( hk_info=hk_info, pw=pw, ) elif objb == obj_ids.IMTQ_HANDLER_ID: return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.GPS_CONTROLLER: return handle_gps_data( pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt, ) elif objb == obj_ids.PCDU_HANDLER_ID: return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data) elif objb == obj_ids.BPX_HANDLER_ID: return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, pw=pw) elif objb == obj_ids.CORE_CONTROLLER_ID: return handle_core_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( hk_info=hk_info, pw=pw, pdu_idx=1, set_id=set_id, hk_data=hk_data, ) elif objb == obj_ids.PDU_2_HANDLER_ID: return handle_pdu_data( hk_info=hk_info, pw=pw, pdu_idx=2, set_id=set_id, hk_data=hk_data, ) elif objb == obj_ids.PLOC_MPSOC_ID: return handle_ploc_mpsoc_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.ACU_HANDLER_ID: return handle_acu_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.INTERNAL_ERROR_REPORTER_ID: return handle_ier_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.RAD_SENSOR_ID: return handle_rad_sensor_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( pw=pw, object_id=object_id, set_id=set_id, hk_data=hk_data ) if objb in [ obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF, obj_ids.SUS_1_N_LOC_XBYFZM_PT_XB, obj_ids.SUS_2_N_LOC_XFYBZB_PT_YB, obj_ids.SUS_3_N_LOC_XFYBZF_PT_YF, obj_ids.SUS_4_N_LOC_XMYFZF_PT_ZF, obj_ids.SUS_5_N_LOC_XFYMZB_PT_ZB, obj_ids.SUS_6_R_LOC_XFYBZM_PT_XF, obj_ids.SUS_7_R_LOC_XBYBZM_PT_XB, obj_ids.SUS_8_R_LOC_XBYBZB_PT_YB, obj_ids.SUS_9_R_LOC_XBYBZB_PT_YF, obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF, obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB, ]: return handle_sus_hk(object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id) elif objb in RTD_NAMES.keys(): return handle_rtd_hk(object_id=objb, hk_data=hk_data, pw=pw) elif objb == obj_ids.P60_DOCK_HANDLER: return handle_p60_hk_data(pw=pw, set_id=set_id, hk_data=hk_data) elif objb in [ obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID, obj_ids.GYRO_3_L3G_HANDLER_ID, ]: return handle_gyros_hk_data( object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id ) elif objb in [ obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID, obj_ids.MGM_3_RM3100_HANDLER_ID, ]: return handle_mgm_hk_data( object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id ) elif objb == obj_ids.PL_PCDU_ID: return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, pw=pw) elif objb == obj_ids.THERMAL_CONTROLLER_ID: return handle_thermal_controller_hk_data( object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data ) elif objb == obj_ids.STAR_TRACKER_ID: return handle_str_hk_data(set_id=set_id, hk_data=hk_data, pw=pw) elif objb == obj_ids.PLOC_SUPV_ID: return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, pw=pw) elif objb in [ obj_ids.TMP1075_HANDLER_TCS_BRD_0_ID, obj_ids.TMP1075_HANDLER_TCS_BRD_1_ID, obj_ids.TMP1075_HANDLER_IF_BRD_ID, obj_ids.TMP1075_HANDLER_PLPCDU_0_ID, obj_ids.TMP1075_HANDLER_PLPCDU_1_ID, ]: return handle_tmp_1075_hk_data(set_id=set_id, hk_data=hk_data, pw=pw) elif objb == obj_ids.ACS_CONTROLLER: return handle_acs_ctrl_hk_data( pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt ) elif objb == obj_ids.PWR_CONTROLLER: return handle_pwr_ctrl_hk_data( pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt ) else: _LOGGER.info( f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} " "has not been implemented." )