681 lines
24 KiB
Python
681 lines
24 KiB
Python
import dataclasses
|
|
import struct
|
|
import logging
|
|
import sqlite3
|
|
from typing import List, Tuple
|
|
from eive_tmtc.pus_tm.hk import HkTmInfo
|
|
|
|
from eive_tmtc.tmtc.power.acu import acu_config_table_handler
|
|
from eive_tmtc.tmtc.power.common_power import (
|
|
SetId,
|
|
unpack_array_in_data,
|
|
OBC_ENDIANNESS,
|
|
)
|
|
from eive_tmtc.tmtc.power.power import PcduSetIds
|
|
from tmtccmd.util import ObjectIdBase
|
|
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
|
from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId
|
|
from eive_tmtc.config.object_ids import (
|
|
PDU_1_HANDLER_ID,
|
|
PDU_2_HANDLER_ID,
|
|
P60_DOCK_HANDLER,
|
|
ACU_HANDLER_ID,
|
|
)
|
|
|
|
# Module logger for output that is not routed through a PrintWrapper.
_LOGGER = logging.getLogger(__name__)

# Row labels for the 13 P60 dock channels, in the order the HK handlers index them.
P60_INDEX_LIST = [
    "ACU VCC", "PDU1 VCC", "X3 IDLE VCC", "PDU2 VCC",
    "ACU VBAT", "PDU1 VBAT", "X3 IDLE VBAT", "PDU2 VBAT",
    "STACK VBAT", "STACK 3V3", "STACK 5V",
    "GS3V3", "GS5V",
]

# Row labels used by WdtInfo.print(), one per parsed watchdog entry.
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]

# Output channel labels of PDU1, indexed by channel number.
PDU1_CHANNELS_NAMES = [
    "TCS Board", "Syrlinks", "Startracker",
    "MGT", "SUS Nominal", "SCEX",
    "PLOC", "ACS A Side", "Unused Channel 8",
]

# Output channel labels of PDU2, indexed by channel number.
PDU2_CHANNELS_NAMES = [
    "Q7S", "Payload PCDU CH1", "RW",
    "TCS Heater In", "SUS Redundant", "Deployment Mechanism",
    "Payload PCDU CH6", "ACS B Side", "Payload Camera",
]

# Both channel tables, addressable with (pdu_idx - 1).
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
|
|
|
|
|
|
class WdtInfo:
    """Container for the watchdog (WDT) section of a housekeeping payload.

    ``parse`` fills two parallel lists with five entries each, ``print`` logs
    them as a table whose rows are labelled with ``WDT_LIST``.
    """

    def __init__(self, pw: PrintWrapper):
        self.wdt_reboots_list = []
        self.time_pings_left_list = []
        self.pw = pw

    def print(self):
        """Log a header line followed by one row per parsed watchdog entry."""
        self.pw.dlog("WDT Type | Reboots | Time or Pings left (CSP only)")
        for row, reboots in enumerate(self.wdt_reboots_list):
            remaining = self.time_pings_left_list[row]
            self.pw.dlog(
                f"{WDT_LIST[row].ljust(5)} | {reboots:010} |"
                f" {remaining:010}",
            )

    def parse(self, wdt_data: bytes, current_idx: int) -> int:
        """Parse the watchdog section starting at offset 0 of ``wdt_data``.

        Layout read here: five big-endian u32 reboot counters, three big-endian
        u32 values and two single bytes. The last five values land in
        ``time_pings_left_list``.
        NOTE(review): presumably times left for GND/I2C/CAN and pings left for
        CSP0/CSP1, going by the table header - confirm against the device docs.

        :param wdt_data: Buffer which starts with the watchdog section.
        :param current_idx: Caller-side offset to advance.
        :return: ``current_idx`` advanced by the 34 consumed bytes.
        """
        self.wdt_reboots_list = []
        self.time_pings_left_list = []
        offset = 0
        for _ in range(5):
            self.wdt_reboots_list.append(struct.unpack_from("!I", wdt_data, offset)[0])
            offset += 4
        for _ in range(3):
            self.time_pings_left_list.append(
                struct.unpack_from("!I", wdt_data, offset)[0]
            )
            offset += 4
        for _ in range(2):
            self.time_pings_left_list.append(wdt_data[offset])
            offset += 1
        return current_idx + offset
|
|
|
|
|
|
class DevicesInfoParser:
    """Parses and prints the device type / device status section of a HK payload."""

    def __init__(self):
        # Both stay None until parse() has been called.
        self.dev_types = None
        self.dev_statuses = None

    def parse(self, hk_data: bytes, current_idx: int) -> int:
        """Read eight device type bytes followed by eight device status bytes.

        :param hk_data: Complete HK payload.
        :param current_idx: Offset of the first device type byte.
        :return: Offset of the first byte after the 16 consumed bytes.
        """
        self.dev_types = []
        self.dev_statuses = []
        for target in (self.dev_types, self.dev_statuses):
            for _ in range(8):
                target.append(hk_data[current_idx])
                current_idx += 1
        return current_idx

    def print(self, pw: PrintWrapper):
        """Log a header and one ``<type name> | <status>`` line per parsed device."""
        pw.dlog("Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)")
        for dev_type, dev_status in zip(self.dev_types, self.dev_statuses):
            pw.dlog(f"{self.map_idx_to_type(dev_type)} | {dev_status}")

    @staticmethod
    def map_idx_to_type(devtype: int) -> str:
        """Translate a numeric device type into its display name.

        Values outside 0..7 yield ``"Unknown Type"``.
        """
        type_names = {
            0: "Reserved",
            1: "ADC",
            2: "ADC",
            3: "DAC",
            4: "Temperature Sensor",
            5: "Temperature Sensor (Bat Pack)",
            6: "RTC",
            7: "FRAM",
        }
        return type_names.get(devtype, "Unknown Type")
|
|
|
|
|
|
@dataclasses.dataclass
class PduData:
    """Decoded core housekeeping values of one PDU.

    The per-channel lists are indexed by output channel number; the PDU core
    handler fills them with one entry per channel name.
    """

    # Scalar values unpacked from the tail of the core HK set.
    boot_count: int
    batt_mode: int
    temperature: float
    vcc: int
    vbat: int
    # Per-channel values. The core handler prints voltages under "U [mV]" and
    # currents under "I [mA]"; the enable flags arrive as raw byte values.
    out_enables: List[bool]
    voltages: List[int]
    currents: List[int]
|
|
|
|
|
|
def _handle_pdu_aux_hk(hk_data: bytes, pw: PrintWrapper, pdu_idx: int):
    """Decode and print the auxiliary HK set of one PDU."""
    fmt_str = "!BBBIIH"
    (
        conv_enb_0,
        conv_enb_1,
        conv_enb_2,
        boot_cause,
        uptime,
        reset_cause,
    ) = struct.unpack_from(fmt_str, hk_data, 0)
    current_idx = struct.calcsize(fmt_str)
    pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]")
    pw.dlog(
        f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
    )
    pw.dlog("Latchups")
    # One big-endian u16 latchup counter per output channel of the selected PDU.
    for name in PDU_CHANNEL_NAMES[pdu_idx - 1]:
        latchups = struct.unpack_from("!H", hk_data, current_idx)[0]
        pw.dlog(f"{name.ljust(24)} | {latchups}")
        current_idx += 2
    dev_parser = DevicesInfoParser()
    current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx)
    wdt = WdtInfo(pw=pw)
    # WdtInfo.parse expects a buffer which starts at the watchdog section.
    wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
    wdt.print()
    pw.dlog("PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved")
    dev_parser.print(pw=pw)


def _handle_pdu_core_hk(
    hk_data: bytes, hk_info: HkTmInfo, pw: PrintWrapper, pdu_idx: int
):
    """Decode the core HK set of one PDU, print it and store it in the database."""
    pw.dlog(f"Received PDU HK from PDU {pdu_idx}")
    channel_names = PDU_CHANNEL_NAMES[pdu_idx - 1]
    num_channels = len(channel_names)
    current_idx = 0
    # Signed 16 bit currents, then unsigned 16 bit voltages, then one enable byte
    # per channel.
    current_list = list(struct.unpack_from(f"!{num_channels}h", hk_data, current_idx))
    current_idx += 2 * num_channels
    voltage_list = list(struct.unpack_from(f"!{num_channels}H", hk_data, current_idx))
    current_idx += 2 * num_channels
    output_enb_list = [hk_data[current_idx + idx] for idx in range(num_channels)]
    current_idx += num_channels
    pw.dlog(f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]")
    for name, out_enable, voltage, current in zip(
        channel_names, output_enb_list, voltage_list, current_list
    ):
        out_enb = f"{out_enable}".ljust(6)
        pw.dlog(f"{name.ljust(24)} | {out_enb} | {voltage:05} | {current:04}")
    (boot_count, batt_mode, temperature, vcc, vbat) = struct.unpack_from(
        "!IBfhh", hk_data, current_idx
    )
    info = (
        f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
        f"Temperature {temperature} | VCC {vcc} | VBAT {vbat}"
    )
    try:
        handle_pdu_db_insertion(
            hk_info,
            pdu_idx,
            PduData(
                boot_count,
                batt_mode,
                temperature,
                vcc,
                vbat,
                output_enb_list,
                voltage_list,
                current_list,
            ),
        )
    except sqlite3.OperationalError as e:
        # Database problems must not prevent the summary from being reported.
        _LOGGER.warning("SQLite error %s", e)
    _LOGGER.info(info)


def handle_pdu_data(
    hk_data: bytes,
    hk_info: HkTmInfo,
    pw: PrintWrapper,
    pdu_idx: int,
    set_id: int,
):
    """Decode and print one housekeeping set of a PDU.

    ``SetId.AUX`` sets are decoded and printed. ``SetId.CORE`` sets are decoded,
    printed and additionally handed to :func:`handle_pdu_db_insertion`. Any other
    set ID is ignored.

    :param hk_data: Raw HK payload, decoded from offset 0 in network byte order.
    :param hk_info: Packet metadata which is forwarded to the database insertion.
    :param pw: Print wrapper receiving the formatted output.
    :param pdu_idx: 1 for PDU1 or 2 for PDU2. Selects the channel name table.
    :param set_id: HK set ID, compared against ``SetId.AUX`` and ``SetId.CORE``.
    :raises struct.error: If ``hk_data`` is too short for a multi-byte field.
    :raises IndexError: If ``hk_data`` is too short for a single-byte field.
    """
    # The original checked each set ID twice with `or`; one comparison suffices.
    if set_id == SetId.AUX:
        _handle_pdu_aux_hk(hk_data, pw, pdu_idx)
    elif set_id == SetId.CORE:
        _handle_pdu_core_hk(hk_data, hk_info, pw, pdu_idx)
|
|
|
|
|
|
def handle_pdu_db_insertion(
    hk_info: HkTmInfo,
    pdu_idx: int,
    pdu_data: PduData,
):
    """Store one decoded PDU core HK set in the database.

    One row goes into the ``pdu1``/``pdu2`` table and one row into a table per
    output channel (``<base>_<channel name in lower snake case>``). Tables are
    created on demand. All statements run inside one transaction: it is committed
    when every insert succeeded and rolled back when any statement fails, so a
    failure can not leave a partially stored packet behind.

    NOTE(review): ``hk_info.db_con`` is used as a ``sqlite3.Connection`` (cursor,
    commit, context manager) - confirm against ``HkTmInfo``.

    :param hk_info: Supplies the database connection, packet UUID and packet time.
    :param pdu_idx: 1 selects the PDU1 tables, any other value the PDU2 tables.
    :param pdu_data: Decoded values to store.
    :raises sqlite3.Error: Propagated from the database after the rollback.
    """
    if pdu_idx == 1:
        tbl_base_name = "pdu1"
        channel_list = PDU1_CHANNELS_NAMES
    else:
        tbl_base_name = "pdu2"
        channel_list = PDU2_CHANNELS_NAMES
    packet_uuid = str(hk_info.packet_uuid)
    generation_time = hk_info.packet_datetime
    # The connection context manager commits on success and rolls back on error.
    with hk_info.db_con:
        cursor = hk_info.db_con.cursor()
        # Table names are built from module constants only, never from packet
        # data, which is why they may be interpolated into the statement text.
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {tbl_base_name}(
                packet_uuid TEXT PRIMARY KEY,
                generation_time TEXT,
                boot_count NUM,
                bat_mode NUM,
                temp REAL,
                vcc NUM,
                vbat NUM
            )"""
        )
        cursor.execute(
            f"INSERT INTO {tbl_base_name} VALUES(?, ?, ?, ?, ?, ?, ?)",
            (
                packet_uuid,
                generation_time,
                pdu_data.boot_count,
                pdu_data.batt_mode,
                pdu_data.temperature,
                pdu_data.vcc,
                pdu_data.vbat,
            ),
        )

        for idx, name in enumerate(channel_list):
            # "TCS Board" -> "tcs_board"
            snake_case_name = "_".join(name.lower().split())
            tbl_name = f"{tbl_base_name}_{snake_case_name}"
            cursor.execute(
                f"""
                CREATE TABLE IF NOT EXISTS {tbl_name}(
                    packet_uuid TEXT PRIMARY KEY,
                    generation_time TEXT,
                    out_enable NUM,
                    voltage NUM,
                    current NUM
                )"""
            )
            cursor.execute(
                f"INSERT INTO {tbl_name} VALUES(?, ?, ?, ?, ?)",
                (
                    packet_uuid,
                    generation_time,
                    pdu_data.out_enables[idx],
                    pdu_data.voltages[idx],
                    pdu_data.currents[idx],
                ),
            )
|
|
|
|
|
|
def _print_p60_core_hk(pw: PrintWrapper, hk_data: bytes):
    """Decode and print the P60 dock core HK set."""
    pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
    offset = 0
    # 13 signed 16 bit currents, 13 unsigned 16 bit voltages, 13 enable bytes.
    currents = struct.unpack_from("!13h", hk_data, offset)
    offset += 26
    voltages = struct.unpack_from("!13H", hk_data, offset)
    offset += 26
    out_enables = [hk_data[offset + ch] for ch in range(13)]
    offset += 13
    pw.dlog(f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]")
    for ch in range(13):
        out_enb = f"{out_enables[ch]}".ljust(6)
        pw.dlog(
            f"{P60_INDEX_LIST[ch].ljust(24)} | {out_enb} | "
            f"{voltages[ch]:05} | {currents[ch]:04}"
        )
    tail_fmt = "!IBhHff"
    (
        boot_count,
        batt_mode,
        batt_current,
        batt_voltage,
        temp_0,
        temp_1,
    ) = struct.unpack_from(tail_fmt, hk_data, offset)
    offset += struct.calcsize(tail_fmt)
    pw.dlog(f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | ")
    pw.dlog(
        f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
        f"Charge current {batt_current} | Voltage {batt_voltage}"
    )
    # NOTE(review): the return value is discarded here - confirm whether the
    # helper prints on its own or whether its result should be logged.
    FsfwTmTcPrinter.get_validity_buffer(validity_buffer=hk_data[offset:], num_vars=9)


def _print_p60_aux_hk(pw: PrintWrapper, hk_data: bytes):
    """Decode and print the P60 dock auxiliary HK set."""
    pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
    offset = 0
    pw.dlog("P60 Dock Latchups")
    # One big-endian u16 latchup counter per P60 dock channel.
    for ch in range(13):
        latchups = struct.unpack_from("!H", hk_data, offset)[0]
        pw.dlog(f"{P60_INDEX_LIST[ch].ljust(24)} | {latchups}")
        offset += 2
    status_fmt = "!IIHBBHHhhB"
    (
        boot_cause,
        uptime,
        reset_cause,
        heater_on,
        conv_5v_on,
        dock_vbat,
        dock_vcc_c,
        batt_temp_0,
        batt_temp_1,
        dearm_status,
    ) = struct.unpack_from(status_fmt, hk_data, offset)
    offset += struct.calcsize(status_fmt)
    wdt = WdtInfo(pw=pw)
    # WdtInfo.parse expects a buffer which starts at the watchdog section.
    offset = wdt.parse(wdt_data=hk_data[offset:], current_idx=offset)
    batt_fmt = "!hhbb"
    (
        batt_charge_current,
        batt_discharge_current,
        ant6_depl,
        ar6_depl,
    ) = struct.unpack_from(batt_fmt, hk_data, offset)
    offset += struct.calcsize(batt_fmt)
    dev_parser = DevicesInfoParser()
    offset = dev_parser.parse(hk_data=hk_data, current_idx=offset)

    pw.dlog(f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}")
    pw.dlog(
        f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
        f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
    )
    wdt.print()
    pw.dlog(f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}")
    # The battery temperatures are divided by ten before they are shown.
    pw.dlog(
        f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} |"
        f" Charge Current {batt_charge_current} | Discharge Current"
        f" {batt_discharge_current}"
    )
    pw.dlog(
        "P60 Dock Dev Types: 0:FRAM|1:ADC|2:ADC|3:ADC|4:TempSens|5:RTC|"
        "6:TempSens(BatPack)|7:TempSens(BatPack)"
    )
    dev_parser.print(pw=pw)
    # NOTE(review): return value discarded, see the core handler - confirm.
    FsfwTmTcPrinter.get_validity_buffer(validity_buffer=hk_data[offset:], num_vars=27)


def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
    """Decode and print a P60 dock housekeeping set.

    :param pw: Print wrapper receiving the formatted output.
    :param set_id: HK set ID. ``SetId.CORE`` and ``SetId.AUX`` are handled, other
        values are ignored.
    :param hk_data: Raw HK payload, decoded from offset 0 in network byte order.
    """
    if set_id == SetId.CORE:
        _print_p60_core_hk(pw, hk_data)
    if set_id == SetId.AUX:
        _print_p60_aux_hk(pw, hk_data)
|
|
|
|
|
|
def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]:
    """Read six consecutive big-endian unsigned 16 bit values.

    :param hk_data: Buffer to read from.
    :param current_idx: Offset of the first value.
    :return: Tuple of the offset behind the 12 consumed bytes and the value list.
    :raises struct.error: If fewer than 12 bytes are available at the offset.
    """
    values = list(struct.unpack_from("!6H", hk_data, current_idx))
    return current_idx + 12, values
|
|
|
|
|
|
def _print_acu_core_hk(pw: PrintWrapper, hk_data: bytes):
    """Decode and print the ACU core HK set."""
    mppt_mode = hk_data[0]
    offset = 1
    offset, currents = gen_six_entry_u16_list(hk_data=hk_data, current_idx=offset)
    offset, voltages = gen_six_entry_u16_list(hk_data=hk_data, current_idx=offset)
    # VCC and VBAT are two consecutive big-endian u16 values.
    vcc, vbat = struct.unpack_from("!HH", hk_data, offset)
    offset += 4
    offset, vboosts = gen_six_entry_u16_list(hk_data=hk_data, current_idx=offset)
    offset, powers = gen_six_entry_u16_list(hk_data=hk_data, current_idx=offset)
    tail_fmt = "!fffIIHH"
    (tmp0, tmp1, tmp2, bootcnt, uptime, mppt_time, mppt_period) = struct.unpack_from(
        tail_fmt, hk_data, offset
    )
    offset += struct.calcsize(tail_fmt)

    pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA")
    pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}")
    pw.dlog("Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]")
    for ch, (u_in, i_in, u_boost, power) in enumerate(
        zip(voltages, currents, vboosts, powers)
    ):
        pw.dlog(
            f"{ch} | {str(u_in).ljust(4)} | {str(i_in).ljust(4)} | "
            f"{str(u_boost).ljust(4)} | {str(power).ljust(2)}"
        )
    pw.dlog(f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}")
    pw.dlog(
        f"Boot Count {bootcnt} | Uptime {uptime} sec | "
        f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
    )
    # NOTE(review): the return value is discarded here - confirm whether the
    # helper prints on its own or whether its result should be logged.
    FsfwTmTcPrinter.get_validity_buffer(validity_buffer=hk_data[offset:], num_vars=12)


def _print_acu_aux_hk(pw: PrintWrapper, hk_data: bytes):
    """Decode and print the ACU auxiliary HK set."""
    offset = 0
    enable_fmt = "!BBB"
    dac_enables = struct.unpack_from(enable_fmt, hk_data, offset)
    dac_enb_str = ["on" if flag == 1 else "off" for flag in dac_enables]
    offset += struct.calcsize(enable_fmt)
    # The six raw DAC channel values are consumed but not shown.
    offset, _ = gen_six_entry_u16_list(hk_data=hk_data, current_idx=offset)
    status_fmt = "!IHII"
    (boot_cause, reset_cause, wdt_cnt_gnd, wdt_gnd_time_left) = struct.unpack_from(
        status_fmt, hk_data, offset
    )
    offset += struct.calcsize(status_fmt)
    dev_parser = DevicesInfoParser()
    offset = dev_parser.parse(hk_data=hk_data, current_idx=offset)

    pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA")
    pw.dlog(
        f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | "
        f"DAC 2 {dac_enb_str[2]}"
    )
    pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}")
    pw.dlog(
        f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left"
        f" {wdt_gnd_time_left} sec"
    )
    pw.dlog(
        "ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|5:DAC|6:TempSens|7:Reserved"
    )
    dev_parser.print(pw=pw)
    # NOTE(review): return value discarded, see the core handler - confirm.
    FsfwTmTcPrinter.get_validity_buffer(validity_buffer=hk_data[offset:], num_vars=8)


def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
    """Decode and print an ACU housekeeping set.

    :param pw: Print wrapper receiving the formatted output.
    :param set_id: HK set ID. ``SetId.CORE`` and ``SetId.AUX`` are handled, other
        values are ignored.
    :param hk_data: Raw HK payload, decoded from offset 0 in network byte order.
    """
    if set_id == SetId.CORE:
        _print_acu_core_hk(pw, hk_data)
    if set_id == SetId.AUX:
        _print_acu_aux_hk(pw, hk_data)
|
|
|
|
|
|
def handle_get_param_data_reply(
    obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray
):
    """Print the data reply of a Gomspace parameter or config table request.

    ``PARAM_GET`` replies are printed as a header list and a content list.
    ``REQUEST_CONFIG_TABLE`` replies are dispatched to the table handler of the
    replying object (PDU1/PDU2, ACU or P60 dock). Other action IDs are ignored.

    :param obj_id: Object which sent the reply.
    :param action_id: Action ID the reply belongs to.
    :param pw: Print wrapper receiving the formatted output.
    :param custom_data: Raw reply data.
    """
    if action_id == GomspaceDeviceActionId.PARAM_GET:
        pw.dlog(f"Parameter Get Request received for object {obj_id}")
        # Six byte header in network byte order, everything behind it is payload.
        gs_request_code, table_id, address, payload_length = struct.unpack(
            "!BBHH", custom_data[:6]
        )
        payload_hex = custom_data[6:].hex(sep=",")
        headers = [
            "Gomspace Request Code",
            "Table ID",
            "Memory Address",
            "Payload length",
            "Payload",
        ]
        contents = [
            hex(gs_request_code),
            table_id,
            hex(address),
            payload_length,
            f"0x[{payload_hex}]",
        ]
        pw.dlog(str(headers))
        pw.dlog(str(contents))
        return
    if action_id == GomspaceDeviceActionId.REQUEST_CONFIG_TABLE:
        print(f"Received config table with size {len(custom_data)} for object {obj_id}")
        if obj_id.as_bytes == PDU_1_HANDLER_ID or obj_id.as_bytes == PDU_2_HANDLER_ID:
            pdu_config_table_handler(pw, custom_data, obj_id)
        elif obj_id.as_bytes == ACU_HANDLER_ID:
            acu_config_table_handler(pw, custom_data)
        elif obj_id.as_bytes == P60_DOCK_HANDLER:
            p60_dock_config_table_handler(pw, custom_data)
|
|
|
|
|
|
def pdu_config_table_handler(
    pw: PrintWrapper, custom_data: bytes, obj_id: ObjectIdBase
):
    """Print selected entries of a PDU configuration table.

    A channel legend for the replying PDU is printed first, followed by one
    ``<name>: <value>`` line per table entry.

    :param pw: Print wrapper receiving the formatted output.
    :param custom_data: Raw configuration table.
    :param obj_id: Object which sent the table, selects the channel legend.
    """
    if obj_id.as_bytes == PDU_1_HANDLER_ID:
        pw.dlog("[tcs, syrlinks, str, mgt, sus-n, scex, ploc, acs-a, unused]")
    elif obj_id.as_bytes == PDU_2_HANDLER_ID:
        pw.dlog(
            "[obc, pl-pcdu-bat-nom, rw, heaters, sus-r, sa-depl, pl-pcdu-bat-red,"
            " acs-b, pl-cam]"
        )

    # (name, table address, element size, struct format) of the nine-entry arrays.
    array_fields = (
        ("out_on_cnt", 0x52, 2, "H"),
        ("out_off_cnt", 0x64, 2, "H"),
        ("init_out_norm", 0x76, 1, "B"),
        ("init_out_safe", 0x80, 1, "B"),
        ("init_on_dly", 0x8A, 2, "H"),
        ("init_off_dly", 0x9C, 2, "H"),
        ("safe_off_dly", 0xAE, 1, "B"),
        ("cur_lu_lim", 0xB8, 2, "H"),
        ("cur_lim", 0xCA, 2, "H"),
        ("cur_ema", 0xDC, 2, "H"),
    )
    rows = [
        (name, unpack_array_in_data(custom_data, address, elem_size, 9, fmt))
        for name, address, elem_size, fmt in array_fields
    ]

    wdt_can_rst = custom_data[0x127]
    wdt_can = struct.unpack_from(f"{OBC_ENDIANNESS}I", custom_data, 0x12C)[0]

    # (name, table address) of the battery voltage levels, each a u16.
    batt_fields = (
        ("batt_hwmax", 0x11C),
        ("batt_max", 0x11E),
        ("batt_norm", 0x120),
        ("batt_safe", 0x122),
        ("batt_crit", 0x124),
    )
    for name, address in batt_fields:
        level = struct.unpack_from(f"{OBC_ENDIANNESS}H", custom_data, address)[0]
        rows.append((name, level))

    rows.append(("wdt_can_rst", wdt_can_rst))
    rows.append(("wdt_can", wdt_can))

    # Everything is decoded before the first value line is printed.
    for name, value in rows:
        pw.dlog(f"{name.ljust(15)}: {value}")
|
|
|
|
|
|
def p60_dock_config_table_handler(pw: PrintWrapper, custom_data: bytes):
    """Print selected entries of the P60 dock configuration table.

    :param pw: Print wrapper receiving the formatted output.
    :param custom_data: Raw configuration table.
    """
    # The first 0x68 bytes hold the 13 channel names.
    ch_names = parse_name_list(custom_data[0:0x68], 13)

    # (name, table address, element size, element count, struct format)
    array_fields = (
        ("out_on_cnt", 0x76, 2, 13, "H"),
        ("out_off_cnt", 0x90, 2, 13, "H"),
        ("init_out_norm", 0xAA, 1, 13, "B"),
        ("init_out_safe", 0xB7, 1, 13, "B"),
        ("init_on_dly", 0xC4, 2, 13, "H"),
        ("init_off_dly", 0xDE, 2, 13, "H"),
        ("p60acu_addr", 0x180, 1, 2, "B"),
        ("p60pdu_addr", 0x186, 1, 4, "B"),
    )
    rows = [
        (name, unpack_array_in_data(custom_data, address, elem_size, count, fmt))
        for name, address, elem_size, count, fmt in array_fields
    ]

    # Everything is decoded before the first line is printed.
    pw.dlog(f"Ch Names: {ch_names}")
    for name, value in rows:
        pw.dlog(f"{name.ljust(15)}: {value}")
|
|
|
|
|
|
def parse_name_list(data: bytes, name_len: int):
    """Extract NUL separated names from a byte buffer.

    NUL bytes between names are skipped, so names may be padded with any number
    of NUL bytes. Parsing stops once ``name_len`` names were found or the end of
    ``data`` is reached, whichever comes first. A final name without a NUL
    terminator extends to the end of ``data``.

    :param data: Buffer holding the names.
    :param name_len: Maximum number of names to extract (despite the parameter
        name this is a count, not a length).
    :return: List with the decoded names, at most ``name_len`` entries.
    :raises UnicodeDecodeError: If a name is not valid UTF-8.
    """
    ch_list = []
    idx = 0
    data_len = len(data)
    while len(ch_list) < name_len and idx < data_len:
        if data[idx] == 0:
            # Padding between names.
            idx += 1
            continue
        name_end = data.find(b"\x00", idx)
        if name_end == -1:
            # Unterminated last name: take the rest of the buffer.
            name_end = data_len
        ch_list.append(data[idx:name_end].decode())
        # Continue behind the terminator. The byte position is used instead of
        # the decoded length because both differ for multi-byte characters.
        idx = name_end + 1
    return ch_list
|
|
|
|
|
|
def handle_pcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
    """Print a PCDU housekeeping set.

    Only ``PcduSetIds.SWITCHER_SET`` is decoded: one state byte per PDU1 channel,
    one per PDU2 channel, then the P60 dock 5V and 3V3 stack states.

    :param pw: Print wrapper receiving the formatted output.
    :param set_id: HK set ID.
    :param hk_data: Raw HK payload.
    """
    pw.dlog("Received PCDU HK")
    if set_id != PcduSetIds.SWITCHER_SET:
        return
    num_pdu1 = len(PDU1_CHANNELS_NAMES)
    num_pdu2 = len(PDU2_CHANNELS_NAMES)
    pdu1_vals = [hk_data[pos] for pos in range(num_pdu1)]
    pdu2_vals = [hk_data[num_pdu1 + pos] for pos in range(num_pdu2)]
    offset = num_pdu1 + num_pdu2
    # The 5V stack state comes first, directly followed by the 3V3 stack state.
    p60_stack_5v_val = hk_data[offset]
    p60_stack_3v3_val = hk_data[offset + 1]
    offset += 2

    for title, names, states in (
        ("PDU1 Switcher States", PDU1_CHANNELS_NAMES, pdu1_vals),
        ("PDU2 Switcher States", PDU2_CHANNELS_NAMES, pdu2_vals),
    ):
        pw.dlog(title)
        for name, val in zip(names, states):
            pw.dlog(f"{name.ljust(25)}: {val}")
    pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}")
    pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}")
    # NOTE(review): the return value is discarded here - confirm whether the
    # helper prints on its own or whether its result should be logged.
    FsfwTmTcPrinter.get_validity_buffer(hk_data[offset:], 4)
|