From 580b3818489a87f9ea1b31754c7b0285c2bb6a10 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 12:38:50 +0200 Subject: [PATCH] completed ACU hk parsing --- pus_tc/cmd_definitions.py | 1 + pus_tm/devs/pcdu.py | 136 ++++++++++++++++++++++++++++++++------ 2 files changed, 115 insertions(+), 22 deletions(-) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 5aa0075..8f812c7 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,4 +1,5 @@ from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.pcdu import add_pcdu_cmds from tmtccmd.config import ( add_op_code_entry, add_service_op_code_entry, diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index e0ac54e..2134352 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -1,4 +1,5 @@ import struct +from typing import List, Tuple from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from pus_tm.defs import PrintWrapper @@ -87,6 +88,26 @@ class WdtInfo: return current_idx +class DevicesInfoParser: + def __int__(self): + self.dev_types = [] + self.dev_statuses = [] + + def parse(self, hk_data: bytes, current_idx: int) -> int: + for idx in range(8): + self.dev_types.append(hk_data[current_idx]) + current_idx += 1 + for idx in range(8): + self.dev_statuses.append(hk_data[current_idx]) + current_idx += 1 + return current_idx + + def print(self, pw: PrintWrapper): + pw.dlog(f"Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") + for i in range(len(self.dev_types)): + pw.dlog(f"{self.dev_types} | {self.dev_statuses}") + + def handle_pdu_data( printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes ): @@ -123,17 +144,13 @@ def handle_pdu_data( ) pw.dlog(content_line) current_idx += 2 - device_types = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_types.append(hk_data[current_idx]) - current_idx += 1 - device_statuses = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) wdt.print() + pw.dlog(f"PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved") + dev_parser.print(pw=pw) if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: pw.dlog(f"Received PDU HK from PDU {pdu_idx}") current_list = [] @@ -153,8 +170,7 @@ def handle_pdu_data( output_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) + pw.dlog(header_str) for idx in range(len(PDU1_CHANNELS_NAMES)): out_enb = f"{output_enb_list[idx]}".ljust(6) content_line = ( @@ -196,8 +212,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): out_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) + pw.dlog(header_str) for idx in range(13): out_enb = f"{out_enb_list[idx]}".ljust(6) content_line = ( @@ -262,14 +277,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): ar6_depl, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len - device_types = [] - device_statuses = [] - for idx in range(8): - device_types.append(hk_data[current_idx]) - current_idx += 1 - for idx in range(8): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) util_info = ( f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" ) @@ -289,16 +298,99 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" ) pw.dlog(batt_info) + pw.dlog( + "P60 Dock Dev Types: 0:FRAM|1:ADC|2:ADC|3:ADC|4:TempSens|5:RTC|" + "6:TempSens(BatPack)|7:TempSens(BatPack)" + ) + dev_parser.print(pw=pw) printer.print_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 ) +def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]: + u16_list = [] + for idx in range(6): + u16_list.append(hk_data[current_idx : current_idx + 2]) + current_idx += 2 + return current_idx, u16_list + + def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer=printer) if set_id == SetIds.ACU_CORE: + current_idx = 0 + fmt_str = "!B" + inc_len = struct.calcsize(fmt_str) + mppt_mode = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + current_idx, currents = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + current_idx, voltages = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + vcc = hk_data[current_idx : current_idx + 2] + current_idx += 2 + vbat = hk_data[current_idx : current_idx + 2] + current_idx += 2 + current_idx, vboosts = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + current_idx, powers = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + fmt_str = "!HHHIIHH" + inc_len = struct.calcsize(fmt_str) + (tmp0, tmp1, tmp2, bootcnt, uptime, mppt_time, mppt_period) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") - pass + pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}") + header_str = f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [?]" + pw.dlog(header_str) + for i in range(6): + pw.dlog(f"{i} | {voltages[i]} | {currents[i]} | {vboosts[i]} | {powers[i]}") + pw.dlog(f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}") + pw.dlog( + f"Boot Count {bootcnt} | Uptime {uptime} | " + f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" + ) + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=12 + ) if set_id == SetIds.ACU_AUX: + current_idx = 0 + fmt_str = "!IBhHhh" + inc_len = struct.calcsize(fmt_str) + (dac_enb0, dac_enb1, dac_enb2) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + current_idx, dac_channels_raw = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + fmt_str = "!IHII" + inc_len = struct.calcsize(fmt_str) + (boot_cause, reset_cause, wdt_cnt_gnd, wdt_gnd_time_left) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") - pass + pw.dlog( + f"DAC Enable States: DAC 0 {dac_enb0} | DAC 1 {dac_enb1} | DAC 2 {dac_enb2}" + ) + pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") + pw.dlog( + f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left {wdt_gnd_time_left} sec" + ) + + pw.dlog( + f"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|" + f"5:DAC|6:TempSens|7:Reserved" + ) + dev_parser.print(pw=pw) + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8)