eive-tmtc/pus_tm/devs/pcdu.py

426 lines
15 KiB
Python

import struct
from typing import List, Tuple
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from pus_tm.defs import PrintWrapper
from gomspace.gomspace_common import SetIds
P60_INDEX_LIST = [
"ACU VCC",
"PDU1 VCC",
"X3 IDLE VCC",
"PDU2 VCC",
"ACU VBAT",
"PDU1 VBAT",
"X3 IDLE VBAT",
"PDU2 VBAT",
"STACK VBAT",
"STACK 3V3",
"STACK 5V",
"GS3V3",
"GS5V",
]
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
PDU1_CHANNELS_NAMES = [
"TCS Board",
"Syrlinks",
"Startracker",
"MGT",
"SUS Nominal",
"SCEX",
"PLOC",
"ACS A Side",
"Unused Channel 8",
]
PDU2_CHANNELS_NAMES = [
"Q7S",
"Payload PCDU CH1",
"RW",
"TCS Heater In",
"SUS Redundant",
"Deployment Mechanism",
"Payload PCDU CH6",
"ACS B Side",
"Payload Camera",
]
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
class WdtInfo:
def __init__(self, pw: PrintWrapper):
self.wdt_reboots_list = []
self.time_pings_left_list = []
self.pw = pw
def print(self):
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
self.pw.dlog(wdt_info)
for idx in range(len(self.wdt_reboots_list)):
self.pw.dlog(
f"{WDT_LIST[idx].ljust(5)} | "
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
)
def parse(self, wdt_data: bytes, current_idx: int) -> int:
priv_idx = 0
self.wdt_reboots_list = []
self.time_pings_left_list = []
for idx in range(5):
self.wdt_reboots_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(3):
self.time_pings_left_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(2):
self.time_pings_left_list.append(wdt_data[priv_idx])
current_idx += 1
priv_idx += 1
return current_idx
class DevicesInfoParser:
def __init__(self):
self.dev_types = None
self.dev_statuses = None
def parse(self, hk_data: bytes, current_idx: int) -> int:
self.dev_types = []
self.dev_statuses = []
for idx in range(8):
self.dev_types.append(hk_data[current_idx])
current_idx += 1
for idx in range(8):
self.dev_statuses.append(hk_data[current_idx])
current_idx += 1
return current_idx
def print(self, pw: PrintWrapper):
pw.dlog(f"Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)")
for i in range(len(self.dev_types)):
pw.dlog(
f"{self.map_idx_to_type(self.dev_types[i])} | {self.dev_statuses[i]}"
)
@staticmethod
def map_idx_to_type(devtype: int) -> str:
if devtype == 0:
return "Reserved"
if devtype == 1:
return "ADC"
if devtype == 2:
return "ADC"
if devtype == 3:
return "DAC"
if devtype == 4:
return "Temperature Sensor"
if devtype == 5:
return "Temperature Sensor (Bat Pack)"
if devtype == 6:
return "RTC"
if devtype == 7:
return "FRAM"
return "Unknown Type"
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
pw = PrintWrapper(printer=printer)
current_idx = 0
priv_idx = pdu_idx - 1
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
vcc,
vbat,
conv_enb_0,
conv_enb_1,
conv_enb_2,
boot_cause,
uptime,
reset_cause,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV")
pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]")
pw.dlog(
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
)
current_idx += inc_len
latchup_list = []
pw.dlog("Latchups")
for idx in range(len(PDU1_CHANNELS_NAMES)):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
)
pw.dlog(content_line)
current_idx += 2
dev_parser = DevicesInfoParser()
current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx)
wdt = WdtInfo(pw=pw)
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print()
pw.dlog(f"PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved")
dev_parser.print(pw=pw)
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
pw.dlog(f"Received PDU HK from PDU {pdu_idx}")
current_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
output_enb_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
output_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
pw.dlog(header_str)
for idx in range(len(PDU1_CHANNELS_NAMES)):
out_enb = f"{output_enb_list[idx]}".ljust(6)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
pw.dlog(content_line)
fmt_str = "!IBf"
inc_len = struct.calcsize(fmt_str)
(boot_count, batt_mode, temperature) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
info = (
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
f"Temperature {temperature}"
)
pw.dlog(info)
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
if set_id == SetIds.P60_CORE:
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
current_idx = 0
current_list = []
for idx in range(13):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(13):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
out_enb_list = []
for idx in range(13):
out_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
pw.dlog(header_str)
for idx in range(13):
out_enb = f"{out_enb_list[idx]}".ljust(6)
content_line = (
f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
pw.dlog(content_line)
fmt_str = "!IBhHhh"
inc_len = struct.calcsize(fmt_str)
(
boot_count,
batt_mode,
batt_current,
batt_voltage,
temp_0,
temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
batt_info = (
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
f"Charge current {batt_current} | Voltage {batt_voltage}"
)
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
pw.dlog(temps)
pw.dlog(batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
if set_id == SetIds.P60_AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0
latchup_list = []
pw.dlog("P60 Dock Latchups")
for idx in range(0, 13):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
pw.dlog(content_line)
current_idx += 2
fmt_str = "!IIHBBHHhhB"
inc_len = struct.calcsize(fmt_str)
(
boot_cause,
uptime,
reset_cause,
heater_on,
conv_5v_on,
dock_vbat,
dock_vcc_c,
batt_temp_0,
batt_temp_1,
dearm_status,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
wdt = WdtInfo(pw=pw)
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
fmt_str = "!hhbb"
inc_len = struct.calcsize(fmt_str)
(
batt_charge_current,
batt_discharge_current,
ant6_depl,
ar6_depl,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
dev_parser = DevicesInfoParser()
current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx)
util_info = (
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
)
util_info_2 = (
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
)
pw.dlog(util_info)
pw.dlog(util_info_2)
wdt.print()
misc_info = (
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
)
pw.dlog(misc_info)
batt_info = (
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
)
pw.dlog(batt_info)
pw.dlog(
"P60 Dock Dev Types: 0:FRAM|1:ADC|2:ADC|3:ADC|4:TempSens|5:RTC|"
"6:TempSens(BatPack)|7:TempSens(BatPack)"
)
dev_parser.print(pw=pw)
printer.print_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27
)
def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]:
u16_list = []
for idx in range(6):
u16_list.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0])
current_idx += 2
return current_idx, u16_list
def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
if set_id == SetIds.ACU_CORE:
mppt_mode = hk_data[0]
current_idx = 1
current_idx, currents = gen_six_entry_u16_list(
hk_data=hk_data, current_idx=current_idx
)
current_idx, voltages = gen_six_entry_u16_list(
hk_data=hk_data, current_idx=current_idx
)
vcc = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
current_idx += 2
vbat = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
current_idx += 2
current_idx, vboosts = gen_six_entry_u16_list(
hk_data=hk_data, current_idx=current_idx
)
current_idx, powers = gen_six_entry_u16_list(
hk_data=hk_data, current_idx=current_idx
)
fmt_str = "!HHHIIHH"
inc_len = struct.calcsize(fmt_str)
(tmp0, tmp1, tmp2, bootcnt, uptime, mppt_time, mppt_period) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
current_idx += inc_len
pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA")
pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}")
header_str = (
f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]"
)
pw.dlog(header_str)
for i in range(6):
pw.dlog(
f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | "
f"{str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}"
)
pw.dlog(
f"Temperatures in C: Ch0 {tmp0/10.0} | Ch1 {tmp1/10.0} | Ch2 {tmp2/10.0}"
)
pw.dlog(
f"Boot Count {bootcnt} | Uptime {uptime} sec | "
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
)
printer.print_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=12
)
if set_id == SetIds.ACU_AUX:
current_idx = 0
fmt_str = "!BBB"
inc_len = struct.calcsize(fmt_str)
enb_tuple = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
(dac_enb0, dac_enb1, dac_enb2) = enb_tuple
dac_enb_str = ["on" if entry == 1 else "off" for entry in enb_tuple]
current_idx += inc_len
current_idx, dac_channels_raw = gen_six_entry_u16_list(
hk_data=hk_data, current_idx=current_idx
)
fmt_str = "!IHII"
inc_len = struct.calcsize(fmt_str)
(boot_cause, reset_cause, wdt_cnt_gnd, wdt_gnd_time_left) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
current_idx += inc_len
dev_parser = DevicesInfoParser()
current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx)
pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA")
pw.dlog(
f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | DAC 2 {dac_enb_str[2]}"
)
pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}")
pw.dlog(
f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left {wdt_gnd_time_left} sec"
)
pw.dlog(
f"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|"
f"5:DAC|6:TempSens|7:Reserved"
)
dev_parser.print(pw=pw)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8)