From 260a0830919332d246c1be1f6159cfafd1cb7b6d Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 19 May 2022 13:20:22 +0200 Subject: [PATCH 1/2] continued hk parsing --- pus_tc/system/acs.py | 1 + pus_tc/system/controllers.py | 4 +- pus_tm/devs/bpx_bat.py | 66 ++++ pus_tm/devs/gps.py | 59 +++ pus_tm/devs/gyros.py | 8 + pus_tm/devs/imtq_mgt.py | 140 +++++++ pus_tm/devs/mgms.py | 8 + pus_tm/devs/pcdu.py | 294 +++++++++++++++ pus_tm/devs/syrlinks.py | 67 ++++ pus_tm/factory_hook.py | 2 - pus_tm/hk_handling.py | 687 ++--------------------------------- pus_tm/system/__init__.py | 0 pus_tm/system/core.py | 21 ++ pus_tm/system/tcs.py | 39 ++ pus_tm/tm_tcp_server.py | 41 +-- tmtccmd | 2 +- tmtcgui.py | 1 + 17 files changed, 765 insertions(+), 675 deletions(-) create mode 100644 pus_tm/devs/bpx_bat.py create mode 100644 pus_tm/devs/gps.py create mode 100644 pus_tm/devs/gyros.py create mode 100644 pus_tm/devs/imtq_mgt.py create mode 100644 pus_tm/devs/mgms.py create mode 100644 pus_tm/devs/pcdu.py create mode 100644 pus_tm/devs/syrlinks.py create mode 100644 pus_tm/system/__init__.py create mode 100644 pus_tm/system/core.py create mode 100644 pus_tm/system/tcs.py diff --git a/pus_tc/system/acs.py b/pus_tc/system/acs.py index 7d3eb48..76b3dcc 100644 --- a/pus_tc/system/acs.py +++ b/pus_tc/system/acs.py @@ -5,6 +5,7 @@ from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID from .common import command_mode + class AcsOpCodes: ACS_ASS_A_SIDE = ["0", "acs-a"] ACS_ASS_B_SIDE = ["1", "acs-b"] diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py index 30a5b4d..6696426 100644 --- a/pus_tc/system/controllers.py +++ b/pus_tc/system/controllers.py @@ -2,7 +2,6 @@ from ast import Pass from tmtccmd.tc.definitions import TcQueueT - from .common import command_mode import config.object_ids as obj_ids @@ -20,7 +19,7 @@ class Info: def pack_controller_commands(tc_queue: TcQueueT, op_code: str): mode = int(input("Specify mode: (OFF = 0; ON = 1; NORMAL = 2) [2] ") or "2") print(mode) - if mode < 0 or mode > 2: + if mode < 0 or mode > 2: print("Invalid Mode, defaulting to OFF") mode = 0 submode = int(input("Specify submode [0]: ") or "0") @@ -32,6 +31,7 @@ def pack_controller_commands(tc_queue: TcQueueT, op_code: str): info=op_code + " to " + str(mode) + "," + str(submode), ) + def get_object_from_op_code(op_code: str): try: return bytes.fromhex(op_code) diff --git a/pus_tm/devs/bpx_bat.py b/pus_tm/devs/bpx_bat.py new file mode 100644 index 0000000..eb27286 --- /dev/null +++ b/pus_tm/devs/bpx_bat.py @@ -0,0 +1,66 @@ +import struct + +from pus_tc.devs.bpx_batt import BpxSetIds +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + pw = PrintWrapper(printer) + if set_id == BpxSetIds.GET_HK_SET: + fmt_str = "!HHHHhhhhIB" + inc_len = struct.calcsize(fmt_str) + ( + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ) = struct.unpack(fmt_str, hk_data[0:inc_len]) + header_list = [ + "Charge Current", + "Discharge Current", + "Heater Current", + "Battery Voltage", + "Batt Temp 1", + "Batt Temp 2", + "Batt Temp 3", + "Batt Temp 4", + "Reboot Counter", + "Boot Cause", + ] + content_list = [ + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ] + validity_buffer = hk_data[inc_len:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) + elif set_id == BpxSetIds.GET_CFG_SET: + battheat_mode = hk_data[0] + battheat_low = struct.unpack("!b", hk_data[1:2])[0] + battheat_high = struct.unpack("!b", hk_data[2:3])[0] + header_list = [ + "Battery Heater Mode", + "Battery Heater Low Limit", + "Battery Heater High Limit", + ] + content_list = [battheat_mode, battheat_low, battheat_high] + validity_buffer = hk_data[3:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) diff --git a/pus_tm/devs/gps.py b/pus_tm/devs/gps.py new file mode 100644 index 0000000..d60828d --- /dev/null +++ b/pus_tm/devs/gps.py @@ -0,0 +1,59 @@ +import os +import struct +from datetime import datetime + +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + pw.dlog(f"Received GPS data, HK data length {len(hk_data)}") + var_index = 0 + header_list = [ + "Latitude", + "Longitude", + "Altitude", + "Fix Mode", + "Sats in Use", + "Date", + "Unix Seconds", + ] + latitude = struct.unpack("!d", hk_data[0:8])[0] + longitude = struct.unpack("!d", hk_data[8:16])[0] + altitude = struct.unpack("!d", hk_data[16:24])[0] + fix_mode = hk_data[24] + sat_in_use = hk_data[25] + year = struct.unpack("!H", hk_data[26:28])[0] + month = hk_data[28] + day = hk_data[29] + hours = hk_data[30] + minutes = hk_data[31] + seconds = hk_data[32] + date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" + unix_seconds = struct.unpack("!I", hk_data[33:37])[0] + content_list = [ + latitude, + longitude, + altitude, + fix_mode, + sat_in_use, + date_string, + unix_seconds, + ] + var_index += 13 + if not os.path.isfile("gps_log.txt"): + with open("gps_log.txt", "w") as gps_file: + gps_file.write( + "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " + "Date, Unix Seconds\n" + ) + with open("gps_log.txt", "a") as gps_file: + gps_file.write( + f"{datetime.now()}, {latitude}, {longitude}, {altitude}, " + f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" + ) + validity_buffer = hk_data[37:39] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) diff --git a/pus_tm/devs/gyros.py b/pus_tm/devs/gyros.py new file mode 100644 index 0000000..9646368 --- /dev/null +++ b/pus_tm/devs/gyros.py @@ -0,0 +1,8 @@ +from tmtccmd.utility import ObjectId +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_gyros_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + pass diff --git a/pus_tm/devs/imtq_mgt.py b/pus_tm/devs/imtq_mgt.py new file mode 100644 index 0000000..8ed68c6 --- /dev/null +++ b/pus_tm/devs/imtq_mgt.py @@ -0,0 +1,140 @@ +import struct + +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Init Err", + "Init Raw Mag X [nT]", + "Init Raw Mag Y [nT]", + "Init Raw Mag Z [nT]", + "Init Cal Mag X [nT]", + "Init Cal Mag Y [nT]", + "Init Cal Mag Z [nT]", + "Init Coil X Current [mA]", + "Init Coil Y Current [mA]", + "Init Coil Z Current [mA]", + "Init Coil X Temperature [°C]", + "Init Coil Y Temperature [°C]", + "Init Coil Z Temperature [°C]", + "Err", + "Raw Mag X [nT]", + "Raw Mag Y [nT]", + "Raw Mag Z [nT]", + "Cal Mag X [nT]", + "Cal Mag Y [nT]", + "Cal Mag Z [nT]", + "Coil X Current [mA]", + "Coil Y Current [mA]", + "Coil Z Current [mA]", + "Coil X Temperature [°C]", + "Coil Y Temperature [°C]", + "Coil Z Temperature [°C]", + "Fina Err", + "Fina Raw Mag X [nT]", + "Fina Raw Mag Y [nT]", + "Fina Raw Mag Z [nT]", + "Fina Cal Mag X [nT]", + "Fina Cal Mag Y [nT]", + "Fina Cal Mag Z [nT]", + "Fina Coil X Current [mA]", + "Fina Coil Y Current [mA]", + "Fina Coil Z Current [mA]", + "Fina Coil X Temperature [°C]", + "Fina Coil Y Temperature [°C]", + "Fina Coil Z Temperature [°C]", + ] + # INIT step (no coil actuation) + init_err = hk_data[0] + init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] + init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] + init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] + init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] + init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] + init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] + init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] + init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] + init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] + init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] + init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] + init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] + + # Actuation step + err = hk_data[43] + raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] + raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] + raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] + cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] + cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] + cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] + coil_x_current = struct.unpack("!f", hk_data[68:72])[0] + coil_y_current = struct.unpack("!f", hk_data[72:76])[0] + coil_z_current = struct.unpack("!f", hk_data[76:80])[0] + coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] + coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] + coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] + + # FINA step (no coil actuation) + fina_err = hk_data[86] + fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] + fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] + fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] + fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] + fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] + fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] + fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] + fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] + fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] + fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] + fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] + fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] + + validity_buffer = hk_data[129:] + content_list = [ + init_err, + init_raw_mag_x, + init_raw_mag_y, + init_raw_mag_z, + init_cal_mag_x, + init_cal_mag_y, + init_cal_mag_z, + init_coil_x_current, + init_coil_y_current, + init_coil_z_current, + init_coil_x_temperature, + init_coil_y_temperature, + init_coil_z_temperature, + err, + raw_mag_x, + init_raw_mag_y, + raw_mag_z, + cal_mag_x, + cal_mag_y, + cal_mag_z, + coil_x_current, + coil_y_current, + coil_z_current, + coil_x_temperature, + coil_y_temperature, + coil_z_temperature, + fina_err, + fina_raw_mag_x, + fina_raw_mag_y, + fina_raw_mag_z, + fina_cal_mag_x, + fina_cal_mag_y, + fina_cal_mag_z, + fina_coil_x_current, + fina_coil_y_current, + fina_coil_z_current, + fina_coil_x_temperature, + fina_coil_y_temperature, + fina_coil_z_temperature, + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) diff --git a/pus_tm/devs/mgms.py b/pus_tm/devs/mgms.py new file mode 100644 index 0000000..671902d --- /dev/null +++ b/pus_tm/devs/mgms.py @@ -0,0 +1,8 @@ +from tmtccmd.utility import ObjectId +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_mgm_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + pass diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py new file mode 100644 index 0000000..d33ddc6 --- /dev/null +++ b/pus_tm/devs/pcdu.py @@ -0,0 +1,294 @@ +import struct + +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +from pus_tm.defs import PrintWrapper +from gomspace.gomspace_common import SetIds + +P60_INDEX_LIST = [ + "ACU VCC", + "PDU1 VCC", + "X3 IDLE VCC", + "PDU2 VCC", + "ACU VBAT", + "PDU1 VBAT", + "X3 IDLE VBAT", + "PDU2 VBAT", + "STACK VBAT", + "STACK 3V3", + "STACK 5V", + "GS3V3", + "GS5V", +] + +WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] + +PDU1_CHANNELS_NAMES = [ + "TCS Board", + "Syrlinks", + "Startracker", + "MGT", + "SUS Nominal", + "SCEX", + "PLOC", + "ACS A Side", + "Unused Channel 8", +] + +PDU2_CHANNELS_NAMES = [ + "Q7S", + "Payload PCDU CH1", + "RW", + "TCS Heater In", + "SUS Redundant", + "Deployment Mechanism", + "Payload PCDU CH6", + "ACS B Side", + "Payload Camera", +] + +PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] + + +class WdtInfo: + def __init__(self, pw: PrintWrapper): + self.wdt_reboots_list = [] + self.time_pings_left_list = [] + self.pw = pw + + def print(self): + wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" + self.pw.dlog(wdt_info) + for idx in range(len(self.wdt_reboots_list)): + self.pw.dlog( + f"{WDT_LIST[idx].ljust(5)} | " + f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", + ) + + def parse(self, wdt_data: bytes, current_idx: int) -> int: + priv_idx = 0 + self.wdt_reboots_list = [] + self.time_pings_left_list = [] + for idx in range(5): + self.wdt_reboots_list.append( + struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] + ) + priv_idx += 4 + current_idx += 4 + for idx in range(3): + self.time_pings_left_list.append( + struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] + ) + priv_idx += 4 + current_idx += 4 + for idx in range(2): + self.time_pings_left_list.append(wdt_data[priv_idx]) + current_idx += 1 + priv_idx += 1 + return current_idx + + +def handle_pdu_data( + printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes +): + pw = PrintWrapper(printer=printer) + current_idx = 0 + priv_idx = pdu_idx - 1 + if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: + fmt_str = "!hhBBBIIH" + inc_len = struct.calcsize(fmt_str) + ( + vcc, + vbat, + conv_enb_0, + conv_enb_1, + conv_enb_2, + boot_cause, + uptime, + reset_cause, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV") + pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]") + pw.dlog( + f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", + ) + current_idx += inc_len + latchup_list = [] + pw.dlog("Latchups") + for idx in range(len(PDU1_CHANNELS_NAMES)): + latchup_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + content_line = ( + f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" + ) + pw.dlog(content_line) + current_idx += 2 + device_types = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + device_types.append(hk_data[current_idx]) + current_idx += 1 + device_statuses = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + device_statuses.append(hk_data[current_idx]) + current_idx += 1 + wdt = WdtInfo(pw=pw) + current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) + wdt.print() + if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: + pw.dlog(f"Received PDU HK from PDU {pdu_idx}") + current_list = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + current_list.append( + struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + voltage_list = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + voltage_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + output_enb_list = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + output_enb_list.append(hk_data[current_idx]) + current_idx += 1 + header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" + print(header_str) + printer.file_logger.info(header_str) + for idx in range(len(PDU1_CHANNELS_NAMES)): + out_enb = f"{output_enb_list[idx]}".ljust(6) + content_line = ( + f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " + f"{voltage_list[idx]:05} | {current_list[idx]:04}" + ) + pw.dlog(content_line) + fmt_str = "!IBh" + inc_len = struct.calcsize(fmt_str) + (boot_count, batt_mode, temperature) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + info = ( + f"Boot Count {boot_count} | Battery Mode {batt_mode} | " + f"Temperature {temperature / 10.0}" + ) + pw.dlog(info) + + +def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + pw = PrintWrapper(printer=printer) + if set_id == SetIds.P60_CORE: + pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA") + current_idx = 0 + current_list = [] + for idx in range(13): + current_list.append( + struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + voltage_list = [] + for idx in range(13): + voltage_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + out_enb_list = [] + for idx in range(13): + out_enb_list.append(hk_data[current_idx]) + current_idx += 1 + header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" + print(header_str) + printer.file_logger.info(header_str) + for idx in range(13): + out_enb = f"{out_enb_list[idx]}".ljust(6) + content_line = ( + f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " + f"{voltage_list[idx]:05} | {current_list[idx]:04}" + ) + pw.dlog(content_line) + fmt_str = "!IBhHhh" + inc_len = struct.calcsize(fmt_str) + ( + boot_count, + batt_mode, + batt_current, + batt_voltage, + temp_0, + temp_1, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + batt_info = ( + f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " + f"Charge current {batt_current} | Voltage {batt_voltage}" + ) + temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " + pw.dlog(temps) + pw.dlog(batt_info) + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) + if set_id == SetIds.P60_AUX: + pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") + current_idx = 0 + latchup_list = [] + pw.dlog("P60 Dock Latchups") + for idx in range(0, 13): + latchup_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" + pw.dlog(content_line) + current_idx += 2 + fmt_str = "!IIHBBHHhhB" + inc_len = struct.calcsize(fmt_str) + ( + boot_cause, + uptime, + reset_cause, + heater_on, + conv_5v_on, + dock_vbat, + dock_vcc_c, + batt_temp_0, + batt_temp_1, + dearm_status, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + wdt = WdtInfo(pw=pw) + current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) + fmt_str = "!hhbb" + inc_len = struct.calcsize(fmt_str) + ( + batt_charge_current, + batt_discharge_current, + ant6_depl, + ar6_depl, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + device_types = [] + device_statuses = [] + for idx in range(8): + device_types.append(hk_data[current_idx]) + current_idx += 1 + for idx in range(8): + device_statuses.append(hk_data[current_idx]) + current_idx += 1 + util_info = ( + f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" + ) + util_info_2 = ( + f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " + f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" + ) + pw.dlog(util_info) + pw.dlog(util_info_2) + wdt.print() + misc_info = ( + f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" + ) + pw.dlog(misc_info) + batt_info = ( + f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " + f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" + ) + pw.dlog(batt_info) + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=27 + ) diff --git a/pus_tm/devs/syrlinks.py b/pus_tm/devs/syrlinks.py new file mode 100644 index 0000000..3c82f7a --- /dev/null +++ b/pus_tm/devs/syrlinks.py @@ -0,0 +1,67 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tc.devs.syrlinks_hk_handler import SetIds +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == SetIds.RX_REGISTERS_DATASET: + return handle_syrlinks_rx_registers_dataset(printer, hk_data) + elif set_id == SetIds.TX_REGISTERS_DATASET: + return handle_syrlinks_tx_registers_dataset(printer, hk_data) + else: + pw = PrintWrapper(printer) + pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}") + + +def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "RX Status", + "RX Sensitivity", + "RX Frequency Shift", + "RX IQ Power", + "RX AGC Value", + "RX Demod Eb", + "RX Demod N0", + "RX Datarate", + ] + rx_status = hk_data[0] + rx_sensitivity = struct.unpack("!I", hk_data[1:5]) + rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) + rx_iq_power = struct.unpack("!H", hk_data[9:11]) + rx_agc_value = struct.unpack("!H", hk_data[11:13]) + rx_demod_eb = struct.unpack("!I", hk_data[13:17]) + rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) + rx_data_rate = hk_data[21] + content_list = [ + rx_status, + rx_sensitivity, + rx_frequency_shift, + rx_iq_power, + rx_agc_value, + rx_demod_eb, + rx_demod_n0, + rx_data_rate, + ] + validity_buffer = hk_data[22:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) + + +def handle_syrlinks_tx_registers_dataset( + printer: FsfwTmTcPrinter, + hk_data: bytes, +): + pw = PrintWrapper(printer) + header_list = ["TX Status", "TX Waveform", "TX AGC value"] + tx_status = hk_data[0] + tx_waveform = hk_data[1] + tx_agc_value = struct.unpack("!H", hk_data[2:4]) + content_list = [tx_status, tx_waveform, tx_agc_value] + validity_buffer = hk_data[4:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index afac7c0..8bd8fe1 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -33,7 +33,6 @@ def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None: pus_factory_hook(raw_tm_packet=raw_tm_packet) - def pus_factory_hook(raw_tm_packet: bytes): if len(raw_tm_packet) < 8: LOGGER.warning("Detected packet shorter than 8 bytes!") @@ -81,6 +80,5 @@ def pus_factory_hook(raw_tm_packet: bytes): packet=raw_tm_packet, srv_subservice=(service_type, subservice_type) ) except ValueError: - # TODO: Log faulty packet LOGGER.warning("Invalid packet format detected") log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 2197f8d..4d04a5d 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -1,31 +1,32 @@ """HK Handling for EIVE OBSW""" import struct -import os -import datetime - - +from pus_tm.system.tcs import handle_thermal_controller_hk_data from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.tm.pus_3_fsfw_hk import ( Service3Base, HkContentType, Service3FsfwTm, ) -from tmtccmd.logging import get_console_logger -from pus_tc.devs.bpx_batt import BpxSetIds -from pus_tc.devs.syrlinks_hk_handler import SetIds -from pus_tc.devs.p60dock import SetIds -from pus_tc.devs.imtq import ImtqSetIds from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT -import config.object_ids as obj_ids +from tmtccmd.logging import get_console_logger +from pus_tm.devs.bpx_bat import handle_bpx_hk_data +from pus_tm.devs.gps import handle_gps_data +from pus_tm.devs.gyros import handle_gyros_hk_data +from pus_tm.devs.imtq_mgt import handle_self_test_data +from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data +from pus_tm.devs.syrlinks import handle_syrlinks_hk_data +from pus_tc.devs.imtq import ImtqSetIds from pus_tm.devs.reaction_wheels import handle_rw_hk_data from pus_tm.defs import FsfwTmTcPrinter, log_to_both +from pus_tm.system.core import handle_core_hk_data +from pus_tm.devs.mgms import handle_mgm_hk_data +import config.object_ids as obj_ids from pus_tm.tm_tcp_server import TmTcpServer - LOGGER = get_console_logger() TM_TCP_SERVER = TmTcpServer.getInstance() @@ -42,9 +43,9 @@ def handle_hk_packet( named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] - TM_TCP_SERVER.report_raw_hk_data(object_id=named_obj_id, - set_id=tm_packet.set_id, - hk_data=hk_data) + TM_TCP_SERVER.report_raw_hk_data( + object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data + ) printer.generic_hk_tm_print( content_type=HkContentType.HK, object_id=named_obj_id, @@ -60,6 +61,7 @@ def handle_hk_packet( if tm_packet.subservice == 10 or tm_packet.subservice == 12: LOGGER.warning("HK definitions printout not implemented yet") + def handle_regular_hk_print( printer: FsfwTmTcPrinter, object_id: ObjectId, @@ -72,12 +74,7 @@ def handle_regular_hk_print( if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: handle_rw_hk_data(printer, object_id, set_id, hk_data) if objb == obj_ids.SYRLINKS_HANDLER_ID: - if set_id == SetIds.RX_REGISTERS_DATASET: - return handle_syrlinks_rx_registers_dataset(printer, hk_data) - elif set_id == SetIds.TX_REGISTERS_DATASET: - return handle_syrlinks_tx_registers_dataset(printer, hk_data) - else: - LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") + handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb == obj_ids.IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST @@ -90,7 +87,7 @@ def handle_regular_hk_print( if objb == obj_ids.BPX_HANDLER_ID: handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) if objb == obj_ids.CORE_CONTROLLER_ID: - return handle_core_hk_data(printer=printer, hk_data=hk_data) + return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data @@ -105,636 +102,30 @@ def handle_regular_hk_print( ) if objb == obj_ids.P60_DOCK_HANDLER: handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) + if objb in [ + obj_ids.GYRO_0_HANDLER_ID, + obj_ids.GYRO_1_HANDLER_ID, + obj_ids.GYRO_2_HANDLER_ID, + obj_ids.GYRO_3_HANDLER_ID, + ]: + handle_gyros_hk_data( + object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id + ) + if objb in [ + obj_ids.MGM_0_HANDLER_ID, + obj_ids.MGM_1_HANDLER_ID, + obj_ids.MGM_2_HANDLER_ID, + obj_ids.MGM_3_HANDLER_ID, + ]: + handle_mgm_hk_data( + object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id + ) if objb == obj_ids.PL_PCDU_ID: log_to_both(printer, "Received PL PCDU HK data") if objb == obj_ids.THERMAL_CONTROLLER_ID: - handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data) + handle_thermal_controller_hk_data( + object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data + ) else: LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") return HkReplyUnpacked() - -def handle_syrlinks_rx_registers_dataset( printer: FsfwTmTcPrinter, hk_data: bytes): - reply = HkReplyUnpacked() - header_list = [ - "RX Status", - "RX Sensitivity", - "RX Frequency Shift", - "RX IQ Power", - "RX AGC Value", - "RX Demod Eb", - "RX Demod N0", - "RX Datarate", - ] - rx_status = hk_data[0] - rx_sensitivity = struct.unpack("!I", hk_data[1:5]) - rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) - rx_iq_power = struct.unpack("!H", hk_data[9:11]) - rx_agc_value = struct.unpack("!H", hk_data[11:13]) - rx_demod_eb = struct.unpack("!I", hk_data[13:17]) - rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) - rx_data_rate = hk_data[21] - content_list = [ - rx_status, - rx_sensitivity, - rx_frequency_shift, - rx_iq_power, - rx_agc_value, - rx_demod_eb, - rx_demod_n0, - rx_data_rate, - ] - validity_buffer = hk_data[22:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) - -def handle_syrlinks_tx_registers_dataset( - printer: FsfwTmTcPrinter, - hk_data: bytes, -): - reply = HkReplyUnpacked() - header_list = ["TX Status", "TX Waveform", "TX AGC value"] - tx_status = hk_data[0] - tx_waveform = hk_data[1] - tx_agc_value = struct.unpack("!H", hk_data[2:4]) - content_list = [tx_status, tx_waveform, tx_agc_value] - validity_buffer = hk_data[4:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) - -def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): - header_list = [ - "Init Err", - "Init Raw Mag X [nT]", - "Init Raw Mag Y [nT]", - "Init Raw Mag Z [nT]", - "Init Cal Mag X [nT]", - "Init Cal Mag Y [nT]", - "Init Cal Mag Z [nT]", - "Init Coil X Current [mA]", - "Init Coil Y Current [mA]", - "Init Coil Z Current [mA]", - "Init Coil X Temperature [°C]", - "Init Coil Y Temperature [°C]", - "Init Coil Z Temperature [°C]", - "Err", - "Raw Mag X [nT]", - "Raw Mag Y [nT]", - "Raw Mag Z [nT]", - "Cal Mag X [nT]", - "Cal Mag Y [nT]", - "Cal Mag Z [nT]", - "Coil X Current [mA]", - "Coil Y Current [mA]", - "Coil Z Current [mA]", - "Coil X Temperature [°C]", - "Coil Y Temperature [°C]", - "Coil Z Temperature [°C]", - "Fina Err", - "Fina Raw Mag X [nT]", - "Fina Raw Mag Y [nT]", - "Fina Raw Mag Z [nT]", - "Fina Cal Mag X [nT]", - "Fina Cal Mag Y [nT]", - "Fina Cal Mag Z [nT]", - "Fina Coil X Current [mA]", - "Fina Coil Y Current [mA]", - "Fina Coil Z Current [mA]", - "Fina Coil X Temperature [°C]", - "Fina Coil Y Temperature [°C]", - "Fina Coil Z Temperature [°C]", - ] - # INIT step (no coil actuation) - init_err = hk_data[0] - init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] - init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] - init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] - init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] - init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] - init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] - init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] - init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] - init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] - init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] - init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] - init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] - - # Actuation step - err = hk_data[43] - raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] - raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] - raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] - cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] - cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] - cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] - coil_x_current = struct.unpack("!f", hk_data[68:72])[0] - coil_y_current = struct.unpack("!f", hk_data[72:76])[0] - coil_z_current = struct.unpack("!f", hk_data[76:80])[0] - coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] - coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] - coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] - - # FINA step (no coil actuation) - fina_err = hk_data[86] - fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] - fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] - fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] - fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] - fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] - fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] - fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] - fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] - fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] - fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] - fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] - fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] - - validity_buffer = hk_data[129:] - content_list = [ - init_err, - init_raw_mag_x, - init_raw_mag_y, - init_raw_mag_z, - init_cal_mag_x, - init_cal_mag_y, - init_cal_mag_z, - init_coil_x_current, - init_coil_y_current, - init_coil_z_current, - init_coil_x_temperature, - init_coil_y_temperature, - init_coil_z_temperature, - err, - raw_mag_x, - raw_mag_y, - raw_mag_z, - cal_mag_x, - cal_mag_y, - cal_mag_z, - coil_x_current, - coil_y_current, - coil_z_current, - coil_x_temperature, - coil_y_temperature, - coil_z_temperature, - fina_err, - fina_raw_mag_x, - fina_raw_mag_y, - fina_raw_mag_z, - fina_cal_mag_x, - fina_cal_mag_y, - fina_cal_mag_z, - fina_coil_x_current, - fina_coil_y_current, - fina_coil_z_current, - fina_coil_x_temperature, - fina_coil_y_temperature, - fina_coil_z_temperature, - ] - num_of_vars = len(header_list) - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) - -def handle_thermal_controller_hk_data(object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == 0: - LOGGER.info("Received Sensor Temperature data") - - # get all the floats - tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4]) - parsed_data = {} - - # put them into an list with their names - parsed_data = [] - parsed_data.append({"SENSOR_PLOC_HEATSPREADER": tm_data[0]}) - parsed_data.append({"SENSOR_PLOC_MISSIONBOARD": tm_data[1]}) - parsed_data.append({"SENSOR_4K_CAMERA": tm_data[2]}) - parsed_data.append({"SENSOR_DAC_HEATSPREADER": tm_data[3]}) - parsed_data.append({"SENSOR_STARTRACKER": tm_data[4]}) - parsed_data.append({"SENSOR_RW1": tm_data[5]}) - parsed_data.append({"SENSOR_DRO": tm_data[6]}) - parsed_data.append({"SENSOR_SCEX": tm_data[7]}) - parsed_data.append({"SENSOR_X8": tm_data[8]}) - parsed_data.append({"SENSOR_HPA": tm_data[9]}) - parsed_data.append({"SENSOR_TX_MODUL": tm_data[10]}) - parsed_data.append({"SENSOR_MPA": tm_data[11]}) - parsed_data.append({"SENSOR_ACU": tm_data[12]}) - parsed_data.append({"SENSOR_PLPCDU_HEATSPREADER": tm_data[13]}) - parsed_data.append({"SENSOR_TCS_BOARD": tm_data[14]}) - parsed_data.append({"SENSOR_MAGNETTORQUER": tm_data[15]}) - - TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data) - -def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): - LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") - reply = HkReplyUnpacked() - var_index = 0 - header_list = [ - "Latitude", - "Longitude", - "Altitude", - "Fix Mode", - "Sats in Use", - "Date", - "Unix Seconds", - ] - latitude = struct.unpack("!d", hk_data[0:8])[0] - longitude = struct.unpack("!d", hk_data[8:16])[0] - altitude = struct.unpack("!d", hk_data[16:24])[0] - fix_mode = hk_data[24] - sat_in_use = hk_data[25] - year = struct.unpack("!H", hk_data[26:28])[0] - month = hk_data[28] - day = hk_data[29] - hours = hk_data[30] - minutes = hk_data[31] - seconds = hk_data[32] - date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" - unix_seconds = struct.unpack("!I", hk_data[33:37])[0] - content_list = [ - latitude, - longitude, - altitude, - fix_mode, - sat_in_use, - date_string, - unix_seconds, - ] - var_index += 13 - reply.num_of_vars = var_index - if not os.path.isfile("gps_log.txt"): - with open("gps_log.txt", "w") as gps_file: - gps_file.write( - "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " - "Date, Unix Seconds\n" - ) - with open("gps_log.txt", "a") as gps_file: - gps_file.write( - f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " - f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" - ) - validity_buffer = hk_data[37:39] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - -def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == BpxSetIds.GET_HK_SET: - fmt_str = "!HHHHhhhhIB" - inc_len = struct.calcsize(fmt_str) - ( - charge_current, - discharge_current, - heater_current, - batt_voltage, - batt_temp_1, - batt_temp_2, - batt_temp_3, - batt_temp_4, - reboot_cntr, - boot_cause, - ) = struct.unpack(fmt_str, hk_data[0:inc_len]) - header_list = [ - "Charge Current", - "Discharge Current", - "Heater Current", - "Battery Voltage", - "Batt Temp 1", - "Batt Temp 2", - "Batt Temp 3", - "Batt Temp 4", - "Reboot Counter", - "Boot Cause", - ] - content_list = [ - charge_current, - discharge_current, - heater_current, - batt_voltage, - batt_temp_1, - batt_temp_2, - batt_temp_3, - batt_temp_4, - reboot_cntr, - boot_cause, - ] - validity_buffer = hk_data[inc_len:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - elif set_id == BpxSetIds.GET_CFG_SET: - battheat_mode = hk_data[0] - battheat_low = struct.unpack("!b", hk_data[1:2])[0] - battheat_high = struct.unpack("!b", hk_data[2:3])[0] - header_list = [ - "Battery Heater Mode", - "Battery Heater Low Limit", - "Battery Heater High Limit", - ] - content_list = [battheat_mode, battheat_low, battheat_high] - validity_buffer = hk_data[3:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - -def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes): - - fmt_str = "!fffH" - inc_len = struct.calcsize(fmt_str) - (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack( - fmt_str, hk_data[0: 0 + inc_len] - ) - printout = ( - f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " - f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}" - ) - log_to_both(printer, printout) - printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) - -P60_INDEX_LIST = [ - "ACU VCC", - "PDU1 VCC", - "X3 IDLE VCC", - "PDU2 VCC", - "ACU VBAT", - "PDU1 VBAT", - "X3 IDLE VBAT", - "PDU2 VBAT", - "STACK VBAT", - "STACK 3V3", - "STACK 5V", - "GS3V3", - "GS5V", -] - -WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] - -PDU1_CHANNELS_NAMES = [ - "TCS Board", - "Syrlinks", - "Startracker", - "MGT", - "SUS Nominal", - "SCEX", - "PLOC", - "ACS A Side", - "Unused Channel 8", -] - -PDU2_CHANNELS_NAMES = [ - "Q7S", - "Payload PCDU CH1", - "RW", - "TCS Heater In", - "SUS Redundant", - "Deployment Mechanism", - "Payload PCDU CH6", - "ACS B Side", - "Payload Camera", -] - -PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] - -class WdtInfo: - def __init__(self): - self.wdt_reboots_list = [] - self.time_pings_left_list = [] - - def print(self, printer: FsfwTmTcPrinter): - wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" - log_to_both(printer, wdt_info) - for idx in range(len(self.wdt_reboots_list)): - log_to_both( - printer, - f"{TmHandler.WDT_LIST[idx].ljust(5)} | " - f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", - ) - - def parse(self, wdt_data: bytes, current_idx: int) -> int: - priv_idx = 0 - self.wdt_reboots_list = [] - self.time_pings_left_list = [] - for idx in range(5): - self.wdt_reboots_list.append( - struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] - ) - priv_idx += 4 - current_idx += 4 - for idx in range(3): - self.time_pings_left_list.append( - struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] - ) - priv_idx += 4 - current_idx += 4 - for idx in range(2): - self.time_pings_left_list.append(wdt_data[priv_idx]) - current_idx += 1 - priv_idx += 1 - return current_idx - -def handle_pdu_data( - printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes -): - current_idx = 0 - priv_idx = pdu_idx - 1 - if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: - fmt_str = "!hhBBBIIH" - inc_len = struct.calcsize(fmt_str) - ( - vcc, - vbat, - conv_enb_0, - conv_enb_1, - conv_enb_2, - boot_cause, - uptime, - reset_cause, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV") - log_to_both( - printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]" - ) - log_to_both( - printer, - f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", - ) - current_idx += inc_len - latchup_list = [] - log_to_both(printer, "Latchups") - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - latchup_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - content_line = ( - f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" - ) - log_to_both(printer, content_line) - current_idx += 2 - device_types = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - device_types.append(hk_data[current_idx]) - current_idx += 1 - device_statuses = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 - wdt = WdtInfo() - current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) - wdt.print(printer=printer) - if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: - log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}") - current_list = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - current_list.append( - struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - voltage_list = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - voltage_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - output_enb_list = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - output_enb_list.append(hk_data[current_idx]) - current_idx += 1 - header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - out_enb = f"{output_enb_list[idx]}".ljust(6) - content_line = ( - f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " - f"{voltage_list[idx]:05} | {current_list[idx]:04}" - ) - log_to_both(printer, content_line) - fmt_str = "!IBh" - inc_len = struct.calcsize(fmt_str) - (boot_count, batt_mode, temperature) = struct.unpack( - fmt_str, hk_data[current_idx: current_idx + inc_len] - ) - info = ( - f"Boot Count {boot_count} | Battery Mode {batt_mode} | " - f"Temperature {temperature / 10.0}" - ) - log_to_both(printer, info) - -def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == SetIds.P60_CORE: - log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA") - current_idx = 0 - current_list = [] - for idx in range(13): - current_list.append( - struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - voltage_list = [] - for idx in range(13): - voltage_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - out_enb_list = [] - for idx in range(13): - out_enb_list.append(hk_data[current_idx]) - current_idx += 1 - header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) - for idx in range(13): - out_enb = f"{out_enb_list[idx]}".ljust(6) - content_line = ( - f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " - f"{voltage_list[idx]:05} | {current_list[idx]:04}" - ) - log_to_both(printer, content_line) - fmt_str = "!IBhHhh" - inc_len = struct.calcsize(fmt_str) - ( - boot_count, - batt_mode, - batt_current, - batt_voltage, - temp_0, - temp_1, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - current_idx += inc_len - batt_info = ( - f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " - f"Charge current {batt_current} | Voltage {batt_voltage}" - ) - temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " - log_to_both(printer, temps) - log_to_both(printer, batt_info) - printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) - if set_id == SetIds.P60_AUX: - log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA") - current_idx = 0 - latchup_list = [] - log_to_both(printer, "P60 Dock Latchups") - for idx in range(0, 13): - latchup_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" - log_to_both(printer, content_line) - current_idx += 2 - fmt_str = "!IIHBBHHhhB" - inc_len = struct.calcsize(fmt_str) - ( - boot_cause, - uptime, - reset_cause, - heater_on, - conv_5v_on, - dock_vbat, - dock_vcc_c, - batt_temp_0, - batt_temp_1, - dearm_status, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - current_idx += inc_len - wdt = WdtInfo() - current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) - fmt_str = "!hhbb" - inc_len = struct.calcsize(fmt_str) - ( - batt_charge_current, - batt_discharge_current, - ant6_depl, - ar6_depl, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - current_idx += inc_len - device_types = [] - device_statuses = [] - for idx in range(8): - device_types.append(hk_data[current_idx]) - current_idx += 1 - for idx in range(8): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 - util_info = ( - f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" - ) - util_info_2 = ( - f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " - f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" - ) - log_to_both(printer, util_info) - log_to_both(printer, util_info_2) - wdt.print(printer) - misc_info = ( - f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" - ) - log_to_both(printer, misc_info) - batt_info = ( - f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " - f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" - ) - log_to_both(printer, batt_info) - printer.print_validity_buffer( - validity_buffer=hk_data[current_idx:], num_vars=27 - ) diff --git a/pus_tm/system/__init__.py b/pus_tm/system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pus_tm/system/core.py b/pus_tm/system/core.py new file mode 100644 index 0000000..1c2ae5d --- /dev/null +++ b/pus_tm/system/core.py @@ -0,0 +1,21 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tc.system.core import SetIds +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == SetIds.HK: + pw = PrintWrapper(printer) + fmt_str = "!fff" + inc_len = struct.calcsize(fmt_str) + (temperature, ps_voltage, pl_voltage) = struct.unpack( + fmt_str, hk_data[0 : 0 + inc_len] + ) + printout = ( + f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " + f"PL Voltage [mV] {pl_voltage}" + ) + pw.dlog(printout) + printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) diff --git a/pus_tm/system/tcs.py b/pus_tm/system/tcs.py new file mode 100644 index 0000000..315c17f --- /dev/null +++ b/pus_tm/system/tcs.py @@ -0,0 +1,39 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tm.hk_handling import TM_TCP_SERVER +from tmtccmd.utility import ObjectId +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_thermal_controller_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if set_id == 0: + pw = PrintWrapper(printer) + pw.dlog("Received Sensor Temperature data") + + # get all the floats + tm_data = struct.unpack("!ffffffffffffffff", hk_data[: 16 * 4]) + + # put them into an list with their names + parsed_data = [ + {"SENSOR_PLOC_HEATSPREADER": tm_data[0]}, + {"SENSOR_PLOC_MISSIONBOARD": tm_data[1]}, + {"SENSOR_4K_CAMERA": tm_data[2]}, + {"SENSOR_DAC_HEATSPREADER": tm_data[3]}, + {"SENSOR_STARTRACKER": tm_data[4]}, + {"SENSOR_RW1": tm_data[5]}, + {"SENSOR_DRO": tm_data[6]}, + {"SENSOR_SCEX": tm_data[7]}, + {"SENSOR_X8": tm_data[8]}, + {"SENSOR_HPA": tm_data[9]}, + {"SENSOR_TX_MODUL": tm_data[10]}, + {"SENSOR_MPA": tm_data[11]}, + {"SENSOR_ACU": tm_data[12]}, + {"SENSOR_PLPCDU_HEATSPREADER": tm_data[13]}, + {"SENSOR_TCS_BOARD": tm_data[14]}, + {"SENSOR_MAGNETTORQUER": tm_data[15]}, + ] + + TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data) diff --git a/pus_tm/tm_tcp_server.py b/pus_tm/tm_tcp_server.py index 4bf0138..f6d0e88 100644 --- a/pus_tm/tm_tcp_server.py +++ b/pus_tm/tm_tcp_server.py @@ -18,8 +18,7 @@ class TmTcpServer: _Instance = None - def __init__( - self): + def __init__(self): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -65,7 +64,7 @@ class TmTcpServer: # dle encode the bytes # adding a newline because someone might want to look at it in a console - data_json_bytes = self.dle_encoder.encode(data_json_bytes + b'\n') + data_json_bytes = self.dle_encoder.encode(data_json_bytes + b"\n") try: sent_length = self.client_connection.send(data_json_bytes) @@ -76,27 +75,25 @@ class TmTcpServer: self.client_connection.close() self.client_connection = None - def report_raw_hk_data(self, object_id: ObjectId, - set_id: int, - hk_data: bytes): + def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes): - data_dict = {} - data_dict["type"] = "TM" - data_dict["tmType"] = "Raw HK" - data_dict["objectId"] = object_id.as_string - data_dict["setId"] = set_id - data_dict["rawData"] = base64.b64encode(hk_data).decode() + data_dict = { + "type": "TM", + "tmType": "Raw HK", + "objectId": object_id.as_string, + "setId": set_id, + "rawData": base64.b64encode(hk_data).decode(), + } self._send_dictionary_over_socket(data_dict) - def report_parsed_hk_data(self, object_id: ObjectId, - set_id: int, - data_dictionary): - data_dict = {} - data_dict["type"] = "TM" - data_dict["tmType"] = "Parsed HK" - data_dict["objectId"] = object_id.as_string - data_dict["setId"] = set_id - data_dict["content"] = data_dictionary + def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary): + data_dict = { + "type": "TM", + "tmType": "Parsed HK", + "objectId": object_id.as_string, + "setId": set_id, + "content": data_dictionary, + } - self._send_dictionary_over_socket(data_dict) \ No newline at end of file + self._send_dictionary_over_socket(data_dict) diff --git a/tmtccmd b/tmtccmd index 862fdf2..4433284 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit 862fdf23bc4a90ced47ee1c811de4237e3508536 +Subproject commit 443328422a03395bd51d072d3360de3397e1d88b diff --git a/tmtcgui.py b/tmtcgui.py index 6c8a1fd..22d5eed 100755 --- a/tmtcgui.py +++ b/tmtcgui.py @@ -10,3 +10,4 @@ def main(): if __name__ == "__main__": main() +c From fcb5e9b48cd843376745708e57b09bf51a792b36 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 19 May 2022 13:22:24 +0200 Subject: [PATCH 2/2] remove stray letter --- tmtcgui.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tmtcgui.py b/tmtcgui.py index 22d5eed..6c8a1fd 100755 --- a/tmtcgui.py +++ b/tmtcgui.py @@ -10,4 +10,3 @@ def main(): if __name__ == "__main__": main() -c