diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index d1829ca..5ca178b 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -19,7 +19,7 @@ from config.object_ids import get_object_ids from .event_handler import handle_event_packet from .verification_handler import handle_service_1_packet -from .hk_handling import handle_hk_packet +from .hk_handler import HkHandler from .action_reply_handler import handle_action_reply LOGGER = get_console_logger() @@ -33,6 +33,7 @@ def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None: pus_factory_hook(raw_tm_packet=raw_tm_packet) + def pus_factory_hook(raw_tm_packet: bytes): if len(raw_tm_packet) < 8: LOGGER.warning("Detected packet shorter than 8 bytes!") @@ -45,7 +46,7 @@ def pus_factory_hook(raw_tm_packet: bytes): if service_type == 1: handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet) elif service_type == 3: - handle_hk_packet( + HkHandler.getInstance().handle_hk_packet( printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict ) elif service_type == 5: diff --git a/pus_tm/hk_handler.py b/pus_tm/hk_handler.py new file mode 100644 index 0000000..67f0d74 --- /dev/null +++ b/pus_tm/hk_handler.py @@ -0,0 +1,840 @@ +"""HK Handling for EIVE OBSW""" +import struct +import os +import datetime +import json +import socket +import base64 + +from tmtccmd.config.definitions import HkReplyUnpacked +from tmtccmd.tm.pus_3_fsfw_hk import ( + Service3Base, + HkContentType, + Service3FsfwTm, +) +from tmtccmd.logging import get_console_logger +from pus_tc.devs.bpx_batt import BpxSetIds +from pus_tc.devs.syrlinks_hk_handler import SetIds +from pus_tc.devs.p60dock import SetIds +from pus_tc.devs.imtq import ImtqSetIds +from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT +import config.object_ids as obj_ids + +from pus_tm.devs.reaction_wheels import handle_rw_hk_data +from pus_tm.defs import FsfwTmTcPrinter, log_to_both + + +# TODO add to configuration parameters +THERMAL_HOST = "127.0.0.1" +THERMAL_PORT = 7302 + + +LOGGER = get_console_logger() + + +class HkHandler(): + + _Instance = None + + def getInstance(): + if HkHandler._Instance == None: + HkHandler._Instance = HkHandler() + return HkHandler._Instance + + def __init__( + self): + + self.raw_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.parsed_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + self.raw_server_socket.bind(("", 3705)) + self.parsed_server_socket.bind(("", 3706)) + + # for now, only accept one connection + self.raw_server_socket.listen(0) + self.parsed_server_socket.listen(0) + + self.raw_server_socket.setblocking(False) + self.parsed_server_socket.setblocking(False) + + self.raw_connection = None + self.parsed_connection = None + + def __del__(self): + try: + self.close() + except: + LOGGER.warning("Could not close sockets!") + + def close(self): + self.raw_server_socket.close + self.parsed_server_socket.close + if self.raw_connection != None: + self.raw_connection.close() + + def report_raw_data(self, object_id: ObjectId, + set_id: int, + hk_data: bytes): + + data_dict = {} + data_dict["objectId"] = object_id.as_string + data_dict["setId"] = set_id + data_dict["rawData"] = base64.b64encode(hk_data).decode() + + self.send_dictionary_over_socket(data_dict, True) + + def send_dictionary_over_socket(self, dictionary, raw=False): + # keep listeners current + if self.raw_connection == None: + # no running connection, see if a client wants to connect + try: + (self.raw_connection, _) = self.raw_server_socket.accept() + self.raw_connection.setblocking(False) + except: + # no client waiting + pass + + if self.parsed_connection == None: + # no running connection, see if a client wants to connect + try: + (self.parsed_connection, _) = self.parsed_server_socket.accept() + self.parsed_connection.setblocking(False) + except: + # no client waiting + pass + + socket_to_use = self.parsed_connection + if raw: + socket_to_use = self.raw_connection + + if socket_to_use == None: + return + + data_json_bytes = json.dumps(dictionary).encode() + + # dle encode the bytes + # Taking a shortcut as json is inherently + # not binary (we also encoded it as utf-8), so there + # can not be any 0x02 or 0x03 be in there + data_json_bytes = b'\x02' + data_json_bytes + b'\n' + b'\x03' + + try: + sent_length = socket_to_use.send(data_json_bytes) + except: + socket_to_use = None + return + if sent_length == 0: + socket_to_use.close() + socket_to_use = None + + def handle_hk_packet( + self, + raw_tm: bytes, + obj_id_dict: ObjectIdDictT, + printer: FsfwTmTcPrinter, + ): + tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) + named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) + if named_obj_id is None: + named_obj_id = tm_packet.object_id + if tm_packet.subservice == 25 or tm_packet.subservice == 26: + hk_data = tm_packet.tm_data[8:] + self.report_raw_data(object_id=named_obj_id, + set_id=tm_packet.set_id, + hk_data=hk_data) + printer.generic_hk_tm_print( + content_type=HkContentType.HK, + object_id=named_obj_id, + set_id=tm_packet.set_id, + hk_data=hk_data, + ) + self.handle_regular_hk_print( + printer=printer, + object_id=named_obj_id, + hk_packet=tm_packet, + hk_data=hk_data, + ) + if tm_packet.subservice == 10 or tm_packet.subservice == 12: + LOGGER.warning("HK definitions printout not implemented yet") + + def handle_regular_hk_print( + self, + printer: FsfwTmTcPrinter, + object_id: ObjectId, + hk_packet: Service3Base, + hk_data: bytes, + ): + objb = object_id.as_bytes + set_id = hk_packet.set_id + """This function is called when a Service 3 Housekeeping packet is received.""" + if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: + handle_rw_hk_data(printer, object_id, set_id, hk_data) + if objb == obj_ids.SYRLINKS_HANDLER_ID: + if set_id == SetIds.RX_REGISTERS_DATASET: + return self.handle_syrlinks_rx_registers_dataset(printer, hk_data) + elif set_id == SetIds.TX_REGISTERS_DATASET: + return self.handle_syrlinks_tx_registers_dataset(printer, hk_data) + else: + LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") + if objb == obj_ids.IMTQ_HANDLER_ID: + if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( + set_id <= ImtqSetIds.NEGATIVE_Z_TEST + ): + return self.handle_self_test_data(printer, hk_data) + else: + LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") + if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID: + self.handle_gps_data(printer=printer, hk_data=hk_data) + if objb == obj_ids.BPX_HANDLER_ID: + self.handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) + if objb == obj_ids.CORE_CONTROLLER_ID: + return self.handle_core_hk_data(printer=printer, hk_data=hk_data) + if objb == obj_ids.PDU_1_HANDLER_ID: + return self.handle_pdu_data( + printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data + ) + if objb == obj_ids.PDU_2_HANDLER_ID: + return self.handle_pdu_data( + printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data + ) + if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: + return handle_rw_hk_data( + printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data + ) + if objb == obj_ids.P60_DOCK_HANDLER: + self.handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) + if objb == obj_ids.PL_PCDU_ID: + log_to_both(printer, "Received PL PCDU HK data") + if objb == obj_ids.THERMAL_CONTROLLER_ID: + self.handle_thermal_controller_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) + else: + LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") + return HkReplyUnpacked() + + def handle_syrlinks_rx_registers_dataset(self, printer: FsfwTmTcPrinter, hk_data: bytes): + reply = HkReplyUnpacked() + header_list = [ + "RX Status", + "RX Sensitivity", + "RX Frequency Shift", + "RX IQ Power", + "RX AGC Value", + "RX Demod Eb", + "RX Demod N0", + "RX Datarate", + ] + rx_status = hk_data[0] + rx_sensitivity = struct.unpack("!I", hk_data[1:5]) + rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) + rx_iq_power = struct.unpack("!H", hk_data[9:11]) + rx_agc_value = struct.unpack("!H", hk_data[11:13]) + rx_demod_eb = struct.unpack("!I", hk_data[13:17]) + rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) + rx_data_rate = hk_data[21] + content_list = [ + rx_status, + rx_sensitivity, + rx_frequency_shift, + rx_iq_power, + rx_agc_value, + rx_demod_eb, + rx_demod_n0, + rx_data_rate, + ] + validity_buffer = hk_data[22:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) + + def handle_syrlinks_tx_registers_dataset( + self, + printer: FsfwTmTcPrinter, + hk_data: bytes, + ): + reply = HkReplyUnpacked() + header_list = ["TX Status", "TX Waveform", "TX AGC value"] + tx_status = hk_data[0] + tx_waveform = hk_data[1] + tx_agc_value = struct.unpack("!H", hk_data[2:4]) + content_list = [tx_status, tx_waveform, tx_agc_value] + validity_buffer = hk_data[4:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) + + def handle_self_test_data(self, printer: FsfwTmTcPrinter, hk_data: bytes): + header_list = [ + "Init Err", + "Init Raw Mag X [nT]", + "Init Raw Mag Y [nT]", + "Init Raw Mag Z [nT]", + "Init Cal Mag X [nT]", + "Init Cal Mag Y [nT]", + "Init Cal Mag Z [nT]", + "Init Coil X Current [mA]", + "Init Coil Y Current [mA]", + "Init Coil Z Current [mA]", + "Init Coil X Temperature [°C]", + "Init Coil Y Temperature [°C]", + "Init Coil Z Temperature [°C]", + "Err", + "Raw Mag X [nT]", + "Raw Mag Y [nT]", + "Raw Mag Z [nT]", + "Cal Mag X [nT]", + "Cal Mag Y [nT]", + "Cal Mag Z [nT]", + "Coil X Current [mA]", + "Coil Y Current [mA]", + "Coil Z Current [mA]", + "Coil X Temperature [°C]", + "Coil Y Temperature [°C]", + "Coil Z Temperature [°C]", + "Fina Err", + "Fina Raw Mag X [nT]", + "Fina Raw Mag Y [nT]", + "Fina Raw Mag Z [nT]", + "Fina Cal Mag X [nT]", + "Fina Cal Mag Y [nT]", + "Fina Cal Mag Z [nT]", + "Fina Coil X Current [mA]", + "Fina Coil Y Current [mA]", + "Fina Coil Z Current [mA]", + "Fina Coil X Temperature [°C]", + "Fina Coil Y Temperature [°C]", + "Fina Coil Z Temperature [°C]", + ] + # INIT step (no coil actuation) + init_err = hk_data[0] + init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] + init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] + init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] + init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] + init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] + init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] + init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] + init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] + init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] + init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] + init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] + init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] + + # Actuation step + err = hk_data[43] + raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] + raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] + raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] + cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] + cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] + cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] + coil_x_current = struct.unpack("!f", hk_data[68:72])[0] + coil_y_current = struct.unpack("!f", hk_data[72:76])[0] + coil_z_current = struct.unpack("!f", hk_data[76:80])[0] + coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] + coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] + coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] + + # FINA step (no coil actuation) + fina_err = hk_data[86] + fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] + fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] + fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] + fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] + fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] + fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] + fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] + fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] + fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] + fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] + fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] + fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] + + validity_buffer = hk_data[129:] + content_list = [ + init_err, + init_raw_mag_x, + init_raw_mag_y, + init_raw_mag_z, + init_cal_mag_x, + init_cal_mag_y, + init_cal_mag_z, + init_coil_x_current, + init_coil_y_current, + init_coil_z_current, + init_coil_x_temperature, + init_coil_y_temperature, + init_coil_z_temperature, + err, + raw_mag_x, + raw_mag_y, + raw_mag_z, + cal_mag_x, + cal_mag_y, + cal_mag_z, + coil_x_current, + coil_y_current, + coil_z_current, + coil_x_temperature, + coil_y_temperature, + coil_z_temperature, + fina_err, + fina_raw_mag_x, + fina_raw_mag_y, + fina_raw_mag_z, + fina_cal_mag_x, + fina_cal_mag_y, + fina_cal_mag_z, + fina_coil_x_current, + fina_coil_y_current, + fina_coil_z_current, + fina_coil_x_temperature, + fina_coil_y_temperature, + fina_coil_z_temperature, + ] + num_of_vars = len(header_list) + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + def handle_thermal_controller_hk_data(self, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == 0: + LOGGER.info("Received Sensor Temperature data") + + # get all the floats + tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4]) + parsed_data = {} + + # put them into a nice dictionary + parsed_data["SID"] = set_id + parsed_data["content"] = {} + parsed_data["content"]["SENSOR_PLOC_HEATSPREADER"] = tm_data[0] + parsed_data["content"]["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1] + parsed_data["content"]["SENSOR_4K_CAMERA"] = tm_data[2] + parsed_data["content"]["SENSOR_DAC_HEATSPREADER"] = tm_data[3] + parsed_data["content"]["SENSOR_STARTRACKER"] = tm_data[4] + parsed_data["content"]["SENSOR_RW1"] = tm_data[5] + parsed_data["content"]["SENSOR_DRO"] = tm_data[6] + parsed_data["content"]["SENSOR_SCEX"] = tm_data[7] + parsed_data["content"]["SENSOR_X8"] = tm_data[8] + parsed_data["content"]["SENSOR_HPA"] = tm_data[9] + parsed_data["content"]["SENSOR_TX_MODUL"] = tm_data[10] + parsed_data["content"]["SENSOR_MPA"] = tm_data[11] + parsed_data["content"]["SENSOR_ACU"] = tm_data[12] + parsed_data["content"]["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13] + parsed_data["content"]["SENSOR_TCS_BOARD"] = tm_data[14] + parsed_data["content"]["SENSOR_MAGNETTORQUER"] = tm_data[15] + + self.send_dictionary_over_socket(parsed_data) + + def handle_gps_data(self, printer: FsfwTmTcPrinter, hk_data: bytes): + LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") + reply = HkReplyUnpacked() + var_index = 0 + header_list = [ + "Latitude", + "Longitude", + "Altitude", + "Fix Mode", + "Sats in Use", + "Date", + "Unix Seconds", + ] + latitude = struct.unpack("!d", hk_data[0:8])[0] + longitude = struct.unpack("!d", hk_data[8:16])[0] + altitude = struct.unpack("!d", hk_data[16:24])[0] + fix_mode = hk_data[24] + sat_in_use = hk_data[25] + year = struct.unpack("!H", hk_data[26:28])[0] + month = hk_data[28] + day = hk_data[29] + hours = hk_data[30] + minutes = hk_data[31] + seconds = hk_data[32] + date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" + unix_seconds = struct.unpack("!I", hk_data[33:37])[0] + content_list = [ + latitude, + longitude, + altitude, + fix_mode, + sat_in_use, + date_string, + unix_seconds, + ] + var_index += 13 + reply.num_of_vars = var_index + if not os.path.isfile("gps_log.txt"): + with open("gps_log.txt", "w") as gps_file: + gps_file.write( + "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " + "Date, Unix Seconds\n" + ) + with open("gps_log.txt", "a") as gps_file: + gps_file.write( + f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " + f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" + ) + validity_buffer = hk_data[37:39] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) + + def handle_bpx_hk_data(self, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == BpxSetIds.GET_HK_SET: + fmt_str = "!HHHHhhhhIB" + inc_len = struct.calcsize(fmt_str) + ( + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ) = struct.unpack(fmt_str, hk_data[0:inc_len]) + header_list = [ + "Charge Current", + "Discharge Current", + "Heater Current", + "Battery Voltage", + "Batt Temp 1", + "Batt Temp 2", + "Batt Temp 3", + "Batt Temp 4", + "Reboot Counter", + "Boot Cause", + ] + content_list = [ + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ] + validity_buffer = hk_data[inc_len:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) + elif set_id == BpxSetIds.GET_CFG_SET: + battheat_mode = hk_data[0] + battheat_low = struct.unpack("!b", hk_data[1:2])[0] + battheat_high = struct.unpack("!b", hk_data[2:3])[0] + header_list = [ + "Battery Heater Mode", + "Battery Heater Low Limit", + "Battery Heater High Limit", + ] + content_list = [battheat_mode, battheat_low, battheat_high] + validity_buffer = hk_data[3:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) + + def handle_core_hk_data(self, printer: FsfwTmTcPrinter, hk_data: bytes): + + fmt_str = "!fffH" + inc_len = struct.calcsize(fmt_str) + (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack( + fmt_str, hk_data[0: 0 + inc_len] + ) + printout = ( + f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " + f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}" + ) + log_to_both(printer, printout) + printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) + + P60_INDEX_LIST = [ + "ACU VCC", + "PDU1 VCC", + "X3 IDLE VCC", + "PDU2 VCC", + "ACU VBAT", + "PDU1 VBAT", + "X3 IDLE VBAT", + "PDU2 VBAT", + "STACK VBAT", + "STACK 3V3", + "STACK 5V", + "GS3V3", + "GS5V", + ] + + WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] + + PDU1_CHANNELS_NAMES = [ + "TCS Board", + "Syrlinks", + "Startracker", + "MGT", + "SUS Nominal", + "SCEX", + "PLOC", + "ACS A Side", + "Unused Channel 8", + ] + + PDU2_CHANNELS_NAMES = [ + "Q7S", + "Payload PCDU CH1", + "RW", + "TCS Heater In", + "SUS Redundant", + "Deployment Mechanism", + "Payload PCDU CH6", + "ACS B Side", + "Payload Camera", + ] + + PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] + + class WdtInfo: + def __init__(self): + self.wdt_reboots_list = [] + self.time_pings_left_list = [] + + def print(self, printer: FsfwTmTcPrinter): + wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" + log_to_both(printer, wdt_info) + for idx in range(len(self.wdt_reboots_list)): + log_to_both( + printer, + f"{TmHandler.WDT_LIST[idx].ljust(5)} | " + f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", + ) + + def parse(self, wdt_data: bytes, current_idx: int) -> int: + priv_idx = 0 + self.wdt_reboots_list = [] + self.time_pings_left_list = [] + for idx in range(5): + self.wdt_reboots_list.append( + struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] + ) + priv_idx += 4 + current_idx += 4 + for idx in range(3): + self.time_pings_left_list.append( + struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] + ) + priv_idx += 4 + current_idx += 4 + for idx in range(2): + self.time_pings_left_list.append(wdt_data[priv_idx]) + current_idx += 1 + priv_idx += 1 + return current_idx + + def handle_pdu_data( + self, printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes + ): + current_idx = 0 + priv_idx = pdu_idx - 1 + if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: + fmt_str = "!hhBBBIIH" + inc_len = struct.calcsize(fmt_str) + ( + vcc, + vbat, + conv_enb_0, + conv_enb_1, + conv_enb_2, + boot_cause, + uptime, + reset_cause, + ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) + log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV") + log_to_both( + printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]" + ) + log_to_both( + printer, + f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", + ) + current_idx += inc_len + latchup_list = [] + log_to_both(printer, "Latchups") + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + latchup_list.append( + struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] + ) + content_line = ( + f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" + ) + log_to_both(printer, content_line) + current_idx += 2 + device_types = [] + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + device_types.append(hk_data[current_idx]) + current_idx += 1 + device_statuses = [] + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + device_statuses.append(hk_data[current_idx]) + current_idx += 1 + wdt = self.WdtInfo() + current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) + wdt.print(printer=printer) + if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: + log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}") + current_list = [] + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + current_list.append( + struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] + ) + current_idx += 2 + voltage_list = [] + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + voltage_list.append( + struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] + ) + current_idx += 2 + output_enb_list = [] + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + output_enb_list.append(hk_data[current_idx]) + current_idx += 1 + header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" + print(header_str) + printer.file_logger.info(header_str) + for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): + out_enb = f"{output_enb_list[idx]}".ljust(6) + content_line = ( + f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " + f"{voltage_list[idx]:05} | {current_list[idx]:04}" + ) + log_to_both(printer, content_line) + fmt_str = "!IBh" + inc_len = struct.calcsize(fmt_str) + (boot_count, batt_mode, temperature) = struct.unpack( + fmt_str, hk_data[current_idx: current_idx + inc_len] + ) + info = ( + f"Boot Count {boot_count} | Battery Mode {batt_mode} | " + f"Temperature {temperature / 10.0}" + ) + log_to_both(printer, info) + + def handle_p60_hk_data(self, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == SetIds.P60_CORE: + log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA") + current_idx = 0 + current_list = [] + for idx in range(13): + current_list.append( + struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] + ) + current_idx += 2 + voltage_list = [] + for idx in range(13): + voltage_list.append( + struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] + ) + current_idx += 2 + out_enb_list = [] + for idx in range(13): + out_enb_list.append(hk_data[current_idx]) + current_idx += 1 + header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" + print(header_str) + printer.file_logger.info(header_str) + for idx in range(13): + out_enb = f"{out_enb_list[idx]}".ljust(6) + content_line = ( + f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " + f"{voltage_list[idx]:05} | {current_list[idx]:04}" + ) + log_to_both(printer, content_line) + fmt_str = "!IBhHhh" + inc_len = struct.calcsize(fmt_str) + ( + boot_count, + batt_mode, + batt_current, + batt_voltage, + temp_0, + temp_1, + ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) + current_idx += inc_len + batt_info = ( + f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " + f"Charge current {batt_current} | Voltage {batt_voltage}" + ) + temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " + log_to_both(printer, temps) + log_to_both(printer, batt_info) + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) + if set_id == SetIds.P60_AUX: + log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA") + current_idx = 0 + latchup_list = [] + log_to_both(printer, "P60 Dock Latchups") + for idx in range(0, 13): + latchup_list.append( + struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] + ) + content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" + log_to_both(printer, content_line) + current_idx += 2 + fmt_str = "!IIHBBHHhhB" + inc_len = struct.calcsize(fmt_str) + ( + boot_cause, + uptime, + reset_cause, + heater_on, + conv_5v_on, + dock_vbat, + dock_vcc_c, + batt_temp_0, + batt_temp_1, + dearm_status, + ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) + current_idx += inc_len + wdt = self.WdtInfo() + current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) + fmt_str = "!hhbb" + inc_len = struct.calcsize(fmt_str) + ( + batt_charge_current, + batt_discharge_current, + ant6_depl, + ar6_depl, + ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) + current_idx += inc_len + device_types = [] + device_statuses = [] + for idx in range(8): + device_types.append(hk_data[current_idx]) + current_idx += 1 + for idx in range(8): + device_statuses.append(hk_data[current_idx]) + current_idx += 1 + util_info = ( + f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" + ) + util_info_2 = ( + f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " + f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" + ) + log_to_both(printer, util_info) + log_to_both(printer, util_info_2) + wdt.print(printer) + misc_info = ( + f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" + ) + log_to_both(printer, misc_info) + batt_info = ( + f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " + f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" + ) + log_to_both(printer, batt_info) + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=27 + ) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py deleted file mode 100644 index 8627554..0000000 --- a/pus_tm/hk_handling.py +++ /dev/null @@ -1,760 +0,0 @@ -"""HK Handling for EIVE OBSW""" -import struct -import os -import datetime -import json -import socket - -from tmtccmd.config.definitions import HkReplyUnpacked -from tmtccmd.tm.pus_3_fsfw_hk import ( - Service3Base, - HkContentType, - Service3FsfwTm, -) -from tmtccmd.logging import get_console_logger -from pus_tc.devs.bpx_batt import BpxSetIds -from pus_tc.devs.syrlinks_hk_handler import SetIds -from pus_tc.devs.p60dock import SetIds -from pus_tc.devs.imtq import ImtqSetIds -from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT -import config.object_ids as obj_ids - -from pus_tm.devs.reaction_wheels import handle_rw_hk_data -from pus_tm.defs import FsfwTmTcPrinter, log_to_both - - -#TODO add to configuration parameters -THERMAL_HOST = "127.0.0.1" -THERMAL_PORT = 7302 - - -LOGGER = get_console_logger() - - -def handle_hk_packet( - raw_tm: bytes, - obj_id_dict: ObjectIdDictT, - printer: FsfwTmTcPrinter, -): - tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) - named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) - if named_obj_id is None: - named_obj_id = tm_packet.object_id - if tm_packet.subservice == 25 or tm_packet.subservice == 26: - hk_data = tm_packet.tm_data[8:] - printer.generic_hk_tm_print( - content_type=HkContentType.HK, - object_id=named_obj_id, - set_id=tm_packet.set_id, - hk_data=hk_data, - ) - handle_regular_hk_print( - printer=printer, - object_id=named_obj_id, - hk_packet=tm_packet, - hk_data=hk_data, - ) - if tm_packet.subservice == 10 or tm_packet.subservice == 12: - LOGGER.warning("HK definitions printout not implemented yet") - - -def handle_regular_hk_print( - printer: FsfwTmTcPrinter, - object_id: ObjectId, - hk_packet: Service3Base, - hk_data: bytes, -): - objb = object_id.as_bytes - set_id = hk_packet.set_id - """This function is called when a Service 3 Housekeeping packet is received.""" - if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: - handle_rw_hk_data(printer, object_id, set_id, hk_data) - if objb == obj_ids.SYRLINKS_HANDLER_ID: - if set_id == SetIds.RX_REGISTERS_DATASET: - return handle_syrlinks_rx_registers_dataset(printer, hk_data) - elif set_id == SetIds.TX_REGISTERS_DATASET: - return handle_syrlinks_tx_registers_dataset(printer, hk_data) - else: - LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") - if objb == obj_ids.IMTQ_HANDLER_ID: - if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( - set_id <= ImtqSetIds.NEGATIVE_Z_TEST - ): - return handle_self_test_data(printer, hk_data) - else: - LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") - if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID: - handle_gps_data(printer=printer, hk_data=hk_data) - if objb == obj_ids.BPX_HANDLER_ID: - handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) - if objb == obj_ids.CORE_CONTROLLER_ID: - return handle_core_hk_data(printer=printer, hk_data=hk_data) - if objb == obj_ids.PDU_1_HANDLER_ID: - return handle_pdu_data( - printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data - ) - if objb == obj_ids.PDU_2_HANDLER_ID: - return handle_pdu_data( - printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data - ) - if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: - return handle_rw_hk_data( - printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data - ) - if objb == obj_ids.P60_DOCK_HANDLER: - handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) - if objb == obj_ids.PL_PCDU_ID: - log_to_both(printer, "Received PL PCDU HK data") - if objb == obj_ids.THERMAL_CONTROLLER_ID: - handle_thermal_controller_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) - else: - LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") - return HkReplyUnpacked() - - -def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): - reply = HkReplyUnpacked() - header_list = [ - "RX Status", - "RX Sensitivity", - "RX Frequency Shift", - "RX IQ Power", - "RX AGC Value", - "RX Demod Eb", - "RX Demod N0", - "RX Datarate", - ] - rx_status = hk_data[0] - rx_sensitivity = struct.unpack("!I", hk_data[1:5]) - rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) - rx_iq_power = struct.unpack("!H", hk_data[9:11]) - rx_agc_value = struct.unpack("!H", hk_data[11:13]) - rx_demod_eb = struct.unpack("!I", hk_data[13:17]) - rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) - rx_data_rate = hk_data[21] - content_list = [ - rx_status, - rx_sensitivity, - rx_frequency_shift, - rx_iq_power, - rx_agc_value, - rx_demod_eb, - rx_demod_n0, - rx_data_rate, - ] - validity_buffer = hk_data[22:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) - - -def handle_syrlinks_tx_registers_dataset( - printer: FsfwTmTcPrinter, - hk_data: bytes, -): - reply = HkReplyUnpacked() - header_list = ["TX Status", "TX Waveform", "TX AGC value"] - tx_status = hk_data[0] - tx_waveform = hk_data[1] - tx_agc_value = struct.unpack("!H", hk_data[2:4]) - content_list = [tx_status, tx_waveform, tx_agc_value] - validity_buffer = hk_data[4:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) - - -def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): - header_list = [ - "Init Err", - "Init Raw Mag X [nT]", - "Init Raw Mag Y [nT]", - "Init Raw Mag Z [nT]", - "Init Cal Mag X [nT]", - "Init Cal Mag Y [nT]", - "Init Cal Mag Z [nT]", - "Init Coil X Current [mA]", - "Init Coil Y Current [mA]", - "Init Coil Z Current [mA]", - "Init Coil X Temperature [°C]", - "Init Coil Y Temperature [°C]", - "Init Coil Z Temperature [°C]", - "Err", - "Raw Mag X [nT]", - "Raw Mag Y [nT]", - "Raw Mag Z [nT]", - "Cal Mag X [nT]", - "Cal Mag Y [nT]", - "Cal Mag Z [nT]", - "Coil X Current [mA]", - "Coil Y Current [mA]", - "Coil Z Current [mA]", - "Coil X Temperature [°C]", - "Coil Y Temperature [°C]", - "Coil Z Temperature [°C]", - "Fina Err", - "Fina Raw Mag X [nT]", - "Fina Raw Mag Y [nT]", - "Fina Raw Mag Z [nT]", - "Fina Cal Mag X [nT]", - "Fina Cal Mag Y [nT]", - "Fina Cal Mag Z [nT]", - "Fina Coil X Current [mA]", - "Fina Coil Y Current [mA]", - "Fina Coil Z Current [mA]", - "Fina Coil X Temperature [°C]", - "Fina Coil Y Temperature [°C]", - "Fina Coil Z Temperature [°C]", - ] - # INIT step (no coil actuation) - init_err = hk_data[0] - init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] - init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] - init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] - init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] - init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] - init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] - init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] - init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] - init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] - init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] - init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] - init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] - - # Actuation step - err = hk_data[43] - raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] - raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] - raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] - cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] - cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] - cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] - coil_x_current = struct.unpack("!f", hk_data[68:72])[0] - coil_y_current = struct.unpack("!f", hk_data[72:76])[0] - coil_z_current = struct.unpack("!f", hk_data[76:80])[0] - coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] - coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] - coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] - - # FINA step (no coil actuation) - fina_err = hk_data[86] - fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] - fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] - fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] - fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] - fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] - fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] - fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] - fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] - fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] - fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] - fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] - fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] - - validity_buffer = hk_data[129:] - content_list = [ - init_err, - init_raw_mag_x, - init_raw_mag_y, - init_raw_mag_z, - init_cal_mag_x, - init_cal_mag_y, - init_cal_mag_z, - init_coil_x_current, - init_coil_y_current, - init_coil_z_current, - init_coil_x_temperature, - init_coil_y_temperature, - init_coil_z_temperature, - err, - raw_mag_x, - init_raw_mag_y, - raw_mag_z, - cal_mag_x, - cal_mag_y, - cal_mag_z, - coil_x_current, - coil_y_current, - coil_z_current, - coil_x_temperature, - coil_y_temperature, - coil_z_temperature, - fina_err, - fina_raw_mag_x, - fina_raw_mag_y, - fina_raw_mag_z, - fina_cal_mag_x, - fina_cal_mag_y, - fina_cal_mag_z, - fina_coil_x_current, - fina_coil_y_current, - fina_coil_z_current, - fina_coil_x_temperature, - fina_coil_y_temperature, - fina_coil_z_temperature, - ] - num_of_vars = len(header_list) - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) - -def handle_thermal_controller_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == 0: - LOGGER.info("Received Sensor Temperature data") - - #get all the floats - tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16*4]) - parsed_data = {} - - #put them into a nice dictionary - parsed_data["SID"] = set_id - parsed_data["content"] = {} - parsed_data["content"]["SENSOR_PLOC_HEATSPREADER"] = tm_data[0] - parsed_data["content"]["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1] - parsed_data["content"]["SENSOR_4K_CAMERA"] = tm_data[2] - parsed_data["content"]["SENSOR_DAC_HEATSPREADER"] = tm_data[3] - parsed_data["content"]["SENSOR_STARTRACKER"] = tm_data[4] - parsed_data["content"]["SENSOR_RW1"] = tm_data[5] - parsed_data["content"]["SENSOR_DRO"] = tm_data[6] - parsed_data["content"]["SENSOR_SCEX"] = tm_data[7] - parsed_data["content"]["SENSOR_X8"] = tm_data[8] - parsed_data["content"]["SENSOR_HPA"] = tm_data[9] - parsed_data["content"]["SENSOR_TX_MODUL"] = tm_data[10] - parsed_data["content"]["SENSOR_MPA"] = tm_data[11] - parsed_data["content"]["SENSOR_ACU"] = tm_data[12] - parsed_data["content"]["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13] - parsed_data["content"]["SENSOR_TCS_BOARD"] = tm_data[14] - parsed_data["content"]["SENSOR_MAGNETTORQUER"] = tm_data[15] - - #which in turn will become a json to be sent over the wire - json_string = json.dumps(parsed_data) - #print(json_string) - - #try to send it to a tcp server - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect((THERMAL_HOST, THERMAL_PORT)) - s.sendall(bytes(json_string, encoding="utf-8")) - except: - #fail silently if there is noone listening, should be a non breaking feature - pass - - -def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): - LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") - reply = HkReplyUnpacked() - var_index = 0 - header_list = [ - "Latitude", - "Longitude", - "Altitude", - "Fix Mode", - "Sats in Use", - "Date", - "Unix Seconds", - ] - latitude = struct.unpack("!d", hk_data[0:8])[0] - longitude = struct.unpack("!d", hk_data[8:16])[0] - altitude = struct.unpack("!d", hk_data[16:24])[0] - fix_mode = hk_data[24] - sat_in_use = hk_data[25] - year = struct.unpack("!H", hk_data[26:28])[0] - month = hk_data[28] - day = hk_data[29] - hours = hk_data[30] - minutes = hk_data[31] - seconds = hk_data[32] - date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" - unix_seconds = struct.unpack("!I", hk_data[33:37])[0] - content_list = [ - latitude, - longitude, - altitude, - fix_mode, - sat_in_use, - date_string, - unix_seconds, - ] - var_index += 13 - reply.num_of_vars = var_index - if not os.path.isfile("gps_log.txt"): - with open("gps_log.txt", "w") as gps_file: - gps_file.write( - "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " - "Date, Unix Seconds\n" - ) - with open("gps_log.txt", "a") as gps_file: - gps_file.write( - f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " - f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" - ) - validity_buffer = hk_data[37:39] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - - -def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == BpxSetIds.GET_HK_SET: - fmt_str = "!HHHHhhhhIB" - inc_len = struct.calcsize(fmt_str) - ( - charge_current, - discharge_current, - heater_current, - batt_voltage, - batt_temp_1, - batt_temp_2, - batt_temp_3, - batt_temp_4, - reboot_cntr, - boot_cause, - ) = struct.unpack(fmt_str, hk_data[0:inc_len]) - header_list = [ - "Charge Current", - "Discharge Current", - "Heater Current", - "Battery Voltage", - "Batt Temp 1", - "Batt Temp 2", - "Batt Temp 3", - "Batt Temp 4", - "Reboot Counter", - "Boot Cause", - ] - content_list = [ - charge_current, - discharge_current, - heater_current, - batt_voltage, - batt_temp_1, - batt_temp_2, - batt_temp_3, - batt_temp_4, - reboot_cntr, - boot_cause, - ] - validity_buffer = hk_data[inc_len:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - elif set_id == BpxSetIds.GET_CFG_SET: - battheat_mode = hk_data[0] - battheat_low = struct.unpack("!b", hk_data[1:2])[0] - battheat_high = struct.unpack("!b", hk_data[2:3])[0] - header_list = [ - "Battery Heater Mode", - "Battery Heater Low Limit", - "Battery Heater High Limit", - ] - content_list = [battheat_mode, battheat_low, battheat_high] - validity_buffer = hk_data[3:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - - -def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes): - - fmt_str = "!fffH" - inc_len = struct.calcsize(fmt_str) - (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack( - fmt_str, hk_data[0 : 0 + inc_len] - ) - printout = ( - f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " - f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}" - ) - log_to_both(printer, printout) - printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) - - -P60_INDEX_LIST = [ - "ACU VCC", - "PDU1 VCC", - "X3 IDLE VCC", - "PDU2 VCC", - "ACU VBAT", - "PDU1 VBAT", - "X3 IDLE VBAT", - "PDU2 VBAT", - "STACK VBAT", - "STACK 3V3", - "STACK 5V", - "GS3V3", - "GS5V", -] - -WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] - -PDU1_CHANNELS_NAMES = [ - "TCS Board", - "Syrlinks", - "Startracker", - "MGT", - "SUS Nominal", - "SCEX", - "PLOC", - "ACS A Side", - "Unused Channel 8", -] - -PDU2_CHANNELS_NAMES = [ - "Q7S", - "Payload PCDU CH1", - "RW", - "TCS Heater In", - "SUS Redundant", - "Deployment Mechanism", - "Payload PCDU CH6", - "ACS B Side", - "Payload Camera", -] - -PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] - - -class WdtInfo: - def __init__(self): - self.wdt_reboots_list = [] - self.time_pings_left_list = [] - - def print(self, printer: FsfwTmTcPrinter): - wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" - log_to_both(printer, wdt_info) - for idx in range(len(self.wdt_reboots_list)): - log_to_both( - printer, - f"{WDT_LIST[idx].ljust(5)} | " - f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", - ) - - def parse(self, wdt_data: bytes, current_idx: int) -> int: - priv_idx = 0 - self.wdt_reboots_list = [] - self.time_pings_left_list = [] - for idx in range(5): - self.wdt_reboots_list.append( - struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] - ) - priv_idx += 4 - current_idx += 4 - for idx in range(3): - self.time_pings_left_list.append( - struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] - ) - priv_idx += 4 - current_idx += 4 - for idx in range(2): - self.time_pings_left_list.append(wdt_data[priv_idx]) - current_idx += 1 - priv_idx += 1 - return current_idx - - -def handle_pdu_data( - printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes -): - current_idx = 0 - priv_idx = pdu_idx - 1 - if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: - fmt_str = "!hhBBBIIH" - inc_len = struct.calcsize(fmt_str) - ( - vcc, - vbat, - conv_enb_0, - conv_enb_1, - conv_enb_2, - boot_cause, - uptime, - reset_cause, - ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) - log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV") - log_to_both( - printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]" - ) - log_to_both( - printer, - f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", - ) - current_idx += inc_len - latchup_list = [] - log_to_both(printer, "Latchups") - for idx in range(len(PDU1_CHANNELS_NAMES)): - latchup_list.append( - struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] - ) - content_line = ( - f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" - ) - log_to_both(printer, content_line) - current_idx += 2 - device_types = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_types.append(hk_data[current_idx]) - current_idx += 1 - device_statuses = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 - wdt = WdtInfo() - current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) - wdt.print(printer=printer) - if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: - log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}") - current_list = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - current_list.append( - struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] - ) - current_idx += 2 - voltage_list = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - voltage_list.append( - struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] - ) - current_idx += 2 - output_enb_list = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - output_enb_list.append(hk_data[current_idx]) - current_idx += 1 - header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) - for idx in range(len(PDU1_CHANNELS_NAMES)): - out_enb = f"{output_enb_list[idx]}".ljust(6) - content_line = ( - f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " - f"{voltage_list[idx]:05} | {current_list[idx]:04}" - ) - log_to_both(printer, content_line) - fmt_str = "!IBh" - inc_len = struct.calcsize(fmt_str) - (boot_count, batt_mode, temperature) = struct.unpack( - fmt_str, hk_data[current_idx : current_idx + inc_len] - ) - info = ( - f"Boot Count {boot_count} | Battery Mode {batt_mode} | " - f"Temperature {temperature / 10.0}" - ) - log_to_both(printer, info) - - -def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == SetIds.P60_CORE: - log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA") - current_idx = 0 - current_list = [] - for idx in range(13): - current_list.append( - struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] - ) - current_idx += 2 - voltage_list = [] - for idx in range(13): - voltage_list.append( - struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] - ) - current_idx += 2 - out_enb_list = [] - for idx in range(13): - out_enb_list.append(hk_data[current_idx]) - current_idx += 1 - header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) - for idx in range(13): - out_enb = f"{out_enb_list[idx]}".ljust(6) - content_line = ( - f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " - f"{voltage_list[idx]:05} | {current_list[idx]:04}" - ) - log_to_both(printer, content_line) - fmt_str = "!IBhHhh" - inc_len = struct.calcsize(fmt_str) - ( - boot_count, - batt_mode, - batt_current, - batt_voltage, - temp_0, - temp_1, - ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) - current_idx += inc_len - batt_info = ( - f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " - f"Charge current {batt_current} | Voltage {batt_voltage}" - ) - temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " - log_to_both(printer, temps) - log_to_both(printer, batt_info) - printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) - if set_id == SetIds.P60_AUX: - log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA") - current_idx = 0 - latchup_list = [] - log_to_both(printer, "P60 Dock Latchups") - for idx in range(0, 13): - latchup_list.append( - struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] - ) - content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" - log_to_both(printer, content_line) - current_idx += 2 - fmt_str = "!IIHBBHHhhB" - inc_len = struct.calcsize(fmt_str) - ( - boot_cause, - uptime, - reset_cause, - heater_on, - conv_5v_on, - dock_vbat, - dock_vcc_c, - batt_temp_0, - batt_temp_1, - dearm_status, - ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) - current_idx += inc_len - wdt = WdtInfo() - current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) - fmt_str = "!hhbb" - inc_len = struct.calcsize(fmt_str) - ( - batt_charge_current, - batt_discharge_current, - ant6_depl, - ar6_depl, - ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) - current_idx += inc_len - device_types = [] - device_statuses = [] - for idx in range(8): - device_types.append(hk_data[current_idx]) - current_idx += 1 - for idx in range(8): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 - util_info = ( - f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" - ) - util_info_2 = ( - f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " - f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" - ) - log_to_both(printer, util_info) - log_to_both(printer, util_info_2) - wdt.print(printer) - misc_info = ( - f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" - ) - log_to_both(printer, misc_info) - batt_info = ( - f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " - f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" - ) - log_to_both(printer, batt_info) - printer.print_validity_buffer( - validity_buffer=hk_data[current_idx:], num_vars=27 - )