hk handling working for both core and aux from pdus

This commit is contained in:
Robin Mueller 2022-04-12 16:35:34 +02:00
parent c995ca2dda
commit ecb973c37f
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814
4 changed files with 118 additions and 30 deletions

View File

@ -335,7 +335,12 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info="PDU1: Request HK once",
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
@ -442,7 +447,12 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info="PDU2: Request HK once",
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,

View File

@ -5,7 +5,11 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from config.object_ids import PDU_1_HANDLER_ID
@ -213,6 +217,11 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {Info.REQUEST_CORE_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1_CORE)
command = generate_one_diag_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {Info.REQUEST_AUX_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1_AUX)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:

View File

@ -8,7 +8,11 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
generate_one_diag_command,
make_sid,
)
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from config.object_ids import PDU_2_HANDLER_ID
@ -235,6 +239,11 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, f"PDU2: {Info.REQUEST_CORE_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2_CORE)
command = generate_one_diag_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, f"PDU2: {Info.REQUEST_AUX_HK_ONCE}"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2_AUX)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:

View File

@ -456,16 +456,96 @@ PDU2_CHANNELS_NAMES = [
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
class WdtInfo:
def __init__(self):
self.wdt_reboots_list = []
self.time_pings_left_list = []
def print(self, printer: FsfwTmTcPrinter):
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(self.wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
)
def parse(self, wdt_data: bytes, current_idx: int) -> int:
priv_idx = 0
self.wdt_reboots_list = []
self.time_pings_left_list = []
for idx in range(5):
self.wdt_reboots_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(3):
self.time_pings_left_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(2):
self.time_pings_left_list.append(wdt_data[priv_idx])
current_idx += 1
priv_idx += 1
return current_idx
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
current_idx = 0
priv_idx = pdu_idx - 1
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
log_to_both(printer, "PDU AUX HK TODO")
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
vcc,
vbat,
conv_enb_0,
conv_enb_1,
conv_enb_2,
boot_cause,
uptime,
reset_cause,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV")
log_to_both(
printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]"
)
log_to_both(
printer,
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
)
current_idx += inc_len
latchup_list = []
log_to_both(printer, "Latchups")
for idx in range(len(PDU1_CHANNELS_NAMES)):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
)
log_to_both(printer, content_line)
current_idx += 2
device_types = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_types.append(hk_data[current_idx])
current_idx += 1
device_statuses = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_statuses.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print(printer=printer)
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}")
current_list = []
current_idx = 0
priv_idx = pdu_idx - 1
for idx in range(len(PDU1_CHANNELS_NAMES)):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
@ -579,21 +659,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
dearm_status,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
wdt_reboots_list = []
for idx in range(5):
wdt_reboots_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
time_pings_left_list = []
for idx in range(3):
time_pings_left_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
for idx in range(2):
time_pings_left_list.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
fmt_str = "!hhbb"
inc_len = struct.calcsize(fmt_str)
(
@ -620,14 +687,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
)
log_to_both(printer, util_info)
log_to_both(printer, util_info_2)
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{wdt_reboots_list[idx]:010} | {time_pings_left_list[idx]:010}",
)
wdt.print(printer)
misc_info = (
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
)