From ecd16da2f465fcfd821ee9e386db4ae322b81700 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 28 Apr 2022 19:43:16 +0200 Subject: [PATCH] continued RW HK handling --- pus_tc/devs/rws.py | 11 +++++ pus_tm/hk_handling.py | 107 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 10 deletions(-) diff --git a/pus_tc/devs/rws.py b/pus_tc/devs/rws.py index b838c9f..44c51e9 100644 --- a/pus_tc/devs/rws.py +++ b/pus_tc/devs/rws.py @@ -1,5 +1,16 @@ from config.object_ids import RW1_ID, RW2_ID, RW3_ID, RW4_ID +class SetIds: + TEMP_SET = 8 + STATUS = 4 + LAST_RESET = 2 + TM_SET = 9 + + +class Info: + pass + + def pack_rw_cmds(op_code: str): pass diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 0902f36..a21d35b 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -55,40 +55,42 @@ def handle_regular_hk_print( hk_packet: Service3Base, hk_data: bytes, ): - object_id = object_id.as_bytes + objb = object_id.as_bytes set_id = hk_packet.set_id """This function is called when a Service 3 Housekeeping packet is received.""" - if object_id == obj_ids.SYRLINKS_HANDLER_ID: + if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: + handle_rw_hk_data(printer, object_id, set_id, hk_data) + if objb == obj_ids.SYRLINKS_HANDLER_ID: if set_id == SetIds.RX_REGISTERS_DATASET: return handle_syrlinks_rx_registers_dataset(printer, hk_data) elif set_id == SetIds.TX_REGISTERS_DATASET: return handle_syrlinks_tx_registers_dataset(printer, hk_data) else: LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") - elif object_id == obj_ids.IMTQ_HANDLER_ID: + if objb == obj_ids.IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): return handle_self_test_data(printer, hk_data) else: LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") - elif object_id == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID: + if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID: handle_gps_data(printer=printer, hk_data=hk_data) - elif object_id == obj_ids.BPX_HANDLER_ID: + if objb == obj_ids.BPX_HANDLER_ID: handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) - elif object_id == obj_ids.CORE_CONTROLLER_ID: + if objb == obj_ids.CORE_CONTROLLER_ID: return handle_core_hk_data(printer=printer, hk_data=hk_data) - elif object_id == obj_ids.PDU_1_HANDLER_ID: + if objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data ) - elif object_id == obj_ids.PDU_2_HANDLER_ID: + if objb == obj_ids.PDU_2_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) - elif object_id == obj_ids.P60_DOCK_HANDLER: + if objb == obj_ids.P60_DOCK_HANDLER: handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) - elif object_id == obj_ids.PL_PCDU_ID: + if objb == obj_ids.PL_PCDU_ID: log_to_both(printer, "Received PL PCDU HK data") else: LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") @@ -702,6 +704,91 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): ) +def handle_rw_hk_data( + printer: FsfwTmTcPrinter, object_id: ObjectId, set_id: int, hk_data: bytes +): + from pus_tc.devs.rws import SetIds + + current_idx = 0 + if set_id == SetIds.TEMP_SET: + log_to_both( + printer, + f"Received Temperature HK (ID {set_id}) from Reaction Wheel {object_id.name}", + ) + temp = struct.unpack("!I", hk_data[0:4]) + log_to_both(printer, f"Temperature {temp}") + current_idx += 4 + if set_id == SetIds.STATUS: + log_to_both( + printer, + f"Received Status HK (ID {set_id}) from Reaction Wheel {object_id.name}", + ) + fmt_str = "!iiBB" + inc_len = struct.calcsize(fmt_str) + (speed, ref_speed, state, clc_mode) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + log_to_both( + printer, + f"Speed {speed} rpm | Reference Speed {ref_speed} rpm | State {state} | CLC Mode {clc_mode}", + ) + if set_id == SetIds.LAST_RESET: + log_to_both( + printer, + f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}", + ) + fmt_str = "!BB" + inc_len = struct.calcsize(fmt_str) + (last_reset_status, curr_reset_status) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + log_to_both( + printer, + f"Last Reset Status {last_reset_status} | Current Reset Status {curr_reset_status}", + ) + if set_id == SetIds.TM_SET: + log_to_both( + printer, + f"Received TM HK (ID {set_id}) from Reaction Wheel {object_id.name}", + ) + fmt_str = "!BiffBBiiIIIIIIIIIIIIIIII" + inc_len = struct.calcsize(fmt_str) + ( + last_reset_status, + mcu_temp, + pressure_sens_temp, + pressure, + state, + clc_mode, + current_speed, + ref_speed, + num_invalid_crc_packets, + num_invalid_len_packets, + num_invalid_cmd_packets, + num_of_cmd_executed_requests, + num_of_cmd_replies, + uart_num_of_bytes_written, + uart_num_of_bytes_read, + uart_num_parity_errors, + uart_num_noise_errors, + uart_num_frame_errors, + uart_num_reg_overrun_errors, + uart_total_num_errors, + spi_num_bytes_written, + spi_num_bytes_read, + spi_num_reg_overrun_errors, + spi_total_num_errors, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + pass + if current_idx > 0: + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=27 + ) + pass + + def log_to_both(printer: FsfwTmTcPrinter, string: str): print(string) printer.file_logger.info(string)