eive-tmtc/pus_tm/hk_handling.py

761 lines
27 KiB
Python
Raw Normal View History

2021-08-11 19:16:08 +02:00
"""HK Handling for EIVE OBSW"""
2021-06-15 15:23:27 +02:00
import struct
2021-09-06 18:26:56 +02:00
import os
import datetime
import json
import socket
2020-12-17 17:50:00 +01:00
from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.pus_3_fsfw_hk import (
2022-04-05 19:49:42 +02:00
Service3Base,
HkContentType,
Service3FsfwTm,
)
2022-04-05 00:51:52 +02:00
from tmtccmd.logging import get_console_logger
2022-03-04 10:41:05 +01:00
from pus_tc.devs.bpx_batt import BpxSetIds
2022-03-04 10:44:55 +01:00
from pus_tc.devs.syrlinks_hk_handler import SetIds
2022-04-07 12:51:47 +02:00
from pus_tc.devs.p60dock import SetIds
2022-03-04 10:44:55 +01:00
from pus_tc.devs.imtq import ImtqSetIds
from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT
2022-04-12 15:36:04 +02:00
import config.object_ids as obj_ids
2022-05-10 18:00:56 +02:00
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
2022-05-17 11:13:32 +02:00
from pus_tm.defs import FsfwTmTcPrinter, log_to_both
2022-01-18 14:03:56 +01:00
#TODO add to configuration parameters
THERMAL_HOST = "127.0.0.1"
THERMAL_PORT = 7302
2021-06-28 19:06:09 +02:00
LOGGER = get_console_logger()
2020-12-17 17:50:00 +01:00
def handle_hk_packet(
2022-04-05 19:27:55 +02:00
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
):
2022-04-05 19:49:42 +02:00
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
2022-04-05 19:27:55 +02:00
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None:
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
2022-04-06 16:41:46 +02:00
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
2022-04-05 19:27:55 +02:00
object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data,
)
2022-04-05 17:05:11 +02:00
handle_regular_hk_print(
printer=printer,
2022-04-05 19:27:55 +02:00
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
2022-04-05 17:05:11 +02:00
)
2022-04-05 19:27:55 +02:00
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
2022-04-05 17:05:11 +02:00
LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print(
2022-04-05 19:27:55 +02:00
printer: FsfwTmTcPrinter,
object_id: ObjectId,
hk_packet: Service3Base,
hk_data: bytes,
2022-04-05 17:05:11 +02:00
):
2022-04-28 19:43:16 +02:00
objb = object_id.as_bytes
2022-04-05 17:05:11 +02:00
set_id = hk_packet.set_id
2022-01-18 14:03:56 +01:00
"""This function is called when a Service 3 Housekeeping packet is received."""
2022-04-28 19:43:16 +02:00
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
handle_rw_hk_data(printer, object_id, set_id, hk_data)
if objb == obj_ids.SYRLINKS_HANDLER_ID:
2021-06-15 15:23:27 +02:00
if set_id == SetIds.RX_REGISTERS_DATASET:
2022-04-06 17:01:01 +02:00
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
2021-06-15 15:23:27 +02:00
elif set_id == SetIds.TX_REGISTERS_DATASET:
2022-04-06 17:01:01 +02:00
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
2021-06-15 15:23:27 +02:00
else:
2022-04-06 17:35:04 +02:00
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
2022-04-28 19:43:16 +02:00
if objb == obj_ids.IMTQ_HANDLER_ID:
2022-01-18 14:03:56 +01:00
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
):
2022-04-06 17:01:01 +02:00
return handle_self_test_data(printer, hk_data)
2021-06-15 15:23:27 +02:00
else:
2022-04-06 17:35:04 +02:00
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
2022-04-28 19:43:16 +02:00
if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID:
2022-04-06 17:35:04 +02:00
handle_gps_data(printer=printer, hk_data=hk_data)
2022-04-28 19:43:16 +02:00
if objb == obj_ids.BPX_HANDLER_ID:
2022-04-06 17:35:04 +02:00
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
2022-04-28 19:43:16 +02:00
if objb == obj_ids.CORE_CONTROLLER_ID:
2022-04-05 19:27:55 +02:00
return handle_core_hk_data(printer=printer, hk_data=hk_data)
2022-04-28 19:43:16 +02:00
if objb == obj_ids.PDU_1_HANDLER_ID:
2022-04-12 16:10:51 +02:00
return handle_pdu_data(
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
)
2022-04-28 19:43:16 +02:00
if objb == obj_ids.PDU_2_HANDLER_ID:
2022-04-12 16:10:51 +02:00
return handle_pdu_data(
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
)
2022-05-05 02:00:18 +02:00
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data(
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
)
2022-04-28 19:43:16 +02:00
if objb == obj_ids.P60_DOCK_HANDLER:
2022-04-08 14:46:01 +02:00
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
2022-04-28 19:43:16 +02:00
if objb == obj_ids.PL_PCDU_ID:
2022-04-07 00:15:15 +02:00
log_to_both(printer, "Received PL PCDU HK data")
if objb == obj_ids.THERMAL_CONTROLLER_ID:
handle_thermal_controller_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
2021-06-15 15:23:27 +02:00
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return HkReplyUnpacked()
2021-06-15 15:23:27 +02:00
2022-04-06 17:01:01 +02:00
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
reply = HkReplyUnpacked()
2022-04-06 17:01:01 +02:00
header_list = [
2022-01-18 14:03:56 +01:00
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
"RX IQ Power",
"RX AGC Value",
"RX Demod Eb",
"RX Demod N0",
"RX Datarate",
]
2021-06-15 15:23:27 +02:00
rx_status = hk_data[0]
2022-01-18 14:03:56 +01:00
rx_sensitivity = struct.unpack("!I", hk_data[1:5])
rx_frequency_shift = struct.unpack("!I", hk_data[5:9])
rx_iq_power = struct.unpack("!H", hk_data[9:11])
rx_agc_value = struct.unpack("!H", hk_data[11:13])
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
2021-06-15 15:23:27 +02:00
rx_data_rate = hk_data[21]
2022-04-06 17:01:01 +02:00
content_list = [
2022-01-18 14:03:56 +01:00
rx_status,
rx_sensitivity,
rx_frequency_shift,
rx_iq_power,
rx_agc_value,
rx_demod_eb,
rx_demod_n0,
rx_data_rate,
]
2022-04-06 17:01:01 +02:00
validity_buffer = hk_data[22:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
2021-06-15 15:23:27 +02:00
2022-01-18 14:03:56 +01:00
def handle_syrlinks_tx_registers_dataset(
2022-04-06 17:01:01 +02:00
printer: FsfwTmTcPrinter,
2022-04-05 17:05:11 +02:00
hk_data: bytes,
2022-04-06 17:01:01 +02:00
):
reply = HkReplyUnpacked()
2022-04-06 17:01:01 +02:00
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
2021-06-15 15:23:27 +02:00
tx_status = hk_data[0]
tx_waveform = hk_data[1]
2022-01-18 14:03:56 +01:00
tx_agc_value = struct.unpack("!H", hk_data[2:4])
2022-04-06 17:01:01 +02:00
content_list = [tx_status, tx_waveform, tx_agc_value]
validity_buffer = hk_data[4:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
2021-06-15 15:23:27 +02:00
2022-04-06 17:01:01 +02:00
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
header_list = [
2022-01-18 14:03:56 +01:00
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
"Init Raw Mag Z [nT]",
"Init Cal Mag X [nT]",
"Init Cal Mag Y [nT]",
"Init Cal Mag Z [nT]",
"Init Coil X Current [mA]",
"Init Coil Y Current [mA]",
"Init Coil Z Current [mA]",
"Init Coil X Temperature [°C]",
"Init Coil Y Temperature [°C]",
"Init Coil Z Temperature [°C]",
"Err",
"Raw Mag X [nT]",
"Raw Mag Y [nT]",
"Raw Mag Z [nT]",
"Cal Mag X [nT]",
"Cal Mag Y [nT]",
"Cal Mag Z [nT]",
"Coil X Current [mA]",
"Coil Y Current [mA]",
"Coil Z Current [mA]",
"Coil X Temperature [°C]",
"Coil Y Temperature [°C]",
"Coil Z Temperature [°C]",
"Fina Err",
"Fina Raw Mag X [nT]",
"Fina Raw Mag Y [nT]",
"Fina Raw Mag Z [nT]",
"Fina Cal Mag X [nT]",
"Fina Cal Mag Y [nT]",
"Fina Cal Mag Z [nT]",
"Fina Coil X Current [mA]",
"Fina Coil Y Current [mA]",
"Fina Coil Z Current [mA]",
"Fina Coil X Temperature [°C]",
"Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]",
2021-08-11 19:16:08 +02:00
]
2021-06-15 15:23:27 +02:00
# INIT step (no coil actuation)
init_err = hk_data[0]
2022-01-18 14:03:56 +01:00
init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0]
init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0]
init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0]
init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0]
init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0]
init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0]
init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0]
init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0]
init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0]
init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0]
init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0]
init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0]
2021-06-15 15:23:27 +02:00
# Actuation step
err = hk_data[43]
2022-01-18 14:03:56 +01:00
raw_mag_x = struct.unpack("!f", hk_data[44:48])[0]
raw_mag_y = struct.unpack("!f", hk_data[48:52])[0]
raw_mag_z = struct.unpack("!f", hk_data[52:56])[0]
cal_mag_x = struct.unpack("!f", hk_data[56:60])[0]
cal_mag_y = struct.unpack("!f", hk_data[60:64])[0]
cal_mag_z = struct.unpack("!f", hk_data[64:68])[0]
coil_x_current = struct.unpack("!f", hk_data[68:72])[0]
coil_y_current = struct.unpack("!f", hk_data[72:76])[0]
coil_z_current = struct.unpack("!f", hk_data[76:80])[0]
coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0]
coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0]
coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0]
2021-06-15 15:23:27 +02:00
# FINA step (no coil actuation)
fina_err = hk_data[86]
2022-01-18 14:03:56 +01:00
fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0]
fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0]
fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0]
fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0]
fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0]
fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0]
fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0]
fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0]
fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0]
fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0]
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
2021-06-15 15:23:27 +02:00
2022-04-06 17:01:01 +02:00
validity_buffer = hk_data[129:]
content_list = [
2022-01-18 14:03:56 +01:00
init_err,
init_raw_mag_x,
init_raw_mag_y,
init_raw_mag_z,
init_cal_mag_x,
init_cal_mag_y,
init_cal_mag_z,
init_coil_x_current,
init_coil_y_current,
init_coil_z_current,
init_coil_x_temperature,
init_coil_y_temperature,
init_coil_z_temperature,
err,
raw_mag_x,
init_raw_mag_y,
raw_mag_z,
cal_mag_x,
cal_mag_y,
cal_mag_z,
coil_x_current,
coil_y_current,
coil_z_current,
coil_x_temperature,
coil_y_temperature,
coil_z_temperature,
fina_err,
fina_raw_mag_x,
fina_raw_mag_y,
fina_raw_mag_z,
fina_cal_mag_x,
fina_cal_mag_y,
fina_cal_mag_z,
fina_coil_x_current,
fina_coil_y_current,
fina_coil_z_current,
fina_coil_x_temperature,
fina_coil_y_temperature,
fina_coil_z_temperature,
2021-08-11 19:16:08 +02:00
]
2022-04-06 17:01:01 +02:00
num_of_vars = len(header_list)
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
2021-06-15 15:23:27 +02:00
def handle_thermal_controller_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == 0:
LOGGER.info("Received Sensor Temperature data")
#get all the floats
tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16*4])
parsed_data = {}
#put them into a nice dictionary
parsed_data["SID"] = set_id
parsed_data["content"] = {}
parsed_data["content"]["SENSOR_PLOC_HEATSPREADER"] = tm_data[0]
parsed_data["content"]["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1]
parsed_data["content"]["SENSOR_4K_CAMERA"] = tm_data[2]
parsed_data["content"]["SENSOR_DAC_HEATSPREADER"] = tm_data[3]
parsed_data["content"]["SENSOR_STARTRACKER"] = tm_data[4]
parsed_data["content"]["SENSOR_RW1"] = tm_data[5]
parsed_data["content"]["SENSOR_DRO"] = tm_data[6]
parsed_data["content"]["SENSOR_SCEX"] = tm_data[7]
parsed_data["content"]["SENSOR_X8"] = tm_data[8]
parsed_data["content"]["SENSOR_HPA"] = tm_data[9]
parsed_data["content"]["SENSOR_TX_MODUL"] = tm_data[10]
parsed_data["content"]["SENSOR_MPA"] = tm_data[11]
parsed_data["content"]["SENSOR_ACU"] = tm_data[12]
parsed_data["content"]["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13]
parsed_data["content"]["SENSOR_TCS_BOARD"] = tm_data[14]
parsed_data["content"]["SENSOR_MAGNETTORQUER"] = tm_data[15]
#which in turn will become a json to be sent over the wire
json_string = json.dumps(parsed_data)
#print(json_string)
#try to send it to a tcp server
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((THERMAL_HOST, THERMAL_PORT))
s.sendall(bytes(json_string, encoding="utf-8"))
except:
#fail silently if there is noone listening, should be a non breaking feature
pass
2021-08-11 19:16:08 +02:00
2022-04-06 17:01:01 +02:00
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
2022-01-18 14:03:56 +01:00
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
2021-09-06 14:52:41 +02:00
var_index = 0
2022-04-06 17:01:01 +02:00
header_list = [
"Latitude",
"Longitude",
"Altitude",
"Fix Mode",
"Sats in Use",
"Date",
2022-02-03 16:02:55 +01:00
"Unix Seconds",
]
2022-01-18 14:03:56 +01:00
latitude = struct.unpack("!d", hk_data[0:8])[0]
longitude = struct.unpack("!d", hk_data[8:16])[0]
altitude = struct.unpack("!d", hk_data[16:24])[0]
2021-09-06 14:52:41 +02:00
fix_mode = hk_data[24]
sat_in_use = hk_data[25]
2022-01-18 14:03:56 +01:00
year = struct.unpack("!H", hk_data[26:28])[0]
2021-09-06 14:52:41 +02:00
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
2022-01-18 14:03:56 +01:00
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
2022-04-06 17:01:01 +02:00
content_list = [
latitude,
longitude,
altitude,
fix_mode,
sat_in_use,
date_string,
2022-02-03 16:02:55 +01:00
unix_seconds,
]
2021-09-06 14:52:41 +02:00
var_index += 13
reply.num_of_vars = var_index
2022-01-18 14:03:56 +01:00
if not os.path.isfile("gps_log.txt"):
2021-09-06 18:26:56 +02:00
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
2022-01-18 14:03:56 +01:00
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
"Date, Unix Seconds\n"
2021-09-06 18:26:56 +02:00
)
with open("gps_log.txt", "a") as gps_file:
gps_file.write(
2022-01-18 14:03:56 +01:00
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
2021-09-06 18:26:56 +02:00
)
2022-04-06 17:01:01 +02:00
validity_buffer = hk_data[37:39]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
2022-04-06 17:01:01 +02:00
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
2022-02-03 15:30:58 +01:00
if set_id == BpxSetIds.GET_HK_SET:
2022-04-06 17:35:04 +02:00
fmt_str = "!HHHHhhhhIB"
2022-04-06 17:01:01 +02:00
inc_len = struct.calcsize(fmt_str)
(
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
) = struct.unpack(fmt_str, hk_data[0:inc_len])
header_list = [
2022-02-03 15:30:58 +01:00
"Charge Current",
"Discharge Current",
"Heater Current",
"Battery Voltage",
"Batt Temp 1",
"Batt Temp 2",
"Batt Temp 3",
"Batt Temp 4",
"Reboot Counter",
2022-02-03 16:02:55 +01:00
"Boot Cause",
2022-02-03 15:30:58 +01:00
]
2022-04-06 17:01:01 +02:00
content_list = [
2022-02-03 15:30:58 +01:00
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
2022-02-03 16:02:55 +01:00
boot_cause,
2022-02-03 15:30:58 +01:00
]
2022-04-06 17:01:01 +02:00
validity_buffer = hk_data[inc_len:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
2022-02-03 15:30:58 +01:00
elif set_id == BpxSetIds.GET_CFG_SET:
battheat_mode = hk_data[0]
2022-02-03 16:02:55 +01:00
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
2022-04-06 17:01:01 +02:00
header_list = [
2022-02-03 15:30:58 +01:00
"Battery Heater Mode",
"Battery Heater Low Limit",
2022-02-03 16:02:55 +01:00
"Battery Heater High Limit",
2022-02-03 15:30:58 +01:00
]
2022-04-06 17:01:01 +02:00
content_list = [battheat_mode, battheat_low, battheat_high]
validity_buffer = hk_data[3:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
2022-03-13 21:35:12 +01:00
2022-04-05 19:27:55 +02:00
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
fmt_str = "!fffH"
inc_len = struct.calcsize(fmt_str)
2022-04-05 19:49:42 +02:00
(temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
)
2022-04-05 19:27:55 +02:00
log_to_both(printer, printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
2022-04-04 17:15:12 +02:00
P60_INDEX_LIST = [
2022-04-04 18:46:52 +02:00
"ACU VCC",
"PDU1 VCC",
2022-04-05 19:27:55 +02:00
"X3 IDLE VCC",
2022-04-04 18:46:52 +02:00
"PDU2 VCC",
"ACU VBAT",
"PDU1 VBAT",
"X3 IDLE VBAT",
"PDU2 VBAT",
"STACK VBAT",
"STACK 3V3",
"STACK 5V",
"GS3V3",
"GS5V",
2022-04-04 17:15:12 +02:00
]
2022-04-05 19:27:55 +02:00
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
2022-04-04 17:15:12 +02:00
2022-04-12 16:00:37 +02:00
PDU1_CHANNELS_NAMES = [
"TCS Board",
"Syrlinks",
"Startracker",
"MGT",
"SUS Nominal",
"SCEX",
"PLOC",
"ACS A Side",
2022-04-12 16:10:51 +02:00
"Unused Channel 8",
2022-04-12 16:00:37 +02:00
]
PDU2_CHANNELS_NAMES = [
"Q7S",
"Payload PCDU CH1",
"RW",
"TCS Heater In",
"SUS Redundant",
"Deployment Mechanism",
"Payload PCDU CH6",
"ACS B Side",
2022-04-12 16:10:51 +02:00
"Payload Camera",
2022-04-12 16:00:37 +02:00
]
2022-04-12 16:10:51 +02:00
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
2022-04-12 16:00:37 +02:00
class WdtInfo:
def __init__(self):
self.wdt_reboots_list = []
self.time_pings_left_list = []
def print(self, printer: FsfwTmTcPrinter):
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(self.wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
)
def parse(self, wdt_data: bytes, current_idx: int) -> int:
priv_idx = 0
self.wdt_reboots_list = []
self.time_pings_left_list = []
for idx in range(5):
self.wdt_reboots_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(3):
self.time_pings_left_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(2):
self.time_pings_left_list.append(wdt_data[priv_idx])
current_idx += 1
priv_idx += 1
return current_idx
2022-04-12 16:10:51 +02:00
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
current_idx = 0
priv_idx = pdu_idx - 1
2022-04-12 16:00:37 +02:00
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
vcc,
vbat,
conv_enb_0,
conv_enb_1,
conv_enb_2,
boot_cause,
uptime,
reset_cause,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV")
log_to_both(
printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]"
)
log_to_both(
printer,
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
)
current_idx += inc_len
latchup_list = []
log_to_both(printer, "Latchups")
for idx in range(len(PDU1_CHANNELS_NAMES)):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
)
log_to_both(printer, content_line)
current_idx += 2
device_types = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_types.append(hk_data[current_idx])
current_idx += 1
device_statuses = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_statuses.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print(printer=printer)
2022-04-12 16:00:37 +02:00
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}")
current_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
current_list.append(
2022-04-12 16:10:51 +02:00
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
2022-04-12 16:00:37 +02:00
)
current_idx += 2
voltage_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
voltage_list.append(
2022-04-12 16:10:51 +02:00
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
2022-04-12 16:00:37 +02:00
)
current_idx += 2
output_enb_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
output_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(len(PDU1_CHANNELS_NAMES)):
out_enb = f"{output_enb_list[idx]}".ljust(6)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
log_to_both(printer, content_line)
fmt_str = "!IBh"
inc_len = struct.calcsize(fmt_str)
2022-04-12 16:10:51 +02:00
(boot_count, batt_mode, temperature) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
info = (
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
f"Temperature {temperature / 10.0}"
)
2022-04-12 16:00:37 +02:00
log_to_both(printer, info)
2022-04-05 19:27:55 +02:00
2022-04-12 15:36:04 +02:00
2022-04-07 12:51:47 +02:00
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetIds.P60_CORE:
2022-04-08 14:46:01 +02:00
log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA")
2022-04-07 12:51:47 +02:00
current_idx = 0
current_list = []
for idx in range(13):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(13):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
out_enb_list = []
for idx in range(13):
out_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(13):
out_enb = f"{out_enb_list[idx]}".ljust(6)
content_line = (
f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
log_to_both(printer, content_line)
fmt_str = "!IBhHhh"
inc_len = struct.calcsize(fmt_str)
(
boot_count,
batt_mode,
batt_current,
batt_voltage,
temp_0,
temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
batt_info = (
2022-04-08 14:46:01 +02:00
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
f"Charge current {batt_current} | Voltage {batt_voltage}"
2022-04-04 18:46:52 +02:00
)
2022-04-07 12:51:47 +02:00
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
log_to_both(printer, temps)
log_to_both(printer, batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
if set_id == SetIds.P60_AUX:
2022-04-08 14:46:01 +02:00
log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA")
2022-04-07 12:51:47 +02:00
current_idx = 0
latchup_list = []
2022-04-08 14:46:01 +02:00
log_to_both(printer, "P60 Dock Latchups")
2022-04-07 12:51:47 +02:00
for idx in range(0, 13):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
log_to_both(printer, content_line)
current_idx += 2
fmt_str = "!IIHBBHHhhB"
inc_len = struct.calcsize(fmt_str)
(
boot_cause,
uptime,
reset_cause,
heater_on,
conv_5v_on,
dock_vbat,
dock_vcc_c,
batt_temp_0,
batt_temp_1,
dearm_status,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
2022-04-07 12:51:47 +02:00
fmt_str = "!hhbb"
inc_len = struct.calcsize(fmt_str)
(
batt_charge_current,
batt_discharge_current,
ant6_depl,
ar6_depl,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
device_types = []
device_statuses = []
for idx in range(8):
device_types.append(hk_data[current_idx])
current_idx += 1
for idx in range(8):
device_statuses.append(hk_data[current_idx])
current_idx += 1
util_info = (
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
2022-04-05 19:27:55 +02:00
)
2022-04-07 12:51:47 +02:00
util_info_2 = (
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
2022-04-04 18:46:52 +02:00
)
2022-04-07 12:51:47 +02:00
log_to_both(printer, util_info)
log_to_both(printer, util_info_2)
wdt.print(printer)
2022-04-07 12:51:47 +02:00
misc_info = (
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
2022-04-05 19:49:42 +02:00
)
2022-04-07 12:51:47 +02:00
log_to_both(printer, misc_info)
batt_info = (
2022-04-08 14:46:01 +02:00
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
2022-04-07 12:51:47 +02:00
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
2022-04-05 19:49:42 +02:00
)
2022-04-07 12:51:47 +02:00
log_to_both(printer, batt_info)
printer.print_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27
2022-04-05 19:27:55 +02:00
)