358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""HK Handling for EIVE OBSW"""
|
|
import struct
|
|
import os
|
|
import datetime
|
|
|
|
from tmtccmd.config.definitions import HkReplyUnpacked
|
|
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3Base
|
|
from tmtccmd.utility.logger import get_console_logger
|
|
from pus_tc.devs.bpx_batt import BpxSetIds
|
|
from pus_tc.devs.syrlinks_hk_handler import SetIds
|
|
from pus_tc.devs.imtq import ImtqSetIds
|
|
from config.object_ids import (
|
|
SYRLINKS_HANDLER_ID,
|
|
IMTQ_HANDLER_ID,
|
|
GPS_HANDLER_0_ID,
|
|
GPS_HANDLER_1_ID,
|
|
BPX_HANDLER_ID,
|
|
CORE_CONTROLLER_ID
|
|
)
|
|
|
|
# Console logger shared by the HK handler functions in this module.
LOGGER = get_console_logger()
|
|
|
|
|
|
def handle_user_hk_packet(
    object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> HkReplyUnpacked:
    """This function is called when a Service 3 Housekeeping packet is received.

    The packet is dispatched to a device specific parser, selected by the object ID and,
    for handlers which provide more than one dataset, by the set ID.

    :param object_id: ID of the object which generated the HK data. It is compared against
        the handler IDs imported from ``config.object_ids``.
    :param set_id: ID of the HK dataset.
    :param hk_data: Raw HK payload which is passed on to the selected parser.
    :param service3_packet: The Service 3 packet itself. Not used by this function.
    :return: The reply produced by the matching parser. An empty ``HkReplyUnpacked`` is
        returned if no parser is available for the object ID / set ID combination.
    """
    if object_id == SYRLINKS_HANDLER_ID:
        if set_id == SetIds.RX_REGISTERS_DATASET:
            return handle_syrlinks_rx_registers_dataset(hk_data)
        if set_id == SetIds.TX_REGISTERS_DATASET:
            return handle_syrlinks_tx_registers_dataset(hk_data)
        LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
    elif object_id == IMTQ_HANDLER_ID:
        # All self-test sets share one parser, they form a contiguous range of set IDs.
        if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
            set_id <= ImtqSetIds.NEGATIVE_Z_TEST
        ):
            return handle_self_test_data(hk_data)
        LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id")
    elif object_id in (GPS_HANDLER_0_ID, GPS_HANDLER_1_ID):
        return handle_gps_data(hk_data=hk_data)
    elif object_id == BPX_HANDLER_ID:
        return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id)
    elif object_id == CORE_CONTROLLER_ID:
        return handle_core_hk_data(hk_data=hk_data, set_id=set_id)
    else:
        LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
    # Reached for unknown objects and for known objects with an unknown set ID.
    return HkReplyUnpacked()
|
|
|
|
|
|
def handle_syrlinks_rx_registers_dataset(
    hk_data: bytearray,
) -> HkReplyUnpacked:
    """Unpack the Syrlinks RX registers dataset.

    The first 22 bytes of ``hk_data`` hold the eight register values in network byte order,
    everything after that is passed on as the validity buffer.

    :param hk_data: Raw HK payload of the RX registers dataset.
    :return: Reply with eight headers and the eight matching scalar values.
    :raises struct.error: If ``hk_data`` is shorter than the 22 bytes of register data.
    """
    reply = HkReplyUnpacked()
    reply.header_list = [
        "RX Status",
        "RX Sensitivity",
        "RX Frequency Shift",
        "RX IQ Power",
        "RX AGC Value",
        "RX Demod Eb",
        "RX Demod N0",
        "RX Datarate",
    ]
    # Field order and widths follow the header list above: u8, u32, u32, u16, u16, u32, u32, u8.
    # Unpacking all fields at once yields plain integers instead of the 1-tuples which
    # struct.unpack returns for a single field.
    reply.content_list = list(struct.unpack_from("!BIIHHIIB", hk_data, 0))
    reply.validity_buffer = hk_data[22:]
    reply.num_of_vars = 8
    return reply
|
|
|
|
|
|
def handle_syrlinks_tx_registers_dataset(
    hk_data: bytearray,
) -> HkReplyUnpacked:
    """Unpack the Syrlinks TX registers dataset.

    The first four bytes of ``hk_data`` hold the three register values in network byte order,
    everything after that is passed on as the validity buffer.

    :param hk_data: Raw HK payload of the TX registers dataset.
    :return: Reply with three headers and the three matching scalar values.
    :raises struct.error: If ``hk_data`` is shorter than the four bytes of register data.
    """
    reply = HkReplyUnpacked()
    reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
    # u8 status, u8 waveform, u16 AGC value. Unpacking them together stores the AGC value
    # as an integer rather than as the 1-tuple returned by a single-field struct.unpack.
    reply.content_list = list(struct.unpack_from("!BBH", hk_data, 0))
    reply.validity_buffer = hk_data[4:]
    reply.num_of_vars = 3
    return reply
|
|
|
|
|
|
def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
    """Unpack the HK data of a self-test dataset.

    The data consists of three consecutive steps with an identical layout: the INIT step
    (no coil actuation), the actuation step and the FINA step (no coil actuation). Each step
    occupies 43 bytes in network byte order. The bytes following the third step are passed
    on as the validity buffer.

    :param hk_data: Raw HK payload of the self-test dataset.
    :return: Reply with 39 headers (13 per step) and the 39 matching values.
    :raises struct.error: If ``hk_data`` is shorter than the 129 bytes of the three steps.
    """
    # Names of the values of one step, in the order in which they appear in the data.
    step_fields = (
        "Err",
        "Raw Mag X [nT]",
        "Raw Mag Y [nT]",
        "Raw Mag Z [nT]",
        "Cal Mag X [nT]",
        "Cal Mag Y [nT]",
        "Cal Mag Z [nT]",
        "Coil X Current [mA]",
        "Coil Y Current [mA]",
        "Coil Z Current [mA]",
        "Coil X Temperature [°C]",
        "Coil Y Temperature [°C]",
        "Coil Z Temperature [°C]",
    )
    # One step: error byte, nine floats (raw mag, cal mag and coil current for X/Y/Z) and
    # three unsigned 16 bit coil temperatures.
    step_layout = struct.Struct("!B9f3H")
    # Header prefixes of the INIT, actuation and FINA step. The actuation step has none.
    step_prefixes = ("Init ", "", "Fina ")

    header_list = []
    content_list = []
    for step_idx, prefix in enumerate(step_prefixes):
        header_list.extend(f"{prefix}{field}" for field in step_fields)
        # Parsing every step with the same code keeps headers and values aligned; the
        # hand-written list used to repeat the INIT raw mag Y value in the actuation step.
        content_list.extend(
            step_layout.unpack_from(hk_data, step_idx * step_layout.size)
        )

    reply = HkReplyUnpacked()
    # Stored in header_list like in the other HK handlers of this module.
    reply.header_list = header_list
    reply.content_list = content_list
    reply.validity_buffer = hk_data[len(step_prefixes) * step_layout.size :]
    reply.num_of_vars = len(header_list)
    return reply
|
|
|
|
|
|
def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
    """Unpack GPS HK data and append the values to ``gps_log.txt``.

    The first 37 bytes of ``hk_data`` hold the GPS values in network byte order, the two
    bytes after that are used as the validity buffer. Every call appends one line to
    ``gps_log.txt`` in the current working directory; a column header line is written first
    if the file does not exist yet.

    :param hk_data: Raw HK payload of the GPS dataset.
    :return: Reply with seven headers and the seven matching values. The date and time
        fields are combined into one ``day.month.year hours:minutes:seconds`` string.
    :raises struct.error: If ``hk_data`` is shorter than the 37 bytes of GPS data.
    """
    LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
    # Three doubles (position), fix mode and satellite count as u8, u16 year, five u8
    # date/time fields and the u32 unix seconds.
    (
        latitude,
        longitude,
        altitude,
        fix_mode,
        sat_in_use,
        year,
        month,
        day,
        hours,
        minutes,
        seconds,
        unix_seconds,
    ) = struct.unpack_from("!3d2BH5BI", hk_data, 0)
    date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"

    # The column header is only written when the log file is created by this call.
    write_column_header = not os.path.isfile("gps_log.txt")
    with open("gps_log.txt", "a") as gps_file:
        if write_column_header:
            gps_file.write(
                "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
                "Date, Unix Seconds\n"
            )
        gps_file.write(
            f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
            f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
        )

    reply = HkReplyUnpacked()
    # The headers used to be replaced by an empty list right before returning.
    reply.header_list = [
        "Latitude",
        "Longitude",
        "Altitude",
        "Fix Mode",
        "Sats in Use",
        "Date",
        "Unix Seconds",
    ]
    reply.content_list = [
        latitude,
        longitude,
        altitude,
        fix_mode,
        sat_in_use,
        date_string,
        unix_seconds,
    ]
    # NOTE(review): the value 13 is kept unchanged although only 12 raw fields are parsed
    # above - confirm against the definition of the GPS dataset.
    reply.num_of_vars = 13
    reply.validity_buffer = hk_data[37:39]
    return reply
|
|
|
|
|
|
def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
    """Unpack BPX battery HK data for the HK set or the configuration set.

    :param hk_data: Raw HK payload. The values are read in network byte order, the bytes
        following them are passed on as the validity buffer.
    :param set_id: Selects the layout, ``BpxSetIds.GET_HK_SET`` or ``BpxSetIds.GET_CFG_SET``.
    :return: Reply with headers and values of the selected set. The reply is left empty for
        any other set ID.
    """
    LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
    reply = HkReplyUnpacked()
    if set_id == BpxSetIds.GET_HK_SET:
        # Bytes 0-19: three currents and the voltage as u16, four temperatures as i16 and
        # the u32 reboot counter. Byte 20 holds the boot cause.
        measurements = struct.unpack_from("!4H4hI", hk_data, 0)
        boot_cause = hk_data[20]
        reply.header_list = [
            "Charge Current",
            "Discharge Current",
            "Heater Current",
            "Battery Voltage",
            "Batt Temp 1",
            "Batt Temp 2",
            "Batt Temp 3",
            "Batt Temp 4",
            "Reboot Counter",
            "Boot Cause",
        ]
        reply.content_list = [*measurements, boot_cause]
        reply.validity_buffer = hk_data[21:]
    elif set_id == BpxSetIds.GET_CFG_SET:
        heater_mode = hk_data[0]
        # Both heater limits are signed 8 bit values.
        low_limit, high_limit = struct.unpack_from("!2b", hk_data, 1)
        reply.header_list = [
            "Battery Heater Mode",
            "Battery Heater Low Limit",
            "Battery Heater High Limit",
        ]
        reply.content_list = [heater_mode, low_limit, high_limit]
        reply.validity_buffer = hk_data[3:]
    return reply
|
|
|
|
|
|
def handle_core_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
    """Unpack the core controller HK data.

    The first 12 bytes of ``hk_data`` hold three floats in network byte order, everything
    after that is passed on as the validity buffer.

    :param hk_data: Raw HK payload of the core controller.
    :param set_id: ID of the HK dataset. Not used, the same layout is assumed for every set.
    :return: Reply with three headers and the three matching float values.
    :raises struct.error: If ``hk_data`` is shorter than the 12 bytes of float data.
    """
    reply = HkReplyUnpacked()
    reply.header_list = ["Chip Temperature [°C]", "PS Voltage [mV]", "PL Voltage [mV]"]
    # Unpacking the three floats together stores plain floats instead of the 1-tuples
    # returned by a single-field struct.unpack.
    reply.content_list = list(struct.unpack_from("!3f", hk_data, 0))
    reply.validity_buffer = hk_data[12:]
    reply.num_of_vars = 3
    return reply
|