import struct from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from pus_tm.defs import PrintWrapper from gomspace.gomspace_common import SetIds P60_INDEX_LIST = [ "ACU VCC", "PDU1 VCC", "X3 IDLE VCC", "PDU2 VCC", "ACU VBAT", "PDU1 VBAT", "X3 IDLE VBAT", "PDU2 VBAT", "STACK VBAT", "STACK 3V3", "STACK 5V", "GS3V3", "GS5V", ] WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] PDU1_CHANNELS_NAMES = [ "TCS Board", "Syrlinks", "Startracker", "MGT", "SUS Nominal", "SCEX", "PLOC", "ACS A Side", "Unused Channel 8", ] PDU2_CHANNELS_NAMES = [ "Q7S", "Payload PCDU CH1", "RW", "TCS Heater In", "SUS Redundant", "Deployment Mechanism", "Payload PCDU CH6", "ACS B Side", "Payload Camera", ] PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] class WdtInfo: def __init__(self, pw: PrintWrapper): self.wdt_reboots_list = [] self.time_pings_left_list = [] self.pw = pw def print(self): wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" self.pw.dlog(wdt_info) for idx in range(len(self.wdt_reboots_list)): self.pw.dlog( f"{WDT_LIST[idx].ljust(5)} | " f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", ) def parse(self, wdt_data: bytes, current_idx: int) -> int: priv_idx = 0 self.wdt_reboots_list = [] self.time_pings_left_list = [] for idx in range(5): self.wdt_reboots_list.append( struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] ) priv_idx += 4 current_idx += 4 for idx in range(3): self.time_pings_left_list.append( struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] ) priv_idx += 4 current_idx += 4 for idx in range(2): self.time_pings_left_list.append(wdt_data[priv_idx]) current_idx += 1 priv_idx += 1 return current_idx def handle_pdu_data( printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes ): pw = PrintWrapper(printer=printer) current_idx = 0 priv_idx = pdu_idx - 1 if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: fmt_str = "!hhBBBIIH" inc_len = struct.calcsize(fmt_str) ( vcc, vbat, conv_enb_0, conv_enb_1, conv_enb_2, boot_cause, uptime, reset_cause, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV") pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]") pw.dlog( f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", ) current_idx += inc_len latchup_list = [] pw.dlog("Latchups") for idx in range(len(PDU1_CHANNELS_NAMES)): latchup_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) content_line = ( f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" ) pw.dlog(content_line) current_idx += 2 device_types = [] for idx in range(len(PDU1_CHANNELS_NAMES)): device_types.append(hk_data[current_idx]) current_idx += 1 device_statuses = [] for idx in range(len(PDU1_CHANNELS_NAMES)): device_statuses.append(hk_data[current_idx]) current_idx += 1 wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) wdt.print() if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: pw.dlog(f"Received PDU HK from PDU {pdu_idx}") current_list = [] for idx in range(len(PDU1_CHANNELS_NAMES)): current_list.append( struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 voltage_list = [] for idx in range(len(PDU1_CHANNELS_NAMES)): voltage_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 output_enb_list = [] for idx in range(len(PDU1_CHANNELS_NAMES)): output_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" print(header_str) printer.file_logger.info(header_str) for idx in range(len(PDU1_CHANNELS_NAMES)): out_enb = f"{output_enb_list[idx]}".ljust(6) content_line = ( f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " f"{voltage_list[idx]:05} | {current_list[idx]:04}" ) pw.dlog(content_line) fmt_str = "!IBh" inc_len = struct.calcsize(fmt_str) (boot_count, batt_mode, temperature) = struct.unpack( fmt_str, hk_data[current_idx : current_idx + inc_len] ) info = ( f"Boot Count {boot_count} | Battery Mode {batt_mode} | " f"Temperature {temperature / 10.0}" ) pw.dlog(info) def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer=printer) if set_id == SetIds.P60_CORE: pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA") current_idx = 0 current_list = [] for idx in range(13): current_list.append( struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 voltage_list = [] for idx in range(13): voltage_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 out_enb_list = [] for idx in range(13): out_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" print(header_str) printer.file_logger.info(header_str) for idx in range(13): out_enb = f"{out_enb_list[idx]}".ljust(6) content_line = ( f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " f"{voltage_list[idx]:05} | {current_list[idx]:04}" ) pw.dlog(content_line) fmt_str = "!IBhHhh" inc_len = struct.calcsize(fmt_str) ( boot_count, batt_mode, batt_current, batt_voltage, temp_0, temp_1, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len batt_info = ( f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " f"Charge current {batt_current} | Voltage {batt_voltage}" ) temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " pw.dlog(temps) pw.dlog(batt_info) printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) if set_id == SetIds.P60_AUX: pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") current_idx = 0 latchup_list = [] pw.dlog("P60 Dock Latchups") for idx in range(0, 13): latchup_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" pw.dlog(content_line) current_idx += 2 fmt_str = "!IIHBBHHhhB" inc_len = struct.calcsize(fmt_str) ( boot_cause, uptime, reset_cause, heater_on, conv_5v_on, dock_vbat, dock_vcc_c, batt_temp_0, batt_temp_1, dearm_status, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) fmt_str = "!hhbb" inc_len = struct.calcsize(fmt_str) ( batt_charge_current, batt_discharge_current, ant6_depl, ar6_depl, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len device_types = [] device_statuses = [] for idx in range(8): device_types.append(hk_data[current_idx]) current_idx += 1 for idx in range(8): device_statuses.append(hk_data[current_idx]) current_idx += 1 util_info = ( f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" ) util_info_2 = ( f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" ) pw.dlog(util_info) pw.dlog(util_info_2) wdt.print() misc_info = ( f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" ) pw.dlog(misc_info) batt_info = ( f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" ) pw.dlog(batt_info) printer.print_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 ) def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer=printer) if set_id == SetIds.ACU_CORE: pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") pass if set_id == SetIds.ACU_AUX: pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") pass