import struct import sqlite3 from typing import List, Optional, Tuple from eive_tmtc.tmtc.power.acu import acu_config_table_handler from eive_tmtc.tmtc.power.common_power import ( SetId, unpack_array_in_data, OBC_ENDIANNESS, ) from eive_tmtc.tmtc.power.power import PcduSetIds from tmtccmd.pus.tm.s3_fsfw_hk import Service3FsfwTm from tmtccmd.util import ObjectIdBase from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId from eive_tmtc.config.object_ids import ( PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, P60_DOCK_HANDLER, ACU_HANDLER_ID, ) P60_INDEX_LIST = [ "ACU VCC", "PDU1 VCC", "X3 IDLE VCC", "PDU2 VCC", "ACU VBAT", "PDU1 VBAT", "X3 IDLE VBAT", "PDU2 VBAT", "STACK VBAT", "STACK 3V3", "STACK 5V", "GS3V3", "GS5V", ] WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] PDU1_CHANNELS_NAMES = [ "TCS Board", "Syrlinks", "Startracker", "MGT", "SUS Nominal", "SCEX", "PLOC", "ACS A Side", "Unused Channel 8", ] PDU2_CHANNELS_NAMES = [ "Q7S", "Payload PCDU CH1", "RW", "TCS Heater In", "SUS Redundant", "Deployment Mechanism", "Payload PCDU CH6", "ACS B Side", "Payload Camera", ] PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] class WdtInfo: def __init__(self, pw: PrintWrapper): self.wdt_reboots_list = [] self.time_pings_left_list = [] self.pw = pw def print(self): wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" self.pw.dlog(wdt_info) for idx in range(len(self.wdt_reboots_list)): self.pw.dlog( f"{WDT_LIST[idx].ljust(5)} | {self.wdt_reboots_list[idx]:010} |" f" {self.time_pings_left_list[idx]:010}", ) def parse(self, wdt_data: bytes, current_idx: int) -> int: priv_idx = 0 self.wdt_reboots_list = [] self.time_pings_left_list = [] for idx in range(5): self.wdt_reboots_list.append( struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] ) priv_idx += 4 current_idx += 4 for idx in range(3): self.time_pings_left_list.append( struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] ) priv_idx += 4 current_idx += 4 for idx in range(2): self.time_pings_left_list.append(wdt_data[priv_idx]) current_idx += 1 priv_idx += 1 return current_idx class DevicesInfoParser: def __init__(self): self.dev_types = None self.dev_statuses = None def parse(self, hk_data: bytes, current_idx: int) -> int: self.dev_types = [] self.dev_statuses = [] for idx in range(8): self.dev_types.append(hk_data[current_idx]) current_idx += 1 for idx in range(8): self.dev_statuses.append(hk_data[current_idx]) current_idx += 1 return current_idx def print(self, pw: PrintWrapper): pw.dlog("Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") for i in range(len(self.dev_types)): pw.dlog( f"{self.map_idx_to_type(self.dev_types[i])} | {self.dev_statuses[i]}" ) @staticmethod def map_idx_to_type(devtype: int) -> str: if devtype == 0: return "Reserved" if devtype == 1: return "ADC" if devtype == 2: return "ADC" if devtype == 3: return "DAC" if devtype == 4: return "Temperature Sensor" if devtype == 5: return "Temperature Sensor (Bat Pack)" if devtype == 6: return "RTC" if devtype == 7: return "FRAM" return "Unknown Type" def handle_pdu_data( hk_packet: Service3FsfwTm, con: Optional[sqlite3.Connection], pw: PrintWrapper, pdu_idx: int, set_id: int, hk_data: bytes, ): current_idx = 0 priv_idx = pdu_idx - 1 if set_id == SetId.AUX or set_id == SetId.AUX: fmt_str = "!BBBIIH" inc_len = struct.calcsize(fmt_str) ( conv_enb_0, conv_enb_1, conv_enb_2, boot_cause, uptime, reset_cause, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]") pw.dlog( f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", ) current_idx += inc_len latchup_list = [] pw.dlog("Latchups") for idx in range(len(PDU1_CHANNELS_NAMES)): latchup_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) content_line = ( f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" ) pw.dlog(content_line) current_idx += 2 dev_parser = DevicesInfoParser() current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) wdt.print() pw.dlog("PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved") dev_parser.print(pw=pw) if set_id == SetId.CORE or set_id == SetId.CORE: pw.dlog(f"Received PDU HK from PDU {pdu_idx}") current_list = [] for idx in range(len(PDU1_CHANNELS_NAMES)): current_list.append( struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 voltage_list = [] for idx in range(len(PDU1_CHANNELS_NAMES)): voltage_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 output_enb_list = [] for idx in range(len(PDU1_CHANNELS_NAMES)): output_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" pw.dlog(header_str) for idx in range(len(PDU1_CHANNELS_NAMES)): out_enb = f"{output_enb_list[idx]}".ljust(6) content_line = ( f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " f"{voltage_list[idx]:05} | {current_list[idx]:04}" ) pw.dlog(content_line) fmt_str = "!IBfhh" inc_len = struct.calcsize(fmt_str) (boot_count, batt_mode, temperature, vcc, vbat) = struct.unpack( fmt_str, hk_data[current_idx : current_idx + inc_len] ) info = ( f"Boot Count {boot_count} | Battery Mode {batt_mode} | " f"Temperature {temperature} | VCC {vcc} | VBAT {vbat}" ) if con is not None: cursor = con.cursor() if pdu_idx == 1: tbl_name = "Pdu1" channel_list = PDU1_CHANNELS_NAMES else: tbl_name = "Pdu2" channel_list = PDU2_CHANNELS_NAMES for idx, name in enumerate(channel_list): cursor.execute( f"CREATE TABLE {tbl_name}{name} IF NOT EXISTS " f"(GenerationTime, OutEnable, Voltage, Current)" ) value_list = [ hk_packet.pus_tm.time_provider.as_datetime(), # type: ignore output_enb_list[idx], voltage_list[idx], current_list[idx], ] cursor.execute( f"INSERT INTO {tbl_name}{name} VALUES(?, ?, ?, ?)", value_list ) pw.dlog(info) def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes): if set_id == SetId.CORE: pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA") current_idx = 0 current_list = [] for idx in range(13): current_list.append( struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 voltage_list = [] for idx in range(13): voltage_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) current_idx += 2 out_enb_list = [] for idx in range(13): out_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" pw.dlog(header_str) for idx in range(13): out_enb = f"{out_enb_list[idx]}".ljust(6) content_line = ( f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " f"{voltage_list[idx]:05} | {current_list[idx]:04}" ) pw.dlog(content_line) fmt_str = "!IBhHff" inc_len = struct.calcsize(fmt_str) ( boot_count, batt_mode, batt_current, batt_voltage, temp_0, temp_1, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len batt_info = ( f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " f"Charge current {batt_current} | Voltage {batt_voltage}" ) temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | " pw.dlog(temps) pw.dlog(batt_info) FsfwTmTcPrinter.get_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=9 ) if set_id == SetId.AUX: pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") current_idx = 0 latchup_list = [] pw.dlog("P60 Dock Latchups") for idx in range(0, 13): latchup_list.append( struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] ) content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" pw.dlog(content_line) current_idx += 2 fmt_str = "!IIHBBHHhhB" inc_len = struct.calcsize(fmt_str) ( boot_cause, uptime, reset_cause, heater_on, conv_5v_on, dock_vbat, dock_vcc_c, batt_temp_0, batt_temp_1, dearm_status, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) fmt_str = "!hhbb" inc_len = struct.calcsize(fmt_str) ( batt_charge_current, batt_discharge_current, ant6_depl, ar6_depl, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len dev_parser = DevicesInfoParser() current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) util_info = ( f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" ) util_info_2 = ( f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" ) pw.dlog(util_info) pw.dlog(util_info_2) wdt.print() misc_info = ( f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" ) pw.dlog(misc_info) batt_info = ( f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} |" f" Charge Current {batt_charge_current} | Discharge Current" f" {batt_discharge_current}" ) pw.dlog(batt_info) pw.dlog( "P60 Dock Dev Types: 0:FRAM|1:ADC|2:ADC|3:ADC|4:TempSens|5:RTC|" "6:TempSens(BatPack)|7:TempSens(BatPack)" ) dev_parser.print(pw=pw) FsfwTmTcPrinter.get_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 ) def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]: u16_list = [] for idx in range(6): u16_list.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]) current_idx += 2 return current_idx, u16_list def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes): if set_id == SetId.CORE: mppt_mode = hk_data[0] current_idx = 1 current_idx, currents = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) current_idx, voltages = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) vcc = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] current_idx += 2 vbat = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] current_idx += 2 current_idx, vboosts = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) current_idx, powers = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) fmt_str = "!fffIIHH" inc_len = struct.calcsize(fmt_str) (tmp0, tmp1, tmp2, bootcnt, uptime, mppt_time, mppt_period) = struct.unpack( fmt_str, hk_data[current_idx : current_idx + inc_len] ) current_idx += inc_len pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}") header_str = "Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]" pw.dlog(header_str) for i in range(6): pw.dlog( f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | " f"{str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}" ) pw.dlog(f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}") pw.dlog( f"Boot Count {bootcnt} | Uptime {uptime} sec | " f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" ) FsfwTmTcPrinter.get_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=12 ) if set_id == SetId.AUX: current_idx = 0 fmt_str = "!BBB" inc_len = struct.calcsize(fmt_str) enb_tuple = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) (dac_enb0, dac_enb1, dac_enb2) = enb_tuple dac_enb_str = ["on" if entry == 1 else "off" for entry in enb_tuple] current_idx += inc_len current_idx, dac_channels_raw = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) fmt_str = "!IHII" inc_len = struct.calcsize(fmt_str) (boot_cause, reset_cause, wdt_cnt_gnd, wdt_gnd_time_left) = struct.unpack( fmt_str, hk_data[current_idx : current_idx + inc_len] ) current_idx += inc_len dev_parser = DevicesInfoParser() current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") pw.dlog( f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | " f"DAC 2 {dac_enb_str[2]}" ) pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") pw.dlog( f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left" f" {wdt_gnd_time_left} sec" ) pw.dlog( "ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|5:DAC|6:TempSens|7:Reserved" ) dev_parser.print(pw=pw) FsfwTmTcPrinter.get_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=8 ) def handle_get_param_data_reply( obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray ): if action_id == GomspaceDeviceActionId.PARAM_GET: pw.dlog(f"Parameter Get Request received for object {obj_id}") header_list = [ "Gomspace Request Code", "Table ID", "Memory Address", "Payload length", "Payload", ] fmt_str = "!BBHH" (gs_request_code, table_id, address, payload_length) = struct.unpack( fmt_str, custom_data[:6] ) content_list = [ hex(gs_request_code), table_id, hex(address), payload_length, f"0x[{custom_data[6:].hex(sep=',')}]", ] pw.dlog(f"{header_list}") pw.dlog(f"{content_list}") elif action_id == GomspaceDeviceActionId.REQUEST_CONFIG_TABLE: print(f"Received config table with size {len(custom_data)} for object {obj_id}") if obj_id.as_bytes == PDU_1_HANDLER_ID or obj_id.as_bytes == PDU_2_HANDLER_ID: pdu_config_table_handler(pw, custom_data, obj_id) elif obj_id.as_bytes == ACU_HANDLER_ID: acu_config_table_handler(pw, custom_data) elif obj_id.as_bytes == P60_DOCK_HANDLER: p60_dock_config_table_handler(pw, custom_data) def pdu_config_table_handler( pw: PrintWrapper, custom_data: bytes, obj_id: ObjectIdBase ): if obj_id.as_bytes == PDU_1_HANDLER_ID: pw.dlog("[tcs, syrlinks, str, mgt, sus-n, scex, ploc, acs-a, unused]") elif obj_id.as_bytes == PDU_2_HANDLER_ID: pw.dlog( "[obc, pl-pcdu-bat-nom, rw, heaters, sus-r, sa-depl, pl-pcdu-bat-red," " acs-b, pl-cam]" ) out_on_cnt = unpack_array_in_data(custom_data, 0x52, 2, 9, "H") out_off_cnt = unpack_array_in_data(custom_data, 0x64, 2, 9, "H") init_out_norm = unpack_array_in_data(custom_data, 0x76, 1, 9, "B") init_out_safe = unpack_array_in_data(custom_data, 0x80, 1, 9, "B") init_on_dly = unpack_array_in_data(custom_data, 0x8A, 2, 9, "H") init_off_dly = unpack_array_in_data(custom_data, 0x9C, 2, 9, "H") safe_off_dly = unpack_array_in_data(custom_data, 0xAE, 1, 9, "B") cur_lu_lim = unpack_array_in_data(custom_data, 0xB8, 2, 9, "H") cur_lim = unpack_array_in_data(custom_data, 0xCA, 2, 9, "H") cur_ema = unpack_array_in_data(custom_data, 0xDC, 2, 9, "H") wdt_can_rst = custom_data[0x127] wdt_can = struct.unpack(f"{OBC_ENDIANNESS}I", custom_data[0x12C : 0x12C + 4])[0] batt_hwmax = struct.unpack(f"{OBC_ENDIANNESS}H", custom_data[0x11C : 0x11C + 2])[0] batt_max = struct.unpack(f"{OBC_ENDIANNESS}H", custom_data[0x11E : 0x11E + 2])[0] batt_norm = struct.unpack(f"{OBC_ENDIANNESS}H", custom_data[0x120 : 0x120 + 2])[0] batt_safe = struct.unpack(f"{OBC_ENDIANNESS}H", custom_data[0x122 : 0x122 + 2])[0] batt_crit = struct.unpack(f"{OBC_ENDIANNESS}H", custom_data[0x124 : 0x124 + 2])[0] pw.dlog(f"{'out_on_cnt'.ljust(15)}: {out_on_cnt}") pw.dlog(f"{'out_off_cnt'.ljust(15)}: {out_off_cnt}") pw.dlog(f"{'init_out_norm'.ljust(15)}: {init_out_norm}") pw.dlog(f"{'init_out_safe'.ljust(15)}: {init_out_safe}") pw.dlog(f"{'init_on_dly'.ljust(15)}: {init_on_dly}") pw.dlog(f"{'init_off_dly'.ljust(15)}: {init_off_dly}") pw.dlog(f"{'safe_off_dly'.ljust(15)}: {safe_off_dly}") pw.dlog(f"{'cur_lu_lim'.ljust(15)}: {cur_lu_lim}") pw.dlog(f"{'cur_lim'.ljust(15)}: {cur_lim}") pw.dlog(f"{'cur_ema'.ljust(15)}: {cur_ema}") pw.dlog(f"{'batt_hwmax'.ljust(15)}: {batt_hwmax}") pw.dlog(f"{'batt_max'.ljust(15)}: {batt_max}") pw.dlog(f"{'batt_norm'.ljust(15)}: {batt_norm}") pw.dlog(f"{'batt_safe'.ljust(15)}: {batt_safe}") pw.dlog(f"{'batt_crit'.ljust(15)}: {batt_crit}") pw.dlog(f"{'wdt_can_rst'.ljust(15)}: {wdt_can_rst}") pw.dlog(f"{'wdt_can'.ljust(15)}: {wdt_can}") def p60_dock_config_table_handler(pw: PrintWrapper, custom_data: bytes): ch_names = parse_name_list(custom_data[0:0x68], 13) out_on_cnt = unpack_array_in_data(custom_data, 0x76, 2, 13, "H") out_off_cnt = unpack_array_in_data(custom_data, 0x90, 2, 13, "H") init_out_norm = unpack_array_in_data(custom_data, 0xAA, 1, 13, "B") init_out_safe = unpack_array_in_data(custom_data, 0xB7, 1, 13, "B") init_on_dly = unpack_array_in_data(custom_data, 0xC4, 2, 13, "H") init_off_dly = unpack_array_in_data(custom_data, 0xDE, 2, 13, "H") acu_channel_addrs = unpack_array_in_data(custom_data, 0x180, 1, 2, "B") pdu_channel_addrs = unpack_array_in_data(custom_data, 0x186, 1, 4, "B") pw.dlog(f"Ch Names: {ch_names}") pw.dlog(f"{'out_on_cnt'.ljust(15)}: {out_on_cnt}") pw.dlog(f"{'out_off_cnt'.ljust(15)}: {out_off_cnt}") pw.dlog(f"{'init_out_norm'.ljust(15)}: {init_out_norm}") pw.dlog(f"{'init_out_safe'.ljust(15)}: {init_out_safe}") pw.dlog(f"{'init_on_dly'.ljust(15)}: {init_on_dly}") pw.dlog(f"{'init_off_dly'.ljust(15)}: {init_off_dly}") pw.dlog(f"{'p60acu_addr'.ljust(15)}: {acu_channel_addrs}") pw.dlog(f"{'p60pdu_addr'.ljust(15)}: {pdu_channel_addrs}") def parse_name_list(data: bytes, name_len: int): ch_list = [] idx = 0 while len(ch_list) < name_len: next_byte = data[idx] if next_byte != 0: string_end_found = False string_end_idx = idx while not string_end_found: string_end_idx += 1 if data[string_end_idx] == 0: string_end_found = True name = data[idx:string_end_idx].decode() ch_list.append(name) idx += len(name) idx += 1 return ch_list def handle_pcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes): pw.dlog("Received PCDU HK") if set_id == PcduSetIds.SWITCHER_SET: current_idx = 0 pdu1_vals = [hk_data[i] for i in range(len(PDU1_CHANNELS_NAMES))] current_idx += len(PDU1_CHANNELS_NAMES) pdu2_vals = [hk_data[i + current_idx] for i in range(len(PDU2_CHANNELS_NAMES))] current_idx += len(PDU2_CHANNELS_NAMES) p60_stack_5v_val = hk_data[current_idx] current_idx += 1 p60_stack_3v3_val = hk_data[current_idx] current_idx += 1 pw.dlog("PDU1 Switcher States") for name, val in zip(PDU1_CHANNELS_NAMES, pdu1_vals): pw.dlog(f"{name.ljust(25)}: {val}") pw.dlog("PDU2 Switcher States") for name, val in zip(PDU2_CHANNELS_NAMES, pdu2_vals): pw.dlog(f"{name.ljust(25)}: {val}") pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}") pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)