255 lines
9.3 KiB
Python
255 lines
9.3 KiB
Python
"""HK Handling for EIVE OBSW"""
|
|
|
|
import dataclasses
|
|
import logging
|
|
import base64
|
|
import sqlite3
|
|
import struct
|
|
from typing import List, cast
|
|
from uuid import UUID
|
|
|
|
from spacepackets.ccsds.time import CdsShortTimestamp
|
|
from spacepackets.ecss import PusTm
|
|
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
|
|
from eive_tmtc.pus_tm.hk import HkTmInfo
|
|
|
|
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data
|
|
from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data
|
|
from eive_tmtc.tmtc.payload.ploc_mpsoc import handle_ploc_mpsoc_hk_data
|
|
from eive_tmtc.tmtc.tcs.rtd import RTD_NAMES, handle_rtd_hk
|
|
from eive_tmtc.tmtc.acs.star_tracker import handle_str_hk_data
|
|
from eive_tmtc.tmtc.payload.plpcdu import handle_plpcdu_hk
|
|
from eive_tmtc.tmtc.payload.rad_sensor import handle_rad_sensor_data
|
|
from eive_tmtc.tmtc.acs.sus import handle_sus_hk
|
|
from eive_tmtc.tmtc.payload.ploc_supervisor import handle_supv_hk_data
|
|
from eive_tmtc.tmtc.acs.reaction_wheels import handle_rw_hk_data
|
|
from eive_tmtc.tmtc.power.pwr_ctrl import handle_pwr_ctrl_hk_data
|
|
from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data
|
|
from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data
|
|
from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data
|
|
from tmtccmd.pus.tm.s3_hk_base import HkContentType
|
|
from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT
|
|
|
|
from eive_tmtc.tmtc.power.bpx_batt import handle_bpx_hk_data
|
|
from eive_tmtc.tmtc.acs.gps import handle_gps_data
|
|
from eive_tmtc.tmtc.acs.gyros import handle_gyros_hk_data
|
|
from eive_tmtc.tmtc.power.tm import (
|
|
handle_pdu_data,
|
|
handle_p60_hk_data,
|
|
handle_acu_hk_data,
|
|
handle_pcdu_hk,
|
|
)
|
|
from eive_tmtc.tmtc.acs.imtq import (
|
|
handle_imtq_hk,
|
|
)
|
|
from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter, PrintWrapper
|
|
from eive_tmtc.tmtc.core import handle_core_hk_data
|
|
from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data
|
|
import eive_tmtc.config.object_ids as obj_ids
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
FORWARD_SENSOR_TEMPS = False
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class HkFilter:
|
|
object_ids: List[bytes]
|
|
set_ids: List[int]
|
|
|
|
|
|
def handle_hk_packet(
|
|
raw_tm: bytes,
|
|
packet_uuid: UUID,
|
|
obj_id_dict: ObjectIdDictT,
|
|
printer: FsfwTmTcPrinter,
|
|
hk_filter: HkFilter,
|
|
hk_level: int,
|
|
db_con: sqlite3.Connection,
|
|
):
|
|
tm_packet = PusTm.unpack(raw_tm, CdsShortTimestamp.TIMESTAMP_SIZE)
|
|
obj_id_raw = struct.unpack("!I", tm_packet.tm_data[0:4])[0]
|
|
named_obj_id = cast(ObjectIdU32, obj_id_dict.get(obj_id_raw))
|
|
if named_obj_id is None:
|
|
named_obj_id = ObjectIdU32(obj_id_raw, "Unknown ID")
|
|
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
|
set_id = struct.unpack("!I", tm_packet.tm_data[4:8])[0]
|
|
hk_data = tm_packet.tm_data[8:]
|
|
if named_obj_id.as_bytes in hk_filter.object_ids:
|
|
if PRINT_RAW_HK_B64_STR:
|
|
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
|
|
handle_regular_hk_print(
|
|
printer=printer,
|
|
set_id=set_id,
|
|
packet_uuid=packet_uuid,
|
|
object_id=named_obj_id,
|
|
hk_packet=tm_packet,
|
|
hk_data=hk_data,
|
|
db=db_con,
|
|
)
|
|
return
|
|
try:
|
|
if hk_level >= 1:
|
|
printer.generic_hk_tm_print(
|
|
HkContentType.HK, named_obj_id, set_id, hk_data
|
|
)
|
|
if hk_level >= 1:
|
|
handle_regular_hk_print(
|
|
printer=printer,
|
|
set_id=set_id,
|
|
packet_uuid=packet_uuid,
|
|
object_id=named_obj_id,
|
|
hk_packet=tm_packet,
|
|
hk_data=hk_data,
|
|
db=db_con,
|
|
)
|
|
except ValueError as e:
|
|
_LOGGER.exception(
|
|
f"{e} error when parsing HK data coming from {named_obj_id}"
|
|
)
|
|
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
|
|
_LOGGER.warning("HK definitions printout not implemented yet")
|
|
|
|
|
|
def handle_regular_hk_print( # noqa C901: Complexity okay here
|
|
hk_packet: PusTm,
|
|
set_id: int,
|
|
packet_uuid: UUID,
|
|
hk_data: bytes,
|
|
db: sqlite3.Connection,
|
|
object_id: ObjectIdU32,
|
|
printer: FsfwTmTcPrinter,
|
|
):
|
|
objb = object_id.as_bytes
|
|
hk_info = HkTmInfo(
|
|
packet_uuid=packet_uuid,
|
|
hk_packet=hk_packet,
|
|
db_con=db,
|
|
hk_data=hk_data,
|
|
set_id=set_id,
|
|
)
|
|
timestamp = CdsShortTimestamp.unpack(hk_packet.timestamp)
|
|
packet_dt = timestamp.as_datetime()
|
|
pw = PrintWrapper(printer.file_logger)
|
|
"""This function is called when a Service 3 Housekeeping packet is received."""
|
|
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
|
return handle_rw_hk_data(pw, object_id, set_id, hk_data)
|
|
elif objb == obj_ids.SYRLINKS_HANDLER_ID:
|
|
return handle_syrlinks_hk_data(
|
|
hk_info=hk_info,
|
|
pw=pw,
|
|
)
|
|
elif objb == obj_ids.IMTQ_HANDLER_ID:
|
|
return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id)
|
|
elif objb == obj_ids.GPS_CONTROLLER:
|
|
return handle_gps_data(
|
|
pw=pw,
|
|
set_id=set_id,
|
|
hk_data=hk_data,
|
|
packet_time=timestamp.as_datetime(),
|
|
)
|
|
elif objb == obj_ids.PCDU_HANDLER_ID:
|
|
return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data)
|
|
elif objb == obj_ids.BPX_HANDLER_ID:
|
|
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, pw=pw)
|
|
elif objb == obj_ids.CORE_CONTROLLER_ID:
|
|
return handle_core_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
|
|
elif objb == obj_ids.PDU_1_HANDLER_ID:
|
|
return handle_pdu_data(
|
|
hk_info=hk_info,
|
|
pw=pw,
|
|
pdu_idx=1,
|
|
set_id=set_id,
|
|
hk_data=hk_data,
|
|
)
|
|
elif objb == obj_ids.PDU_2_HANDLER_ID:
|
|
return handle_pdu_data(
|
|
hk_info=hk_info,
|
|
pw=pw,
|
|
pdu_idx=2,
|
|
set_id=set_id,
|
|
hk_data=hk_data,
|
|
)
|
|
elif objb == obj_ids.PLOC_MPSOC_ID:
|
|
return handle_ploc_mpsoc_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
|
|
elif objb == obj_ids.ACU_HANDLER_ID:
|
|
return handle_acu_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
|
|
elif objb == obj_ids.INTERNAL_ERROR_REPORTER_ID:
|
|
return handle_ier_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
|
|
elif objb == obj_ids.RAD_SENSOR_ID:
|
|
return handle_rad_sensor_data(pw=pw, hk_data=hk_data, set_id=set_id)
|
|
elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
|
return handle_rw_hk_data(
|
|
pw=pw, object_id=object_id, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb in [
|
|
obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF,
|
|
obj_ids.SUS_1_N_LOC_XBYFZM_PT_XB,
|
|
obj_ids.SUS_2_N_LOC_XFYBZB_PT_YB,
|
|
obj_ids.SUS_3_N_LOC_XFYBZF_PT_YF,
|
|
obj_ids.SUS_4_N_LOC_XMYFZF_PT_ZF,
|
|
obj_ids.SUS_5_N_LOC_XFYMZB_PT_ZB,
|
|
obj_ids.SUS_6_R_LOC_XFYBZM_PT_XF,
|
|
obj_ids.SUS_7_R_LOC_XBYBZM_PT_XB,
|
|
obj_ids.SUS_8_R_LOC_XBYBZB_PT_YB,
|
|
obj_ids.SUS_9_R_LOC_XBYBZB_PT_YF,
|
|
obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF,
|
|
obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB,
|
|
]:
|
|
return handle_sus_hk(object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id)
|
|
elif objb in RTD_NAMES.keys():
|
|
return handle_rtd_hk(object_id=objb, hk_data=hk_data, pw=pw)
|
|
elif objb == obj_ids.P60_DOCK_HANDLER:
|
|
return handle_p60_hk_data(pw=pw, set_id=set_id, hk_data=hk_data)
|
|
elif objb in [
|
|
obj_ids.GYRO_0_ADIS_HANDLER_ID,
|
|
obj_ids.GYRO_1_L3G_HANDLER_ID,
|
|
obj_ids.GYRO_2_ADIS_HANDLER_ID,
|
|
obj_ids.GYRO_3_L3G_HANDLER_ID,
|
|
]:
|
|
return handle_gyros_hk_data(
|
|
object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
|
|
)
|
|
elif objb in [
|
|
obj_ids.MGM_0_LIS3_HANDLER_ID,
|
|
obj_ids.MGM_1_RM3100_HANDLER_ID,
|
|
obj_ids.MGM_2_LIS3_HANDLER_ID,
|
|
obj_ids.MGM_3_RM3100_HANDLER_ID,
|
|
]:
|
|
return handle_mgm_hk_data(
|
|
object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
|
|
)
|
|
elif objb == obj_ids.PL_PCDU_ID:
|
|
return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, pw=pw)
|
|
elif objb == obj_ids.THERMAL_CONTROLLER_ID:
|
|
return handle_thermal_controller_hk_data(
|
|
object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data
|
|
)
|
|
elif objb == obj_ids.STAR_TRACKER_ID:
|
|
return handle_str_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
|
|
elif objb == obj_ids.PLOC_SUPV_ID:
|
|
return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
|
|
elif objb in [
|
|
obj_ids.TMP1075_HANDLER_TCS_BRD_0_ID,
|
|
obj_ids.TMP1075_HANDLER_TCS_BRD_1_ID,
|
|
obj_ids.TMP1075_HANDLER_IF_BRD_ID,
|
|
obj_ids.TMP1075_HANDLER_PLPCDU_0_ID,
|
|
obj_ids.TMP1075_HANDLER_PLPCDU_1_ID,
|
|
]:
|
|
return handle_tmp_1075_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
|
|
elif objb == obj_ids.ACS_CONTROLLER:
|
|
return handle_acs_ctrl_hk_data(
|
|
pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
|
|
)
|
|
elif objb == obj_ids.PWR_CONTROLLER:
|
|
return handle_pwr_ctrl_hk_data(
|
|
pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
|
|
)
|
|
else:
|
|
_LOGGER.info(
|
|
f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} "
|
|
"has not been implemented."
|
|
)
|