eive-tmtc/eive_tmtc/pus_tm/hk_handler.py

255 lines
9.3 KiB
Python

"""HK Handling for EIVE OBSW"""
import dataclasses
import logging
import base64
import sqlite3
import struct
from typing import List, cast
from uuid import UUID
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss import PusTm
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
from eive_tmtc.pus_tm.hk import HkTmInfo
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data
from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data
from eive_tmtc.tmtc.payload.ploc_mpsoc import handle_ploc_mpsoc_hk_data
from eive_tmtc.tmtc.tcs.rtd import RTD_NAMES, handle_rtd_hk
from eive_tmtc.tmtc.acs.star_tracker import handle_str_hk_data
from eive_tmtc.tmtc.payload.plpcdu import handle_plpcdu_hk
from eive_tmtc.tmtc.payload.rad_sensor import handle_rad_sensor_data
from eive_tmtc.tmtc.acs.sus import handle_sus_hk
from eive_tmtc.tmtc.payload.ploc_supervisor import handle_supv_hk_data
from eive_tmtc.tmtc.acs.reaction_wheels import handle_rw_hk_data
from eive_tmtc.tmtc.power.pwr_ctrl import handle_pwr_ctrl_hk_data
from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data
from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data
from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data
from tmtccmd.pus.tm.s3_hk_base import HkContentType
from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT
from eive_tmtc.tmtc.power.bpx_batt import handle_bpx_hk_data
from eive_tmtc.tmtc.acs.gps import handle_gps_data
from eive_tmtc.tmtc.acs.gyros import handle_gyros_hk_data
from eive_tmtc.tmtc.power.tm import (
handle_pdu_data,
handle_p60_hk_data,
handle_acu_hk_data,
handle_pcdu_hk,
)
from eive_tmtc.tmtc.acs.imtq import (
handle_imtq_hk,
)
from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter, PrintWrapper
from eive_tmtc.tmtc.core import handle_core_hk_data
from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data
import eive_tmtc.config.object_ids as obj_ids
_LOGGER = logging.getLogger(__name__)
FORWARD_SENSOR_TEMPS = False
@dataclasses.dataclass
class HkFilter:
object_ids: List[bytes]
set_ids: List[int]
def handle_hk_packet(
raw_tm: bytes,
packet_uuid: UUID,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
hk_filter: HkFilter,
hk_level: int,
db_con: sqlite3.Connection,
):
tm_packet = PusTm.unpack(raw_tm, CdsShortTimestamp.TIMESTAMP_SIZE)
obj_id_raw = struct.unpack("!I", tm_packet.tm_data[0:4])[0]
named_obj_id = cast(ObjectIdU32, obj_id_dict.get(obj_id_raw))
if named_obj_id is None:
named_obj_id = ObjectIdU32(obj_id_raw, "Unknown ID")
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
set_id = struct.unpack("!I", tm_packet.tm_data[4:8])[0]
hk_data = tm_packet.tm_data[8:]
if named_obj_id.as_bytes in hk_filter.object_ids:
if PRINT_RAW_HK_B64_STR:
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
handle_regular_hk_print(
printer=printer,
set_id=set_id,
packet_uuid=packet_uuid,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
db=db_con,
)
return
try:
if hk_level >= 1:
printer.generic_hk_tm_print(
HkContentType.HK, named_obj_id, set_id, hk_data
)
if hk_level >= 1:
handle_regular_hk_print(
printer=printer,
set_id=set_id,
packet_uuid=packet_uuid,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
db=db_con,
)
except ValueError as e:
_LOGGER.exception(
f"{e} error when parsing HK data coming from {named_obj_id}"
)
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
_LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print( # noqa C901: Complexity okay here
hk_packet: PusTm,
set_id: int,
packet_uuid: UUID,
hk_data: bytes,
db: sqlite3.Connection,
object_id: ObjectIdU32,
printer: FsfwTmTcPrinter,
):
objb = object_id.as_bytes
hk_info = HkTmInfo(
packet_uuid=packet_uuid,
hk_packet=hk_packet,
db_con=db,
hk_data=hk_data,
set_id=set_id,
)
timestamp = CdsShortTimestamp.unpack(hk_packet.timestamp)
packet_dt = timestamp.as_datetime()
pw = PrintWrapper(printer.file_logger)
"""This function is called when a Service 3 Housekeeping packet is received."""
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data(pw, object_id, set_id, hk_data)
elif objb == obj_ids.SYRLINKS_HANDLER_ID:
return handle_syrlinks_hk_data(
hk_info=hk_info,
pw=pw,
)
elif objb == obj_ids.IMTQ_HANDLER_ID:
return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.GPS_CONTROLLER:
return handle_gps_data(
pw=pw,
set_id=set_id,
hk_data=hk_data,
packet_time=timestamp.as_datetime(),
)
elif objb == obj_ids.PCDU_HANDLER_ID:
return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data)
elif objb == obj_ids.BPX_HANDLER_ID:
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, pw=pw)
elif objb == obj_ids.CORE_CONTROLLER_ID:
return handle_core_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.PDU_1_HANDLER_ID:
return handle_pdu_data(
hk_info=hk_info,
pw=pw,
pdu_idx=1,
set_id=set_id,
hk_data=hk_data,
)
elif objb == obj_ids.PDU_2_HANDLER_ID:
return handle_pdu_data(
hk_info=hk_info,
pw=pw,
pdu_idx=2,
set_id=set_id,
hk_data=hk_data,
)
elif objb == obj_ids.PLOC_MPSOC_ID:
return handle_ploc_mpsoc_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.ACU_HANDLER_ID:
return handle_acu_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.INTERNAL_ERROR_REPORTER_ID:
return handle_ier_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.RAD_SENSOR_ID:
return handle_rad_sensor_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data(
pw=pw, object_id=object_id, set_id=set_id, hk_data=hk_data
)
if objb in [
obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF,
obj_ids.SUS_1_N_LOC_XBYFZM_PT_XB,
obj_ids.SUS_2_N_LOC_XFYBZB_PT_YB,
obj_ids.SUS_3_N_LOC_XFYBZF_PT_YF,
obj_ids.SUS_4_N_LOC_XMYFZF_PT_ZF,
obj_ids.SUS_5_N_LOC_XFYMZB_PT_ZB,
obj_ids.SUS_6_R_LOC_XFYBZM_PT_XF,
obj_ids.SUS_7_R_LOC_XBYBZM_PT_XB,
obj_ids.SUS_8_R_LOC_XBYBZB_PT_YB,
obj_ids.SUS_9_R_LOC_XBYBZB_PT_YF,
obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF,
obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB,
]:
return handle_sus_hk(object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id)
elif objb in RTD_NAMES.keys():
return handle_rtd_hk(object_id=objb, hk_data=hk_data, pw=pw)
elif objb == obj_ids.P60_DOCK_HANDLER:
return handle_p60_hk_data(pw=pw, set_id=set_id, hk_data=hk_data)
elif objb in [
obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_1_L3G_HANDLER_ID,
obj_ids.GYRO_2_ADIS_HANDLER_ID,
obj_ids.GYRO_3_L3G_HANDLER_ID,
]:
return handle_gyros_hk_data(
object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
)
elif objb in [
obj_ids.MGM_0_LIS3_HANDLER_ID,
obj_ids.MGM_1_RM3100_HANDLER_ID,
obj_ids.MGM_2_LIS3_HANDLER_ID,
obj_ids.MGM_3_RM3100_HANDLER_ID,
]:
return handle_mgm_hk_data(
object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
)
elif objb == obj_ids.PL_PCDU_ID:
return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.THERMAL_CONTROLLER_ID:
return handle_thermal_controller_hk_data(
object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data
)
elif objb == obj_ids.STAR_TRACKER_ID:
return handle_str_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.PLOC_SUPV_ID:
return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb in [
obj_ids.TMP1075_HANDLER_TCS_BRD_0_ID,
obj_ids.TMP1075_HANDLER_TCS_BRD_1_ID,
obj_ids.TMP1075_HANDLER_IF_BRD_ID,
obj_ids.TMP1075_HANDLER_PLPCDU_0_ID,
obj_ids.TMP1075_HANDLER_PLPCDU_1_ID,
]:
return handle_tmp_1075_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.ACS_CONTROLLER:
return handle_acs_ctrl_hk_data(
pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
)
elif objb == obj_ids.PWR_CONTROLLER:
return handle_pwr_ctrl_hk_data(
pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
)
else:
_LOGGER.info(
f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} "
"has not been implemented."
)