From efc8e3b4b2078d070a39efe5338ea89ba56cd248 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 27 Aug 2022 01:01:04 +0200 Subject: [PATCH] basic pdu config parsing --- tmtc/power/acu.py | 23 ++----------------- tmtc/power/tm.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/tmtc/power/acu.py b/tmtc/power/acu.py index 5f130e0..7930c2d 100644 --- a/tmtc/power/acu.py +++ b/tmtc/power/acu.py @@ -5,7 +5,7 @@ """ from config.definitions import CustomServiceList -from tmtc.power.common_power import add_gomspace_cmds +from tmtc.power.common_power import add_gomspace_cmds, add_gomspace_cmd_defs from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config.tmtc import tmtc_definitions_provider @@ -52,26 +52,7 @@ class Info: @tmtc_definitions_provider def add_acu_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() - oce.add( - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - oce.add( - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - oce.add( - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - oce.add( - keys=GomspaceOpCodes.GET_PARAM, - info=GsInfo.GET_PARAMETER, - ) - oce.add( - keys=GomspaceOpCodes.SET_PARAM, - info=GsInfo.SET_PARAMETER, - ) + add_gomspace_cmd_defs(oce) oce.add(keys=OpCodes.TEST, info=Info.TEST) defs.add_service( name=CustomServiceList.ACU.value, diff --git a/tmtc/power/tm.py b/tmtc/power/tm.py index 7407c85..bda8012 100644 --- a/tmtc/power/tm.py +++ b/tmtc/power/tm.py @@ -5,6 +5,12 @@ from tmtccmd.util import ObjectIdBase from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from pus_tm.defs import PrintWrapper from gomspace.gomspace_common import SetIds, GomspaceDeviceActionIds +from config.object_ids import ( + PDU_1_HANDLER_ID, + PDU_2_HANDLER_ID, + P60_DOCK_HANDLER, + ACU_HANDLER_ID, +) P60_INDEX_LIST = [ "ACU VCC", @@ -424,6 +430,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8) +OBC_ENDIANNESS = "<" + + def handle_get_param_data_reply( obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray ): @@ -449,3 +458,52 @@ def handle_get_param_data_reply( ] pw.dlog(f"{header_list}") pw.dlog(f"{content_list}") + elif action_id == GomspaceDeviceActionIds.REQUEST_CONFIG_TABLE: + print(f"Received config table with size {len(custom_data)} for object {obj_id}") + if obj_id.as_bytes == PDU_1_HANDLER_ID or obj_id.as_bytes == PDU_2_HANDLER_ID: + out_on_cnt = unpack_array_in_data(custom_data, 0x52, 2, 9, "H") + out_off_cnt = unpack_array_in_data(custom_data, 0x64, 2, 9, "H") + init_out_norm = unpack_array_in_data(custom_data, 0x76, 1, 9, "B") + init_out_safe = unpack_array_in_data(custom_data, 0x80, 1, 9, "B") + init_on_dly = unpack_array_in_data(custom_data, 0x8A, 2, 9, "H") + init_off_dly = unpack_array_in_data(custom_data, 0x9C, 2, 9, "H") + safe_off_dly = unpack_array_in_data(custom_data, 0xAE, 1, 9, "B") + batt_hwmax = struct.unpack( + f"{OBC_ENDIANNESS}H", custom_data[0x11C : 0x11C + 2] + )[0] + batt_max = struct.unpack( + f"{OBC_ENDIANNESS}H", custom_data[0x11E : 0x11E + 2] + )[0] + batt_norm = struct.unpack( + f"{OBC_ENDIANNESS}H", custom_data[0x120 : 0x120 + 2] + )[0] + batt_safe = struct.unpack( + f"{OBC_ENDIANNESS}H", custom_data[0x122 : 0x122 + 2] + )[0] + batt_crit = struct.unpack( + f"{OBC_ENDIANNESS}H", custom_data[0x124 : 0x124 + 2] + )[0] + pw.dlog(f"{'out_on_cnt'.ljust(15)}: {out_on_cnt}") + pw.dlog(f"{'out_off_cnt'.ljust(15)}: {out_off_cnt}") + pw.dlog(f"{'init_out_norm'.ljust(15)}: {init_out_norm}") + pw.dlog(f"{'init_out_safe'.ljust(15)}: {init_out_safe}") + pw.dlog(f"{'init_on_dly'.ljust(15)}: {init_on_dly}") + pw.dlog(f"{'init_off_dly'.ljust(15)}: {init_off_dly}") + pw.dlog(f"{'safe_off_dly'.ljust(15)}: {safe_off_dly}") + pw.dlog(f"{'batt_hwmax'.ljust(15)}: {batt_hwmax}") + pw.dlog(f"{'batt_max'.ljust(15)}: {batt_max}") + pw.dlog(f"{'batt_norm'.ljust(15)}: {batt_norm}") + pw.dlog(f"{'batt_safe'.ljust(15)}: {batt_safe}") + pw.dlog(f"{'batt_crit'.ljust(15)}: {batt_crit}") + + +def unpack_array_in_data( + data: bytes, start_addr: int, width: int, entries: int, struct_spec: str +) -> List: + return [ + struct.unpack( + f"{OBC_ENDIANNESS}{struct_spec}", + data[start_addr + (i * width) : start_addr + ((i + 1) * width)], + )[0] + for i in range(entries) + ]