"""HK Handling for EIVE OBSW""" import struct import os import datetime import json import socket import base64 from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.tm.pus_3_fsfw_hk import ( Service3Base, HkContentType, Service3FsfwTm, ) from tmtccmd.logging import get_console_logger from pus_tc.devs.bpx_batt import BpxSetIds from pus_tc.devs.syrlinks_hk_handler import SetIds from pus_tc.devs.p60dock import SetIds from pus_tc.devs.imtq import ImtqSetIds from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT import config.object_ids as obj_ids from pus_tm.devs.reaction_wheels import handle_rw_hk_data from pus_tm.defs import FsfwTmTcPrinter, log_to_both # TODO add to configuration parameters THERMAL_HOST = "127.0.0.1" THERMAL_PORT = 7302 LOGGER = get_console_logger() class HkHandler(): _Instance = None def getInstance(): if HkHandler._Instance == None: HkHandler._Instance = HkHandler() return HkHandler._Instance def __init__( self): self.raw_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.parsed_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.raw_server_socket.bind(("", 3705)) self.parsed_server_socket.bind(("", 3706)) # for now, only accept one connection self.raw_server_socket.listen(0) self.parsed_server_socket.listen(0) self.raw_server_socket.setblocking(False) self.parsed_server_socket.setblocking(False) self.raw_connection = None self.parsed_connection = None def __del__(self): try: self.close() except: LOGGER.warning("Could not close sockets!") def close(self): self.raw_server_socket.close self.parsed_server_socket.close if self.raw_connection != None: self.raw_connection.close() def report_raw_data(self, object_id: ObjectId, set_id: int, hk_data: bytes): data_dict = {} data_dict["objectId"] = object_id.as_string data_dict["setId"] = set_id data_dict["rawData"] = base64.b64encode(hk_data).decode() self.send_dictionary_over_socket(data_dict, True) def send_dictionary_over_socket(self, dictionary, raw=False): # keep listeners current if self.raw_connection == None: # no running connection, see if a client wants to connect try: (self.raw_connection, _) = self.raw_server_socket.accept() self.raw_connection.setblocking(False) except: # no client waiting pass if self.parsed_connection == None: # no running connection, see if a client wants to connect try: (self.parsed_connection, _) = self.parsed_server_socket.accept() self.parsed_connection.setblocking(False) except: # no client waiting pass socket_to_use = self.parsed_connection if raw: socket_to_use = self.raw_connection if socket_to_use == None: return data_json_bytes = json.dumps(dictionary).encode() # dle encode the bytes # Taking a shortcut as json is inherently # not binary (we also encoded it as utf-8), so there # can not be any 0x02 or 0x03 be in there data_json_bytes = b'\x02' + data_json_bytes + b'\n' + b'\x03' try: sent_length = socket_to_use.send(data_json_bytes) except: socket_to_use = None return if sent_length == 0: socket_to_use.close() socket_to_use = None def handle_hk_packet( self, raw_tm: bytes, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, ): tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) if named_obj_id is None: named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] self.report_raw_data(object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data) printer.generic_hk_tm_print( content_type=HkContentType.HK, object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data, ) self.handle_regular_hk_print( printer=printer, object_id=named_obj_id, hk_packet=tm_packet, hk_data=hk_data, ) if tm_packet.subservice == 10 or tm_packet.subservice == 12: LOGGER.warning("HK definitions printout not implemented yet") def handle_regular_hk_print( self, printer: FsfwTmTcPrinter, object_id: ObjectId, hk_packet: Service3Base, hk_data: bytes, ): objb = object_id.as_bytes set_id = hk_packet.set_id """This function is called when a Service 3 Housekeeping packet is received.""" if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: handle_rw_hk_data(printer, object_id, set_id, hk_data) if objb == obj_ids.SYRLINKS_HANDLER_ID: if set_id == SetIds.RX_REGISTERS_DATASET: return self.handle_syrlinks_rx_registers_dataset(printer, hk_data) elif set_id == SetIds.TX_REGISTERS_DATASET: return self.handle_syrlinks_tx_registers_dataset(printer, hk_data) else: LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") if objb == obj_ids.IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): return self.handle_self_test_data(printer, hk_data) else: LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID: self.handle_gps_data(printer=printer, hk_data=hk_data) if objb == obj_ids.BPX_HANDLER_ID: self.handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) if objb == obj_ids.CORE_CONTROLLER_ID: return self.handle_core_hk_data(printer=printer, hk_data=hk_data) if objb == obj_ids.PDU_1_HANDLER_ID: return self.handle_pdu_data( printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data ) if objb == obj_ids.PDU_2_HANDLER_ID: return self.handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data ) if objb == obj_ids.P60_DOCK_HANDLER: self.handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) if objb == obj_ids.PL_PCDU_ID: log_to_both(printer, "Received PL PCDU HK data") if objb == obj_ids.THERMAL_CONTROLLER_ID: self.handle_thermal_controller_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) else: LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") return HkReplyUnpacked() def handle_syrlinks_rx_registers_dataset(self, printer: FsfwTmTcPrinter, hk_data: bytes): reply = HkReplyUnpacked() header_list = [ "RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value", "RX Demod Eb", "RX Demod N0", "RX Datarate", ] rx_status = hk_data[0] rx_sensitivity = struct.unpack("!I", hk_data[1:5]) rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) rx_iq_power = struct.unpack("!H", hk_data[9:11]) rx_agc_value = struct.unpack("!H", hk_data[11:13]) rx_demod_eb = struct.unpack("!I", hk_data[13:17]) rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) rx_data_rate = hk_data[21] content_list = [ rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0, rx_data_rate, ] validity_buffer = hk_data[22:] log_to_both(printer, str(header_list)) log_to_both(printer, str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) def handle_syrlinks_tx_registers_dataset( self, printer: FsfwTmTcPrinter, hk_data: bytes, ): reply = HkReplyUnpacked() header_list = ["TX Status", "TX Waveform", "TX AGC value"] tx_status = hk_data[0] tx_waveform = hk_data[1] tx_agc_value = struct.unpack("!H", hk_data[2:4]) content_list = [tx_status, tx_waveform, tx_agc_value] validity_buffer = hk_data[4:] log_to_both(printer, str(header_list)) log_to_both(printer, str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) def handle_self_test_data(self, printer: FsfwTmTcPrinter, hk_data: bytes): header_list = [ "Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]", "Init Cal Mag X [nT]", "Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]", "Init Coil X Current [mA]", "Init Coil Y Current [mA]", "Init Coil Z Current [mA]", "Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]", "Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]", "Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]", "Coil Y Current [mA]", "Coil Z Current [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]", "Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]", "Fina Cal Mag X [nT]", "Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]", "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]", "Fina Coil Z Current [mA]", "Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]", "Fina Coil Z Temperature [°C]", ] # INIT step (no coil actuation) init_err = hk_data[0] init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] # Actuation step err = hk_data[43] raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] coil_x_current = struct.unpack("!f", hk_data[68:72])[0] coil_y_current = struct.unpack("!f", hk_data[72:76])[0] coil_z_current = struct.unpack("!f", hk_data[76:80])[0] coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] # FINA step (no coil actuation) fina_err = hk_data[86] fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] validity_buffer = hk_data[129:] content_list = [ init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y, init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current, init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err, raw_mag_x, raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z, coil_x_current, coil_y_current, coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err, fina_raw_mag_x, fina_raw_mag_y, fina_raw_mag_z, fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z, fina_coil_x_current, fina_coil_y_current, fina_coil_z_current, fina_coil_x_temperature, fina_coil_y_temperature, fina_coil_z_temperature, ] num_of_vars = len(header_list) log_to_both(printer, str(header_list)) log_to_both(printer, str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) def handle_thermal_controller_hk_data(self, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): if set_id == 0: LOGGER.info("Received Sensor Temperature data") # get all the floats tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4]) parsed_data = {} # put them into a nice dictionary parsed_data["SID"] = set_id parsed_data["content"] = {} parsed_data["content"]["SENSOR_PLOC_HEATSPREADER"] = tm_data[0] parsed_data["content"]["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1] parsed_data["content"]["SENSOR_4K_CAMERA"] = tm_data[2] parsed_data["content"]["SENSOR_DAC_HEATSPREADER"] = tm_data[3] parsed_data["content"]["SENSOR_STARTRACKER"] = tm_data[4] parsed_data["content"]["SENSOR_RW1"] = tm_data[5] parsed_data["content"]["SENSOR_DRO"] = tm_data[6] parsed_data["content"]["SENSOR_SCEX"] = tm_data[7] parsed_data["content"]["SENSOR_X8"] = tm_data[8] parsed_data["content"]["SENSOR_HPA"] = tm_data[9] parsed_data["content"]["SENSOR_TX_MODUL"] = tm_data[10] parsed_data["content"]["SENSOR_MPA"] = tm_data[11] parsed_data["content"]["SENSOR_ACU"] = tm_data[12] parsed_data["content"]["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13] parsed_data["content"]["SENSOR_TCS_BOARD"] = tm_data[14] parsed_data["content"]["SENSOR_MAGNETTORQUER"] = tm_data[15] self.send_dictionary_over_socket(parsed_data) def handle_gps_data(self, printer: FsfwTmTcPrinter, hk_data: bytes): LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") reply = HkReplyUnpacked() var_index = 0 header_list = [ "Latitude", "Longitude", "Altitude", "Fix Mode", "Sats in Use", "Date", "Unix Seconds", ] latitude = struct.unpack("!d", hk_data[0:8])[0] longitude = struct.unpack("!d", hk_data[8:16])[0] altitude = struct.unpack("!d", hk_data[16:24])[0] fix_mode = hk_data[24] sat_in_use = hk_data[25] year = struct.unpack("!H", hk_data[26:28])[0] month = hk_data[28] day = hk_data[29] hours = hk_data[30] minutes = hk_data[31] seconds = hk_data[32] date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" unix_seconds = struct.unpack("!I", hk_data[33:37])[0] content_list = [ latitude, longitude, altitude, fix_mode, sat_in_use, date_string, unix_seconds, ] var_index += 13 reply.num_of_vars = var_index if not os.path.isfile("gps_log.txt"): with open("gps_log.txt", "w") as gps_file: gps_file.write( "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " "Date, Unix Seconds\n" ) with open("gps_log.txt", "a") as gps_file: gps_file.write( f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" ) validity_buffer = hk_data[37:39] log_to_both(printer, str(header_list)) log_to_both(printer, str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) def handle_bpx_hk_data(self, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): if set_id == BpxSetIds.GET_HK_SET: fmt_str = "!HHHHhhhhIB" inc_len = struct.calcsize(fmt_str) ( charge_current, discharge_current, heater_current, batt_voltage, batt_temp_1, batt_temp_2, batt_temp_3, batt_temp_4, reboot_cntr, boot_cause, ) = struct.unpack(fmt_str, hk_data[0:inc_len]) header_list = [ "Charge Current", "Discharge Current", "Heater Current", "Battery Voltage", "Batt Temp 1", "Batt Temp 2", "Batt Temp 3", "Batt Temp 4", "Reboot Counter", "Boot Cause", ] content_list = [ charge_current, discharge_current, heater_current, batt_voltage, batt_temp_1, batt_temp_2, batt_temp_3, batt_temp_4, reboot_cntr, boot_cause, ] validity_buffer = hk_data[inc_len:] log_to_both(printer, str(header_list)) log_to_both(printer, str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) elif set_id == BpxSetIds.GET_CFG_SET: battheat_mode = hk_data[0] battheat_low = struct.unpack("!b", hk_data[1:2])[0] battheat_high = struct.unpack("!b", hk_data[2:3])[0] header_list = [ "Battery Heater Mode", "Battery Heater Low Limit", "Battery Heater High Limit", ] content_list = [battheat_mode, battheat_low, battheat_high] validity_buffer = hk_data[3:] log_to_both(printer, str(header_list)) log_to_both(printer, str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) def handle_core_hk_data(self, printer: FsfwTmTcPrinter, hk_data: bytes): fmt_str = "!fffH" inc_len = struct.calcsize(fmt_str) (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack( fmt_str, hk_data[0: 0 + inc_len] ) printout = ( f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}" ) log_to_both(printer, printout) printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) P60_INDEX_LIST = [ "ACU VCC", "PDU1 VCC", "X3 IDLE VCC", "PDU2 VCC", "ACU VBAT", "PDU1 VBAT", "X3 IDLE VBAT", "PDU2 VBAT", "STACK VBAT", "STACK 3V3", "STACK 5V", "GS3V3", "GS5V", ] WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] PDU1_CHANNELS_NAMES = [ "TCS Board", "Syrlinks", "Startracker", "MGT", "SUS Nominal", "SCEX", "PLOC", "ACS A Side", "Unused Channel 8", ] PDU2_CHANNELS_NAMES = [ "Q7S", "Payload PCDU CH1", "RW", "TCS Heater In", "SUS Redundant", "Deployment Mechanism", "Payload PCDU CH6", "ACS B Side", "Payload Camera", ] PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] class WdtInfo: def __init__(self): self.wdt_reboots_list = [] self.time_pings_left_list = [] def print(self, printer: FsfwTmTcPrinter): wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" log_to_both(printer, wdt_info) for idx in range(len(self.wdt_reboots_list)): log_to_both( printer, f"{TmHandler.WDT_LIST[idx].ljust(5)} | " f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", ) def parse(self, wdt_data: bytes, current_idx: int) -> int: priv_idx = 0 self.wdt_reboots_list = [] self.time_pings_left_list = [] for idx in range(5): self.wdt_reboots_list.append( struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] ) priv_idx += 4 current_idx += 4 for idx in range(3): self.time_pings_left_list.append( struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] ) priv_idx += 4 current_idx += 4 for idx in range(2): self.time_pings_left_list.append(wdt_data[priv_idx]) current_idx += 1 priv_idx += 1 return current_idx def handle_pdu_data( self, printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes ): current_idx = 0 priv_idx = pdu_idx - 1 if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: fmt_str = "!hhBBBIIH" inc_len = struct.calcsize(fmt_str) ( vcc, vbat, conv_enb_0, conv_enb_1, conv_enb_2, boot_cause, uptime, reset_cause, ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV") log_to_both( printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]" ) log_to_both( printer, f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", ) current_idx += inc_len latchup_list = [] log_to_both(printer, "Latchups") for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): latchup_list.append( struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] ) content_line = ( f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" ) log_to_both(printer, content_line) current_idx += 2 device_types = [] for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): device_types.append(hk_data[current_idx]) current_idx += 1 device_statuses = [] for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): device_statuses.append(hk_data[current_idx]) current_idx += 1 wdt = self.WdtInfo() current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) wdt.print(printer=printer) if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}") current_list = [] for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): current_list.append( struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] ) current_idx += 2 voltage_list = [] for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): voltage_list.append( struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] ) current_idx += 2 output_enb_list = [] for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): output_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" print(header_str) printer.file_logger.info(header_str) for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): out_enb = f"{output_enb_list[idx]}".ljust(6) content_line = ( f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " f"{voltage_list[idx]:05} | {current_list[idx]:04}" ) log_to_both(printer, content_line) fmt_str = "!IBh" inc_len = struct.calcsize(fmt_str) (boot_count, batt_mode, temperature) = struct.unpack( fmt_str, hk_data[current_idx: current_idx + inc_len] ) info = ( f"Boot Count {boot_count} | Battery Mode {batt_mode} | " f"Temperature {temperature / 10.0}" ) log_to_both(printer, info) def handle_p60_hk_data(self, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): if set_id == SetIds.P60_CORE: log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA") current_idx = 0 current_list = [] for idx in range(13): current_list.append( struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] ) current_idx += 2 voltage_list = [] for idx in range(13): voltage_list.append( struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] ) current_idx += 2 out_enb_list = [] for idx in range(13): out_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" print(header_str) printer.file_logger.info(header_str) for idx in range(13): out_enb = f"{out_enb_list[idx]}".ljust(6) content_line = ( f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " f"{voltage_list[idx]:05} | {current_list[idx]:04}" ) log_to_both(printer, content_line) fmt_str = "!IBhHhh" inc_len = struct.calcsize(fmt_str) ( boot_count, batt_mode, batt_current, batt_voltage, temp_0, temp_1, ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) current_idx += inc_len batt_info = ( f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " f"Charge current {batt_current} | Voltage {batt_voltage}" ) temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " log_to_both(printer, temps) log_to_both(printer, batt_info) printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) if set_id == SetIds.P60_AUX: log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA") current_idx = 0 latchup_list = [] log_to_both(printer, "P60 Dock Latchups") for idx in range(0, 13): latchup_list.append( struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] ) content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" log_to_both(printer, content_line) current_idx += 2 fmt_str = "!IIHBBHHhhB" inc_len = struct.calcsize(fmt_str) ( boot_cause, uptime, reset_cause, heater_on, conv_5v_on, dock_vbat, dock_vcc_c, batt_temp_0, batt_temp_1, dearm_status, ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) current_idx += inc_len wdt = self.WdtInfo() current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) fmt_str = "!hhbb" inc_len = struct.calcsize(fmt_str) ( batt_charge_current, batt_discharge_current, ant6_depl, ar6_depl, ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) current_idx += inc_len device_types = [] device_statuses = [] for idx in range(8): device_types.append(hk_data[current_idx]) current_idx += 1 for idx in range(8): device_statuses.append(hk_data[current_idx]) current_idx += 1 util_info = ( f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" ) util_info_2 = ( f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" ) log_to_both(printer, util_info) log_to_both(printer, util_info_2) wdt.print(printer) misc_info = ( f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" ) log_to_both(printer, misc_info) batt_info = ( f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" ) log_to_both(printer, batt_info) printer.print_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 )