741 lines
26 KiB
Python
741 lines
26 KiB
Python
"""HK Handling for EIVE OBSW"""
|
|
import struct
|
|
import os
|
|
import datetime
|
|
|
|
|
|
|
|
from tmtccmd.config.definitions import HkReplyUnpacked
|
|
from tmtccmd.tm.pus_3_fsfw_hk import (
|
|
Service3Base,
|
|
HkContentType,
|
|
Service3FsfwTm,
|
|
)
|
|
from tmtccmd.logging import get_console_logger
|
|
from pus_tc.devs.bpx_batt import BpxSetIds
|
|
from pus_tc.devs.syrlinks_hk_handler import SetIds
|
|
from pus_tc.devs.p60dock import SetIds
|
|
from pus_tc.devs.imtq import ImtqSetIds
|
|
from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT
|
|
import config.object_ids as obj_ids
|
|
|
|
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
|
|
from pus_tm.defs import FsfwTmTcPrinter, log_to_both
|
|
|
|
from pus_tm.tm_tcp_server import TmTcpServer
|
|
|
|
|
|
|
|
LOGGER = get_console_logger()
|
|
|
|
TM_TCP_SERVER = TmTcpServer.getInstance()
|
|
|
|
|
|
def handle_hk_packet(
|
|
raw_tm: bytes,
|
|
obj_id_dict: ObjectIdDictT,
|
|
printer: FsfwTmTcPrinter,
|
|
):
|
|
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
|
|
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
|
|
if named_obj_id is None:
|
|
named_obj_id = tm_packet.object_id
|
|
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
|
hk_data = tm_packet.tm_data[8:]
|
|
TM_TCP_SERVER.report_raw_hk_data(object_id=named_obj_id,
|
|
set_id=tm_packet.set_id,
|
|
hk_data=hk_data)
|
|
printer.generic_hk_tm_print(
|
|
content_type=HkContentType.HK,
|
|
object_id=named_obj_id,
|
|
set_id=tm_packet.set_id,
|
|
hk_data=hk_data,
|
|
)
|
|
handle_regular_hk_print(
|
|
printer=printer,
|
|
object_id=named_obj_id,
|
|
hk_packet=tm_packet,
|
|
hk_data=hk_data,
|
|
)
|
|
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
|
|
LOGGER.warning("HK definitions printout not implemented yet")
|
|
|
|
def handle_regular_hk_print(
|
|
printer: FsfwTmTcPrinter,
|
|
object_id: ObjectId,
|
|
hk_packet: Service3Base,
|
|
hk_data: bytes,
|
|
):
|
|
objb = object_id.as_bytes
|
|
set_id = hk_packet.set_id
|
|
"""This function is called when a Service 3 Housekeeping packet is received."""
|
|
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
|
handle_rw_hk_data(printer, object_id, set_id, hk_data)
|
|
if objb == obj_ids.SYRLINKS_HANDLER_ID:
|
|
if set_id == SetIds.RX_REGISTERS_DATASET:
|
|
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
|
|
elif set_id == SetIds.TX_REGISTERS_DATASET:
|
|
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
|
|
else:
|
|
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
|
|
if objb == obj_ids.IMTQ_HANDLER_ID:
|
|
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
|
|
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
|
|
):
|
|
return handle_self_test_data(printer, hk_data)
|
|
else:
|
|
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
|
|
if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID:
|
|
handle_gps_data(printer=printer, hk_data=hk_data)
|
|
if objb == obj_ids.BPX_HANDLER_ID:
|
|
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
|
|
if objb == obj_ids.CORE_CONTROLLER_ID:
|
|
return handle_core_hk_data(printer=printer, hk_data=hk_data)
|
|
if objb == obj_ids.PDU_1_HANDLER_ID:
|
|
return handle_pdu_data(
|
|
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb == obj_ids.PDU_2_HANDLER_ID:
|
|
return handle_pdu_data(
|
|
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
|
return handle_rw_hk_data(
|
|
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
|
|
)
|
|
if objb == obj_ids.P60_DOCK_HANDLER:
|
|
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
|
|
if objb == obj_ids.PL_PCDU_ID:
|
|
log_to_both(printer, "Received PL PCDU HK data")
|
|
if objb == obj_ids.THERMAL_CONTROLLER_ID:
|
|
handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data)
|
|
else:
|
|
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
|
|
return HkReplyUnpacked()
|
|
|
|
def handle_syrlinks_rx_registers_dataset( printer: FsfwTmTcPrinter, hk_data: bytes):
|
|
reply = HkReplyUnpacked()
|
|
header_list = [
|
|
"RX Status",
|
|
"RX Sensitivity",
|
|
"RX Frequency Shift",
|
|
"RX IQ Power",
|
|
"RX AGC Value",
|
|
"RX Demod Eb",
|
|
"RX Demod N0",
|
|
"RX Datarate",
|
|
]
|
|
rx_status = hk_data[0]
|
|
rx_sensitivity = struct.unpack("!I", hk_data[1:5])
|
|
rx_frequency_shift = struct.unpack("!I", hk_data[5:9])
|
|
rx_iq_power = struct.unpack("!H", hk_data[9:11])
|
|
rx_agc_value = struct.unpack("!H", hk_data[11:13])
|
|
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
|
|
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
|
|
rx_data_rate = hk_data[21]
|
|
content_list = [
|
|
rx_status,
|
|
rx_sensitivity,
|
|
rx_frequency_shift,
|
|
rx_iq_power,
|
|
rx_agc_value,
|
|
rx_demod_eb,
|
|
rx_demod_n0,
|
|
rx_data_rate,
|
|
]
|
|
validity_buffer = hk_data[22:]
|
|
log_to_both(printer, str(header_list))
|
|
log_to_both(printer, str(content_list))
|
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
|
|
|
|
def handle_syrlinks_tx_registers_dataset(
|
|
printer: FsfwTmTcPrinter,
|
|
hk_data: bytes,
|
|
):
|
|
reply = HkReplyUnpacked()
|
|
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
|
|
tx_status = hk_data[0]
|
|
tx_waveform = hk_data[1]
|
|
tx_agc_value = struct.unpack("!H", hk_data[2:4])
|
|
content_list = [tx_status, tx_waveform, tx_agc_value]
|
|
validity_buffer = hk_data[4:]
|
|
log_to_both(printer, str(header_list))
|
|
log_to_both(printer, str(content_list))
|
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
|
|
|
|
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
|
header_list = [
|
|
"Init Err",
|
|
"Init Raw Mag X [nT]",
|
|
"Init Raw Mag Y [nT]",
|
|
"Init Raw Mag Z [nT]",
|
|
"Init Cal Mag X [nT]",
|
|
"Init Cal Mag Y [nT]",
|
|
"Init Cal Mag Z [nT]",
|
|
"Init Coil X Current [mA]",
|
|
"Init Coil Y Current [mA]",
|
|
"Init Coil Z Current [mA]",
|
|
"Init Coil X Temperature [°C]",
|
|
"Init Coil Y Temperature [°C]",
|
|
"Init Coil Z Temperature [°C]",
|
|
"Err",
|
|
"Raw Mag X [nT]",
|
|
"Raw Mag Y [nT]",
|
|
"Raw Mag Z [nT]",
|
|
"Cal Mag X [nT]",
|
|
"Cal Mag Y [nT]",
|
|
"Cal Mag Z [nT]",
|
|
"Coil X Current [mA]",
|
|
"Coil Y Current [mA]",
|
|
"Coil Z Current [mA]",
|
|
"Coil X Temperature [°C]",
|
|
"Coil Y Temperature [°C]",
|
|
"Coil Z Temperature [°C]",
|
|
"Fina Err",
|
|
"Fina Raw Mag X [nT]",
|
|
"Fina Raw Mag Y [nT]",
|
|
"Fina Raw Mag Z [nT]",
|
|
"Fina Cal Mag X [nT]",
|
|
"Fina Cal Mag Y [nT]",
|
|
"Fina Cal Mag Z [nT]",
|
|
"Fina Coil X Current [mA]",
|
|
"Fina Coil Y Current [mA]",
|
|
"Fina Coil Z Current [mA]",
|
|
"Fina Coil X Temperature [°C]",
|
|
"Fina Coil Y Temperature [°C]",
|
|
"Fina Coil Z Temperature [°C]",
|
|
]
|
|
# INIT step (no coil actuation)
|
|
init_err = hk_data[0]
|
|
init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0]
|
|
init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0]
|
|
init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0]
|
|
init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0]
|
|
init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0]
|
|
init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0]
|
|
init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0]
|
|
init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0]
|
|
init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0]
|
|
init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0]
|
|
init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0]
|
|
init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0]
|
|
|
|
# Actuation step
|
|
err = hk_data[43]
|
|
raw_mag_x = struct.unpack("!f", hk_data[44:48])[0]
|
|
raw_mag_y = struct.unpack("!f", hk_data[48:52])[0]
|
|
raw_mag_z = struct.unpack("!f", hk_data[52:56])[0]
|
|
cal_mag_x = struct.unpack("!f", hk_data[56:60])[0]
|
|
cal_mag_y = struct.unpack("!f", hk_data[60:64])[0]
|
|
cal_mag_z = struct.unpack("!f", hk_data[64:68])[0]
|
|
coil_x_current = struct.unpack("!f", hk_data[68:72])[0]
|
|
coil_y_current = struct.unpack("!f", hk_data[72:76])[0]
|
|
coil_z_current = struct.unpack("!f", hk_data[76:80])[0]
|
|
coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0]
|
|
coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0]
|
|
coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0]
|
|
|
|
# FINA step (no coil actuation)
|
|
fina_err = hk_data[86]
|
|
fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0]
|
|
fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0]
|
|
fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0]
|
|
fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0]
|
|
fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0]
|
|
fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0]
|
|
fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0]
|
|
fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0]
|
|
fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0]
|
|
fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0]
|
|
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
|
|
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
|
|
|
|
validity_buffer = hk_data[129:]
|
|
content_list = [
|
|
init_err,
|
|
init_raw_mag_x,
|
|
init_raw_mag_y,
|
|
init_raw_mag_z,
|
|
init_cal_mag_x,
|
|
init_cal_mag_y,
|
|
init_cal_mag_z,
|
|
init_coil_x_current,
|
|
init_coil_y_current,
|
|
init_coil_z_current,
|
|
init_coil_x_temperature,
|
|
init_coil_y_temperature,
|
|
init_coil_z_temperature,
|
|
err,
|
|
raw_mag_x,
|
|
raw_mag_y,
|
|
raw_mag_z,
|
|
cal_mag_x,
|
|
cal_mag_y,
|
|
cal_mag_z,
|
|
coil_x_current,
|
|
coil_y_current,
|
|
coil_z_current,
|
|
coil_x_temperature,
|
|
coil_y_temperature,
|
|
coil_z_temperature,
|
|
fina_err,
|
|
fina_raw_mag_x,
|
|
fina_raw_mag_y,
|
|
fina_raw_mag_z,
|
|
fina_cal_mag_x,
|
|
fina_cal_mag_y,
|
|
fina_cal_mag_z,
|
|
fina_coil_x_current,
|
|
fina_coil_y_current,
|
|
fina_coil_z_current,
|
|
fina_coil_x_temperature,
|
|
fina_coil_y_temperature,
|
|
fina_coil_z_temperature,
|
|
]
|
|
num_of_vars = len(header_list)
|
|
log_to_both(printer, str(header_list))
|
|
log_to_both(printer, str(content_list))
|
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
|
|
|
def handle_thermal_controller_hk_data(object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
|
if set_id == 0:
|
|
LOGGER.info("Received Sensor Temperature data")
|
|
|
|
# get all the floats
|
|
tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4])
|
|
parsed_data = {}
|
|
|
|
# put them into an list with their names
|
|
parsed_data = []
|
|
parsed_data.append({"SENSOR_PLOC_HEATSPREADER": tm_data[0]})
|
|
parsed_data.append({"SENSOR_PLOC_MISSIONBOARD": tm_data[1]})
|
|
parsed_data.append({"SENSOR_4K_CAMERA": tm_data[2]})
|
|
parsed_data.append({"SENSOR_DAC_HEATSPREADER": tm_data[3]})
|
|
parsed_data.append({"SENSOR_STARTRACKER": tm_data[4]})
|
|
parsed_data.append({"SENSOR_RW1": tm_data[5]})
|
|
parsed_data.append({"SENSOR_DRO": tm_data[6]})
|
|
parsed_data.append({"SENSOR_SCEX": tm_data[7]})
|
|
parsed_data.append({"SENSOR_X8": tm_data[8]})
|
|
parsed_data.append({"SENSOR_HPA": tm_data[9]})
|
|
parsed_data.append({"SENSOR_TX_MODUL": tm_data[10]})
|
|
parsed_data.append({"SENSOR_MPA": tm_data[11]})
|
|
parsed_data.append({"SENSOR_ACU": tm_data[12]})
|
|
parsed_data.append({"SENSOR_PLPCDU_HEATSPREADER": tm_data[13]})
|
|
parsed_data.append({"SENSOR_TCS_BOARD": tm_data[14]})
|
|
parsed_data.append({"SENSOR_MAGNETTORQUER": tm_data[15]})
|
|
|
|
TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data)
|
|
|
|
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
|
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
|
|
reply = HkReplyUnpacked()
|
|
var_index = 0
|
|
header_list = [
|
|
"Latitude",
|
|
"Longitude",
|
|
"Altitude",
|
|
"Fix Mode",
|
|
"Sats in Use",
|
|
"Date",
|
|
"Unix Seconds",
|
|
]
|
|
latitude = struct.unpack("!d", hk_data[0:8])[0]
|
|
longitude = struct.unpack("!d", hk_data[8:16])[0]
|
|
altitude = struct.unpack("!d", hk_data[16:24])[0]
|
|
fix_mode = hk_data[24]
|
|
sat_in_use = hk_data[25]
|
|
year = struct.unpack("!H", hk_data[26:28])[0]
|
|
month = hk_data[28]
|
|
day = hk_data[29]
|
|
hours = hk_data[30]
|
|
minutes = hk_data[31]
|
|
seconds = hk_data[32]
|
|
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
|
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
|
|
content_list = [
|
|
latitude,
|
|
longitude,
|
|
altitude,
|
|
fix_mode,
|
|
sat_in_use,
|
|
date_string,
|
|
unix_seconds,
|
|
]
|
|
var_index += 13
|
|
reply.num_of_vars = var_index
|
|
if not os.path.isfile("gps_log.txt"):
|
|
with open("gps_log.txt", "w") as gps_file:
|
|
gps_file.write(
|
|
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
|
|
"Date, Unix Seconds\n"
|
|
)
|
|
with open("gps_log.txt", "a") as gps_file:
|
|
gps_file.write(
|
|
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
|
|
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
|
|
)
|
|
validity_buffer = hk_data[37:39]
|
|
log_to_both(printer, str(header_list))
|
|
log_to_both(printer, str(content_list))
|
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
|
|
|
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
|
if set_id == BpxSetIds.GET_HK_SET:
|
|
fmt_str = "!HHHHhhhhIB"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(
|
|
charge_current,
|
|
discharge_current,
|
|
heater_current,
|
|
batt_voltage,
|
|
batt_temp_1,
|
|
batt_temp_2,
|
|
batt_temp_3,
|
|
batt_temp_4,
|
|
reboot_cntr,
|
|
boot_cause,
|
|
) = struct.unpack(fmt_str, hk_data[0:inc_len])
|
|
header_list = [
|
|
"Charge Current",
|
|
"Discharge Current",
|
|
"Heater Current",
|
|
"Battery Voltage",
|
|
"Batt Temp 1",
|
|
"Batt Temp 2",
|
|
"Batt Temp 3",
|
|
"Batt Temp 4",
|
|
"Reboot Counter",
|
|
"Boot Cause",
|
|
]
|
|
content_list = [
|
|
charge_current,
|
|
discharge_current,
|
|
heater_current,
|
|
batt_voltage,
|
|
batt_temp_1,
|
|
batt_temp_2,
|
|
batt_temp_3,
|
|
batt_temp_4,
|
|
reboot_cntr,
|
|
boot_cause,
|
|
]
|
|
validity_buffer = hk_data[inc_len:]
|
|
log_to_both(printer, str(header_list))
|
|
log_to_both(printer, str(content_list))
|
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
|
elif set_id == BpxSetIds.GET_CFG_SET:
|
|
battheat_mode = hk_data[0]
|
|
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
|
|
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
|
|
header_list = [
|
|
"Battery Heater Mode",
|
|
"Battery Heater Low Limit",
|
|
"Battery Heater High Limit",
|
|
]
|
|
content_list = [battheat_mode, battheat_low, battheat_high]
|
|
validity_buffer = hk_data[3:]
|
|
log_to_both(printer, str(header_list))
|
|
log_to_both(printer, str(content_list))
|
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
|
|
|
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
|
|
|
fmt_str = "!fffH"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
|
|
fmt_str, hk_data[0: 0 + inc_len]
|
|
)
|
|
printout = (
|
|
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
|
|
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
|
|
)
|
|
log_to_both(printer, printout)
|
|
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
|
|
|
|
P60_INDEX_LIST = [
|
|
"ACU VCC",
|
|
"PDU1 VCC",
|
|
"X3 IDLE VCC",
|
|
"PDU2 VCC",
|
|
"ACU VBAT",
|
|
"PDU1 VBAT",
|
|
"X3 IDLE VBAT",
|
|
"PDU2 VBAT",
|
|
"STACK VBAT",
|
|
"STACK 3V3",
|
|
"STACK 5V",
|
|
"GS3V3",
|
|
"GS5V",
|
|
]
|
|
|
|
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
|
|
|
|
PDU1_CHANNELS_NAMES = [
|
|
"TCS Board",
|
|
"Syrlinks",
|
|
"Startracker",
|
|
"MGT",
|
|
"SUS Nominal",
|
|
"SCEX",
|
|
"PLOC",
|
|
"ACS A Side",
|
|
"Unused Channel 8",
|
|
]
|
|
|
|
PDU2_CHANNELS_NAMES = [
|
|
"Q7S",
|
|
"Payload PCDU CH1",
|
|
"RW",
|
|
"TCS Heater In",
|
|
"SUS Redundant",
|
|
"Deployment Mechanism",
|
|
"Payload PCDU CH6",
|
|
"ACS B Side",
|
|
"Payload Camera",
|
|
]
|
|
|
|
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
|
|
|
|
class WdtInfo:
|
|
def __init__(self):
|
|
self.wdt_reboots_list = []
|
|
self.time_pings_left_list = []
|
|
|
|
def print(self, printer: FsfwTmTcPrinter):
|
|
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
|
|
log_to_both(printer, wdt_info)
|
|
for idx in range(len(self.wdt_reboots_list)):
|
|
log_to_both(
|
|
printer,
|
|
f"{TmHandler.WDT_LIST[idx].ljust(5)} | "
|
|
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
|
|
)
|
|
|
|
def parse(self, wdt_data: bytes, current_idx: int) -> int:
|
|
priv_idx = 0
|
|
self.wdt_reboots_list = []
|
|
self.time_pings_left_list = []
|
|
for idx in range(5):
|
|
self.wdt_reboots_list.append(
|
|
struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0]
|
|
)
|
|
priv_idx += 4
|
|
current_idx += 4
|
|
for idx in range(3):
|
|
self.time_pings_left_list.append(
|
|
struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0]
|
|
)
|
|
priv_idx += 4
|
|
current_idx += 4
|
|
for idx in range(2):
|
|
self.time_pings_left_list.append(wdt_data[priv_idx])
|
|
current_idx += 1
|
|
priv_idx += 1
|
|
return current_idx
|
|
|
|
def handle_pdu_data(
|
|
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
|
|
):
|
|
current_idx = 0
|
|
priv_idx = pdu_idx - 1
|
|
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
|
|
fmt_str = "!hhBBBIIH"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(
|
|
vcc,
|
|
vbat,
|
|
conv_enb_0,
|
|
conv_enb_1,
|
|
conv_enb_2,
|
|
boot_cause,
|
|
uptime,
|
|
reset_cause,
|
|
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
|
log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV")
|
|
log_to_both(
|
|
printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]"
|
|
)
|
|
log_to_both(
|
|
printer,
|
|
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
|
|
)
|
|
current_idx += inc_len
|
|
latchup_list = []
|
|
log_to_both(printer, "Latchups")
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
latchup_list.append(
|
|
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
|
)
|
|
content_line = (
|
|
f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
|
|
)
|
|
log_to_both(printer, content_line)
|
|
current_idx += 2
|
|
device_types = []
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
device_types.append(hk_data[current_idx])
|
|
current_idx += 1
|
|
device_statuses = []
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
device_statuses.append(hk_data[current_idx])
|
|
current_idx += 1
|
|
wdt = WdtInfo()
|
|
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
|
|
wdt.print(printer=printer)
|
|
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
|
|
log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}")
|
|
current_list = []
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
current_list.append(
|
|
struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
|
|
)
|
|
current_idx += 2
|
|
voltage_list = []
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
voltage_list.append(
|
|
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
|
)
|
|
current_idx += 2
|
|
output_enb_list = []
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
output_enb_list.append(hk_data[current_idx])
|
|
current_idx += 1
|
|
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
|
|
print(header_str)
|
|
printer.file_logger.info(header_str)
|
|
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
|
out_enb = f"{output_enb_list[idx]}".ljust(6)
|
|
content_line = (
|
|
f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
|
|
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
|
|
)
|
|
log_to_both(printer, content_line)
|
|
fmt_str = "!IBh"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(boot_count, batt_mode, temperature) = struct.unpack(
|
|
fmt_str, hk_data[current_idx: current_idx + inc_len]
|
|
)
|
|
info = (
|
|
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
|
|
f"Temperature {temperature / 10.0}"
|
|
)
|
|
log_to_both(printer, info)
|
|
|
|
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
|
if set_id == SetIds.P60_CORE:
|
|
log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA")
|
|
current_idx = 0
|
|
current_list = []
|
|
for idx in range(13):
|
|
current_list.append(
|
|
struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
|
|
)
|
|
current_idx += 2
|
|
voltage_list = []
|
|
for idx in range(13):
|
|
voltage_list.append(
|
|
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
|
)
|
|
current_idx += 2
|
|
out_enb_list = []
|
|
for idx in range(13):
|
|
out_enb_list.append(hk_data[current_idx])
|
|
current_idx += 1
|
|
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
|
|
print(header_str)
|
|
printer.file_logger.info(header_str)
|
|
for idx in range(13):
|
|
out_enb = f"{out_enb_list[idx]}".ljust(6)
|
|
content_line = (
|
|
f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
|
|
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
|
|
)
|
|
log_to_both(printer, content_line)
|
|
fmt_str = "!IBhHhh"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(
|
|
boot_count,
|
|
batt_mode,
|
|
batt_current,
|
|
batt_voltage,
|
|
temp_0,
|
|
temp_1,
|
|
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
|
current_idx += inc_len
|
|
batt_info = (
|
|
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
|
|
f"Charge current {batt_current} | Voltage {batt_voltage}"
|
|
)
|
|
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
|
|
log_to_both(printer, temps)
|
|
log_to_both(printer, batt_info)
|
|
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
|
|
if set_id == SetIds.P60_AUX:
|
|
log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA")
|
|
current_idx = 0
|
|
latchup_list = []
|
|
log_to_both(printer, "P60 Dock Latchups")
|
|
for idx in range(0, 13):
|
|
latchup_list.append(
|
|
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
|
)
|
|
content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
|
|
log_to_both(printer, content_line)
|
|
current_idx += 2
|
|
fmt_str = "!IIHBBHHhhB"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(
|
|
boot_cause,
|
|
uptime,
|
|
reset_cause,
|
|
heater_on,
|
|
conv_5v_on,
|
|
dock_vbat,
|
|
dock_vcc_c,
|
|
batt_temp_0,
|
|
batt_temp_1,
|
|
dearm_status,
|
|
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
|
current_idx += inc_len
|
|
wdt = WdtInfo()
|
|
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
|
|
fmt_str = "!hhbb"
|
|
inc_len = struct.calcsize(fmt_str)
|
|
(
|
|
batt_charge_current,
|
|
batt_discharge_current,
|
|
ant6_depl,
|
|
ar6_depl,
|
|
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
|
current_idx += inc_len
|
|
device_types = []
|
|
device_statuses = []
|
|
for idx in range(8):
|
|
device_types.append(hk_data[current_idx])
|
|
current_idx += 1
|
|
for idx in range(8):
|
|
device_statuses.append(hk_data[current_idx])
|
|
current_idx += 1
|
|
util_info = (
|
|
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
|
|
)
|
|
util_info_2 = (
|
|
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
|
|
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
|
|
)
|
|
log_to_both(printer, util_info)
|
|
log_to_both(printer, util_info_2)
|
|
wdt.print(printer)
|
|
misc_info = (
|
|
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
|
|
)
|
|
log_to_both(printer, misc_info)
|
|
batt_info = (
|
|
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
|
|
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
|
|
)
|
|
log_to_both(printer, batt_info)
|
|
printer.print_validity_buffer(
|
|
validity_buffer=hk_data[current_idx:], num_vars=27
|
|
)
|