"""HK Handling for EIVE OBSW"""
import struct
import os
import datetime
from typing import Tuple

from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from pus_tc.syrlinks_hk_handler import SetIds
from pus_tc.imtq import ImtqSetIds
from config.object_ids import (
    SYRLINKS_HANDLER_ID,
    IMTQ_HANDLER_ID,
    GPS_HANDLER_0_ID,
    GPS_HANDLER_1_ID,
)

LOGGER = get_console_logger()

# Binary layouts of the supported HK sets. "!" selects network byte order
# without padding, so each layout's size equals the sum of its field sizes.
_SYRLINKS_RX_LAYOUT = struct.Struct("!BIIHHIIB")
_SYRLINKS_TX_LAYOUT = struct.Struct("!BBH")
# One IMTQ self test step: error byte, 9 floats (raw mag, calibrated mag,
# coil currents; X/Y/Z each) and 3 unsigned 16 bit coil temperatures.
_IMTQ_SELF_TEST_STEP_LAYOUT = struct.Struct("!B9f3H")
# 3 doubles, fix mode, sats in use, year (16 bit), month, day, hours,
# minutes, seconds and the 32 bit unix seconds.
_GPS_LAYOUT = struct.Struct("!dddBBHBBBBBI")

# The IMTQ self test set repeats the same step layout three times. The column
# names only differ by the prefix of the step.
_IMTQ_SELF_TEST_STEP_PREFIXES = ("Init ", "", "Fina ")
_IMTQ_SELF_TEST_STEP_FIELDS = (
    "Err",
    "Raw Mag X [nT]",
    "Raw Mag Y [nT]",
    "Raw Mag Z [nT]",
    "Cal Mag X [nT]",
    "Cal Mag Y [nT]",
    "Cal Mag Z [nT]",
    "Coil X Current [mA]",
    "Coil Y Current [mA]",
    "Coil Z Current [mA]",
    "Coil X Temperature [°C]",
    "Coil Y Temperature [°C]",
    "Coil Z Temperature [°C]",
)


def handle_user_hk_packet(
        object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
    """Dispatch a received Service 3 Housekeeping packet to the matching parser.

    The parser is selected by the object ID and, for the Syrlinks and IMTQ
    handlers, additionally by the set ID.

    :param object_id: ID of the object which generated the HK packet
    :param set_id: ID of the HK set contained in the packet
    :param hk_data: Raw HK data which is passed on to the parser
    :param service3_packet: Full Service 3 packet. Not evaluated here.
    :return: Tuple of header list, content list, validity buffer and a variable
        count. For unknown object or set IDs, empty containers and a count of 0
        are returned after logging an info message.
    """
    if object_id == SYRLINKS_HANDLER_ID:
        if set_id == SetIds.RX_REGISTERS_DATASET:
            return handle_syrlinks_rx_registers_dataset(hk_data)
        if set_id == SetIds.TX_REGISTERS_DATASET:
            return handle_syrlinks_tx_registers_dataset(hk_data)
        LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
        return [], [], bytearray(), 0
    if object_id == IMTQ_HANDLER_ID:
        if ImtqSetIds.POSITIVE_X_TEST <= set_id <= ImtqSetIds.NEGATIVE_Z_TEST:
            return handle_self_test_data(hk_data)
        # This branch belongs to the IMTQ handler, so the message must not
        # blame the Syrlinks handler.
        LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id")
        return [], [], bytearray(), 0
    if object_id in (GPS_HANDLER_0_ID, GPS_HANDLER_1_ID):
        return handle_gps_data(hk_data=hk_data)
    LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
    return [], [], bytearray(), 0


def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse the Syrlinks RX registers dataset.

    The first 22 bytes of the HK data are decoded as unsigned big endian
    integers. Any trailing bytes are ignored.

    :param hk_data: Raw HK data, at least 22 bytes long
    :return: Tuple of column names, decoded values (plain integers), an empty
        validity buffer and the number of decoded values
    :raises struct.error: If hk_data is shorter than the dataset
    """
    hk_header = [
        "RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power",
        "RX AGC Value", "RX Demod Eb", "RX Demod N0", "RX Datarate"
    ]
    # unpack_from yields scalars, so the content list holds plain integers
    # instead of one element tuples.
    hk_content = list(_SYRLINKS_RX_LAYOUT.unpack_from(hk_data))
    validity_buffer = bytearray()
    return hk_header, hk_content, validity_buffer, len(hk_header)


def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse the Syrlinks TX registers dataset.

    The first 4 bytes of the HK data are decoded as two unsigned bytes followed
    by one unsigned big endian 16 bit value. Any trailing bytes are ignored.

    :param hk_data: Raw HK data, at least 4 bytes long
    :return: Tuple of column names, decoded values (plain integers), an empty
        validity buffer and the number of decoded values
    :raises struct.error: If hk_data is shorter than the dataset
    """
    hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
    hk_content = list(_SYRLINKS_TX_LAYOUT.unpack_from(hk_data))
    validity_buffer = bytearray()
    return hk_header, hk_content, validity_buffer, len(hk_header)


def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse an IMTQ self test dataset.

    The dataset consists of three consecutive steps with an identical layout
    of 43 bytes each: the INIT step (no coil actuation), the actuation step
    and the FINA step (no coil actuation). Any trailing bytes are ignored.

    :param hk_data: Raw HK data, at least 129 bytes long
    :return: Tuple of column names, decoded values, an empty validity buffer
        and the number of columns
    :raises struct.error: If hk_data is shorter than the dataset
    """
    hk_header = [
        f"{prefix}{field}"
        for prefix in _IMTQ_SELF_TEST_STEP_PREFIXES
        for field in _IMTQ_SELF_TEST_STEP_FIELDS
    ]
    hk_content = []
    # Decoding each step with the same layout keeps values and column names
    # aligned and rules out mixing up variables of different steps.
    for step_idx in range(len(_IMTQ_SELF_TEST_STEP_PREFIXES)):
        step_offset = step_idx * _IMTQ_SELF_TEST_STEP_LAYOUT.size
        hk_content.extend(_IMTQ_SELF_TEST_STEP_LAYOUT.unpack_from(hk_data, step_offset))
    validity_buffer = bytearray()
    return hk_header, hk_content, validity_buffer, len(hk_header)


def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
    """Parse a GPS dataset and append the decoded values to gps_log.txt.

    The log file is located in the current working directory. A column header
    line is written first if the file does not exist yet.

    :param hk_data: Raw HK data. The first 37 bytes hold the GPS variables, the
        two bytes after that are returned as the validity buffer.
    :return: Tuple of column names, decoded values, validity buffer and the
        variable count 13
    :raises struct.error: If hk_data is shorter than 37 bytes
    """
    LOGGER.info(f'Received GPS data, HK data length {len(hk_data)}')
    (
        latitude, longitude, altitude, fix_mode, sat_in_use,
        year, month, day, hours, minutes, seconds, unix_seconds
    ) = _GPS_LAYOUT.unpack_from(hk_data)
    date_string = f'{day}.{month}.{year} {hours}:{minutes}:{seconds}'
    header_array = [
        'Latitude', 'Longitude', 'Altitude', 'Fix Mode', 'Sats in Use', 'Date', 'Unix Seconds'
    ]
    content_array = [
        latitude, longitude, altitude, fix_mode, sat_in_use, date_string, unix_seconds
    ]
    # NOTE(review): 13 is presumably the number of variables in the on-board
    # GPS set, but only 12 variables are decoded here - confirm against the OBSW.
    var_index = 13

    gps_log_file = 'gps_log.txt'
    # Checked before opening because append mode creates a missing file.
    is_new_log_file = not os.path.isfile(gps_log_file)
    with open(gps_log_file, "a") as gps_file:
        if is_new_log_file:
            gps_file.write(
                'Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, '
                'Date, Unix Seconds\n'
            )
        gps_file.write(
            f'{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, '
            f'{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n'
        )

    validity_buffer = hk_data[_GPS_LAYOUT.size:_GPS_LAYOUT.size + 2]
    return header_array, content_array, validity_buffer, var_index