"""HK Handling for EIVE OBSW""" import dataclasses import logging import base64 import sqlite3 import struct from typing import List, cast from uuid import UUID from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ecss import PusTm from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR from eive_tmtc.pus_tm.hk import HkTmInfo from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data from eive_tmtc.tmtc.payload.ploc_mpsoc import handle_ploc_mpsoc_hk_data from eive_tmtc.tmtc.tcs.rtd import RTD_NAMES, handle_rtd_hk from eive_tmtc.tmtc.acs.star_tracker import handle_str_hk_data from eive_tmtc.tmtc.payload.plpcdu import handle_plpcdu_hk from eive_tmtc.tmtc.payload.rad_sensor import handle_rad_sensor_data from eive_tmtc.tmtc.acs.sus import handle_sus_hk from eive_tmtc.tmtc.payload.ploc_supervisor import handle_supv_hk_data from eive_tmtc.tmtc.acs.reaction_wheels import handle_rw_hk_data from eive_tmtc.tmtc.power.pwr_ctrl import handle_pwr_ctrl_hk_data from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data from tmtccmd.pus.tm.s3_hk_base import HkContentType from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT from eive_tmtc.tmtc.power.bpx_batt import handle_bpx_hk_data from eive_tmtc.tmtc.acs.gps import handle_gps_data from eive_tmtc.tmtc.acs.gyros import handle_gyros_hk_data from eive_tmtc.tmtc.power.tm import ( handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data, handle_pcdu_hk, ) from eive_tmtc.tmtc.acs.imtq import ( handle_imtq_hk, ) from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter, PrintWrapper from eive_tmtc.tmtc.core import handle_core_hk_data from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data import eive_tmtc.config.object_ids as obj_ids _LOGGER = logging.getLogger(__name__) FORWARD_SENSOR_TEMPS = False @dataclasses.dataclass class HkFilter: object_ids: List[bytes] set_ids: List[int] def handle_hk_packet( raw_tm: bytes, packet_uuid: UUID, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, hk_filter: HkFilter, hk_level: int, db_con: sqlite3.Connection, ): tm_packet = PusTm.unpack(raw_tm, CdsShortTimestamp.TIMESTAMP_SIZE) obj_id_raw = struct.unpack("!I", tm_packet.tm_data[0:4])[0] named_obj_id = cast(ObjectIdU32, obj_id_dict.get(obj_id_raw)) if named_obj_id is None: named_obj_id = ObjectIdU32(obj_id_raw, "Unknown ID") if tm_packet.subservice == 25 or tm_packet.subservice == 26: set_id = struct.unpack("!I", tm_packet.tm_data[4:8])[0] hk_data = tm_packet.tm_data[8:] if named_obj_id.as_bytes in hk_filter.object_ids: if PRINT_RAW_HK_B64_STR: print(f"PUS TM Base64: {base64.b64encode(raw_tm)}") handle_regular_hk_print( printer=printer, set_id=set_id, packet_uuid=packet_uuid, object_id=named_obj_id, hk_packet=tm_packet, hk_data=hk_data, db=db_con, ) return try: if hk_level >= 1: printer.generic_hk_tm_print( HkContentType.HK, named_obj_id, set_id, hk_data ) if hk_level >= 1: handle_regular_hk_print( printer=printer, set_id=set_id, packet_uuid=packet_uuid, object_id=named_obj_id, hk_packet=tm_packet, hk_data=hk_data, db=db_con, ) except ValueError as e: _LOGGER.exception( f"{e} error when parsing HK data coming from {named_obj_id}" ) if tm_packet.subservice == 10 or tm_packet.subservice == 12: _LOGGER.warning("HK definitions printout not implemented yet") def handle_regular_hk_print( # noqa C901: Complexity okay here hk_packet: PusTm, set_id: int, packet_uuid: UUID, hk_data: bytes, db: sqlite3.Connection, object_id: ObjectIdU32, printer: FsfwTmTcPrinter, ): objb = object_id.as_bytes hk_info = HkTmInfo( packet_uuid=packet_uuid, hk_packet=hk_packet, db_con=db, hk_data=hk_data, set_id=set_id, ) timestamp = CdsShortTimestamp.unpack(hk_packet.timestamp) packet_dt = timestamp.as_datetime() pw = PrintWrapper(printer.file_logger) """This function is called when a Service 3 Housekeeping packet is received.""" if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data(pw, object_id, set_id, hk_data) elif objb == obj_ids.SYRLINKS_HANDLER_ID: return handle_syrlinks_hk_data( hk_info=hk_info, pw=pw, ) elif objb == obj_ids.IMTQ_HANDLER_ID: return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.GPS_CONTROLLER: return handle_gps_data( pw=pw, set_id=set_id, hk_data=hk_data, packet_time=timestamp.as_datetime(), ) elif objb == obj_ids.PCDU_HANDLER_ID: return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data) elif objb == obj_ids.BPX_HANDLER_ID: return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, pw=pw) elif objb == obj_ids.CORE_CONTROLLER_ID: return handle_core_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( hk_info=hk_info, pw=pw, pdu_idx=1, set_id=set_id, hk_data=hk_data, ) elif objb == obj_ids.PDU_2_HANDLER_ID: return handle_pdu_data( hk_info=hk_info, pw=pw, pdu_idx=2, set_id=set_id, hk_data=hk_data, ) elif objb == obj_ids.PLOC_MPSOC_ID: return handle_ploc_mpsoc_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.ACU_HANDLER_ID: return handle_acu_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.INTERNAL_ERROR_REPORTER_ID: return handle_ier_hk_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.RAD_SENSOR_ID: return handle_rad_sensor_data(pw=pw, hk_data=hk_data, set_id=set_id) elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( pw=pw, object_id=object_id, set_id=set_id, hk_data=hk_data ) if objb in [ obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF, obj_ids.SUS_1_N_LOC_XBYFZM_PT_XB, obj_ids.SUS_2_N_LOC_XFYBZB_PT_YB, obj_ids.SUS_3_N_LOC_XFYBZF_PT_YF, obj_ids.SUS_4_N_LOC_XMYFZF_PT_ZF, obj_ids.SUS_5_N_LOC_XFYMZB_PT_ZB, obj_ids.SUS_6_R_LOC_XFYBZM_PT_XF, obj_ids.SUS_7_R_LOC_XBYBZM_PT_XB, obj_ids.SUS_8_R_LOC_XBYBZB_PT_YB, obj_ids.SUS_9_R_LOC_XBYBZB_PT_YF, obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF, obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB, ]: return handle_sus_hk(object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id) elif objb in RTD_NAMES.keys(): return handle_rtd_hk(object_id=objb, hk_data=hk_data, pw=pw) elif objb == obj_ids.P60_DOCK_HANDLER: return handle_p60_hk_data(pw=pw, set_id=set_id, hk_data=hk_data) elif objb in [ obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID, obj_ids.GYRO_3_L3G_HANDLER_ID, ]: return handle_gyros_hk_data( object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id ) elif objb in [ obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID, obj_ids.MGM_3_RM3100_HANDLER_ID, ]: return handle_mgm_hk_data( object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id ) elif objb == obj_ids.PL_PCDU_ID: return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, pw=pw) elif objb == obj_ids.THERMAL_CONTROLLER_ID: return handle_thermal_controller_hk_data( object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data ) elif objb == obj_ids.STAR_TRACKER_ID: return handle_str_hk_data(set_id=set_id, hk_data=hk_data, pw=pw) elif objb == obj_ids.PLOC_SUPV_ID: return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, pw=pw) elif objb in [ obj_ids.TMP1075_HANDLER_TCS_BRD_0_ID, obj_ids.TMP1075_HANDLER_TCS_BRD_1_ID, obj_ids.TMP1075_HANDLER_IF_BRD_ID, obj_ids.TMP1075_HANDLER_PLPCDU_0_ID, obj_ids.TMP1075_HANDLER_PLPCDU_1_ID, ]: return handle_tmp_1075_hk_data(set_id=set_id, hk_data=hk_data, pw=pw) elif objb == obj_ids.ACS_CONTROLLER: return handle_acs_ctrl_hk_data( pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt ) elif objb == obj_ids.PWR_CONTROLLER: return handle_pwr_ctrl_hk_data( pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt ) else: _LOGGER.info( f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} " "has not been implemented." )