eive-tmtc/pus_tm/hk_handling.py

199 lines
9.2 KiB
Python
Raw Normal View History

2021-08-11 19:16:08 +02:00
"""HK Handling for EIVE OBSW"""
2021-06-15 15:23:27 +02:00
import struct
2021-09-06 18:26:56 +02:00
import os
import datetime
2020-12-17 17:50:00 +01:00
from typing import Tuple
from tmtccmd.tm.service_3_housekeeping import Service3Base
2021-06-28 19:06:09 +02:00
from tmtccmd.utility.logger import get_console_logger
2021-06-15 15:23:27 +02:00
from pus_tc.syrlinks_hk_handler import SetIds
from pus_tc.imtq import ImtqSetIds
2022-01-18 11:56:30 +01:00
from config.object_ids import SYRLINKS_HANDLER_ID, IMTQ_HANDLER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID
2021-06-28 19:06:09 +02:00
LOGGER = get_console_logger()
2020-12-17 17:50:00 +01:00
2021-09-06 14:52:41 +02:00
def handle_user_hk_packet(
    object_id: bytes, set_id: int, hk_data: bytearray,
    service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
    """Dispatch a received Service 3 housekeeping packet to its parser.

    The parser is selected from the object ID of the reporting handler and,
    where the handler has more than one dataset, from the set ID.

    :param object_id: Object ID of the handler which generated the HK data.
    :param set_id: ID of the dataset contained in the packet.
    :param hk_data: Raw housekeeping data to parse.
    :param service3_packet: The Service 3 packet object. Not used here.
    :return: Tuple of (header list, content list, validity buffer, count).
        For unknown object or set IDs, two empty lists, an empty bytearray
        and 0 are returned.
    """
    if object_id == SYRLINKS_HANDLER_ID:
        if set_id == SetIds.RX_REGISTERS_DATASET:
            return handle_syrlinks_rx_registers_dataset(hk_data)
        if set_id == SetIds.TX_REGISTERS_DATASET:
            return handle_syrlinks_tx_registers_dataset(hk_data)
        LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
        return [], [], bytearray(), 0

    if object_id == IMTQ_HANDLER_ID:
        # All self-test datasets share one layout, so the whole ID range is
        # handled by the same parser.
        if ImtqSetIds.POSITIVE_X_TEST <= set_id <= ImtqSetIds.NEGATIVE_Z_TEST:
            return handle_self_test_data(hk_data)
        # This message used to name the Syrlinks handler (copy-paste error).
        LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id")
        return [], [], bytearray(), 0

    if object_id in (GPS_HANDLER_0_ID, GPS_HANDLER_1_ID):
        return handle_gps_data(hk_data=hk_data)

    LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
    return [], [], bytearray(), 0
def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse the Syrlinks RX registers dataset.

    The data is read in network byte order as: status (1 byte), sensitivity
    (4), frequency shift (4), IQ power (2), AGC value (2), demod Eb (4),
    demod N0 (4) and data rate (1), all unsigned, 22 bytes in total.

    :param hk_data: Raw housekeeping data, at least 22 bytes long.
    :return: Tuple of (header list, content list, empty validity buffer,
        number of variables). Every content entry is a plain int.
    :raises struct.error: If hk_data is shorter than 22 bytes.
    """
    hk_header = [
        "RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value",
        "RX Demod Eb", "RX Demod N0", "RX Datarate"
    ]
    # A single unpack yields scalars for all fields. Previously each
    # struct.unpack() result was stored without [0], which put 1-tuples
    # like (123,) into the content list instead of the values.
    hk_content = list(struct.unpack_from('!B2I2H2IB', hk_data, 0))
    return hk_header, hk_content, bytearray(), len(hk_header)
def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse the Syrlinks TX registers dataset.

    The data is read in network byte order as: status (1 byte), waveform (1)
    and AGC value (2), all unsigned, 4 bytes in total.

    :param hk_data: Raw housekeeping data, at least 4 bytes long.
    :return: Tuple of (header list, content list, empty validity buffer,
        number of variables). Every content entry is a plain int.
    :raises struct.error: If hk_data is shorter than 4 bytes.
    """
    hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
    # The AGC value used to be stored as the 1-tuple returned by
    # struct.unpack(); unpacking all fields at once yields scalars only.
    hk_content = list(struct.unpack_from('!2BH', hk_data, 0))
    return hk_header, hk_content, bytearray(), len(hk_header)
def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse an IMTQ self-test dataset.

    The dataset consists of three consecutive records with identical layout:
    the INIT step (no coil actuation), the actuation step and the FINA step
    (no coil actuation). Each record is read in network byte order as one
    error byte, nine 32-bit floats (raw magnetometer X/Y/Z, calibrated
    magnetometer X/Y/Z, coil current X/Y/Z) and three unsigned 16-bit coil
    temperatures, i.e. 43 bytes per record and 129 bytes in total.

    :param hk_data: Raw housekeeping data, at least 129 bytes long.
    :return: Tuple of (header list, content list, empty validity buffer,
        number of variables), with 39 header and content entries.
    :raises struct.error: If hk_data is shorter than 129 bytes.
    """
    # NOTE(review): temperatures are decoded as unsigned ('H'), as before;
    # confirm against the device whether negative values can occur.
    step_format = '!B9f3H'
    step_size = struct.calcsize(step_format)
    field_names = [
        "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]",
        "Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]",
        "Coil X Current [mA]", "Coil Y Current [mA]", "Coil Z Current [mA]",
        "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]"
    ]
    # Header prefixes in record order; the actuation step has no prefix.
    step_prefixes = ("Init ", "", "Fina ")

    hk_header = []
    hk_content = []
    # Parsing every record with the same format keeps header and content in
    # lockstep. The hand-written content list previously repeated the INIT
    # raw mag Y value in the actuation step's "Raw Mag Y [nT]" column.
    for step_idx, prefix in enumerate(step_prefixes):
        hk_header.extend(prefix + name for name in field_names)
        hk_content.extend(struct.unpack_from(step_format, hk_data, step_idx * step_size))
    return hk_header, hk_content, bytearray(), len(hk_header)
2021-08-11 19:16:08 +02:00
2021-08-11 19:26:40 +02:00
def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse a GPS housekeeping dataset and append it to ``gps_log.txt``.

    Fields are read in network byte order: latitude, longitude and altitude
    as doubles (bytes 0-23), fix mode and satellites in use as single bytes,
    the year as an unsigned 16-bit value, month/day/hours/minutes/seconds as
    single bytes and the Unix seconds as an unsigned 32-bit value (bytes
    33-36). Bytes 37-38 are returned as the validity buffer.

    Every call appends one line to ``gps_log.txt`` in the current working
    directory; the column header line is written first if the file does not
    exist yet.

    :param hk_data: Raw housekeeping data.
    :return: Tuple of (header list, content list, validity buffer, 13).
    """
    LOGGER.info(f'Received GPS data, HK data length {len(hk_data)}')

    (latitude,) = struct.unpack('!d', hk_data[0:8])
    (longitude,) = struct.unpack('!d', hk_data[8:16])
    (altitude,) = struct.unpack('!d', hk_data[16:24])
    fix_mode = hk_data[24]
    sat_in_use = hk_data[25]
    (year,) = struct.unpack('!H', hk_data[26:28])
    month = hk_data[28]
    day = hk_data[29]
    hours = hk_data[30]
    minutes = hk_data[31]
    seconds = hk_data[32]
    date_string = f'{day}.{month}.{year} {hours}:{minutes}:{seconds}'
    (unix_seconds,) = struct.unpack('!I', hk_data[33:37])

    # Column name / value pairs in display order.
    columns = [
        ('Latitude', latitude),
        ('Longitude', longitude),
        ('Altitude', altitude),
        ('Fix Mode', fix_mode),
        ('Sats in Use', sat_in_use),
        ('Date', date_string),
        ('Unix Seconds', unix_seconds),
    ]
    header_array = [name for name, _ in columns]
    content_array = [value for _, value in columns]

    log_file_name = 'gps_log.txt'
    # A file that does not exist yet is created and gets the header line
    # before the first data line.
    is_new_file = not os.path.isfile(log_file_name)
    with open(log_file_name, "w" if is_new_file else "a") as gps_file:
        if is_new_file:
            gps_file.write(
                'Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, '
                'Date, Unix Seconds\n'
            )
        gps_file.write(
            f'{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, '
            f'{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n'
        )

    validity_buffer = hk_data[37:39]
    # NOTE(review): the count is hard-coded to 13 and does not match the 7
    # header entries above; confirm against the on-board dataset definition.
    num_vars = 13
    return header_array, content_array, validity_buffer, num_vars