From d4ebb1b8aa8166fd43e32f0f0391b12dbda3b0aa Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 6 Apr 2022 17:01:01 +0200 Subject: [PATCH] update hk handling --- pus_tm/action_reply_handler.py | 26 +++----- pus_tm/hk_handling.py | 117 +++++++++++++++++---------------- 2 files changed, 70 insertions(+), 73 deletions(-) diff --git a/pus_tm/action_reply_handler.py b/pus_tm/action_reply_handler.py index 470f640..cee50e1 100644 --- a/pus_tm/action_reply_handler.py +++ b/pus_tm/action_reply_handler.py @@ -23,7 +23,9 @@ def handle_action_reply( object_id = obj_id_dict.get(tm_packet.source_object_id) custom_data = tm_packet.custom_data action_id = tm_packet.action_id - generic_print_str = printer.generic_action_packet_tm_print(packet=tm_packet, obj_id=object_id) + generic_print_str = printer.generic_action_packet_tm_print( + packet=tm_packet, obj_id=object_id + ) print(generic_print_str) printer.file_logger.info(generic_print_str) if object_id == IMTQ_HANDLER_ID: @@ -37,9 +39,7 @@ def handle_action_reply( def handle_imtq_replies( - action_id: int, - printer: FsfwTmTcPrinter, - custom_data: bytearray + action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray ): if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]: header_list = [ @@ -47,11 +47,7 @@ def handle_imtq_replies( "Commanded Y-Dipole", "Commanded Z-Dipole", ] - [ - x_dipole, - y_dipole, - z_dipole - ] = struct.unpack("!HHH", custom_data[0:6]) + [x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6]) content_list = [x_dipole, y_dipole, z_dipole] print(header_list) print(content_list) @@ -60,9 +56,7 @@ def handle_imtq_replies( def handle_ploc_replies( - action_id: int, - printer: FsfwTmTcPrinter, - custom_data: bytearray + action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray ): if action_id == PlocReplyIds.tm_mem_read_report: header_list = [ @@ -82,9 +76,7 @@ def handle_ploc_replies( def handle_supervisor_replies( - action_id: int, - printer: FsfwTmTcPrinter, - custom_data: bytearray + action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray ): reply = DataReplyUnpacked() if action_id == SupvActionIds.DUMP_MRAM: @@ -97,9 +89,7 @@ def handle_supervisor_replies( def handle_startracker_replies( - action_id: int, - printer: FsfwTmTcPrinter, - custom_data: bytearray + action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray ): if action_id == StarTrackerActionIds.CHECKSUM: if len(custom_data) != 5: diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 0a62047..c2a3bbb 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -66,22 +66,22 @@ def handle_regular_hk_print( """This function is called when a Service 3 Housekeeping packet is received.""" if object_id == SYRLINKS_HANDLER_ID: if set_id == SetIds.RX_REGISTERS_DATASET: - return handle_syrlinks_rx_registers_dataset(hk_data) + return handle_syrlinks_rx_registers_dataset(printer, hk_data) elif set_id == SetIds.TX_REGISTERS_DATASET: - return handle_syrlinks_tx_registers_dataset(hk_data) + return handle_syrlinks_tx_registers_dataset(printer, hk_data) else: LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") elif object_id == IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): - return handle_self_test_data(hk_data) + return handle_self_test_data(printer, hk_data) else: LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID: - return handle_gps_data(hk_data=hk_data) + return handle_gps_data(printer=printer, hk_data=hk_data) elif object_id == BPX_HANDLER_ID: - return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id) + return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) elif object_id == CORE_CONTROLLER_ID: return handle_core_hk_data(printer=printer, hk_data=hk_data) elif object_id == P60_DOCK_HANDLER: @@ -91,11 +91,9 @@ def handle_regular_hk_print( return HkReplyUnpacked() -def handle_syrlinks_rx_registers_dataset( - hk_data: bytes, -) -> HkReplyUnpacked: +def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): reply = HkReplyUnpacked() - reply.header_list = [ + header_list = [ "RX Status", "RX Sensitivity", "RX Frequency Shift", @@ -113,7 +111,7 @@ def handle_syrlinks_rx_registers_dataset( rx_demod_eb = struct.unpack("!I", hk_data[13:17]) rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) rx_data_rate = hk_data[21] - reply.content_list = [ + content_list = [ rx_status, rx_sensitivity, rx_frequency_shift, @@ -123,28 +121,30 @@ def handle_syrlinks_rx_registers_dataset( rx_demod_n0, rx_data_rate, ] - reply.validity_buffer = hk_data[22:] - reply.num_of_vars = 8 - return reply + validity_buffer = hk_data[22:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) def handle_syrlinks_tx_registers_dataset( + printer: FsfwTmTcPrinter, hk_data: bytes, -) -> HkReplyUnpacked: +): reply = HkReplyUnpacked() - reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"] + header_list = ["TX Status", "TX Waveform", "TX AGC value"] tx_status = hk_data[0] tx_waveform = hk_data[1] tx_agc_value = struct.unpack("!H", hk_data[2:4]) - reply.content_list = [tx_status, tx_waveform, tx_agc_value] - reply.validity_buffer = hk_data[4:] - reply.num_of_vars = 3 - return reply + content_list = [tx_status, tx_waveform, tx_agc_value] + validity_buffer = hk_data[4:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) -def handle_self_test_data(hk_data: bytes) -> HkReplyUnpacked: - reply = HkReplyUnpacked() - reply.hk_header = [ +def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): + header_list = [ "Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", @@ -230,8 +230,8 @@ def handle_self_test_data(hk_data: bytes) -> HkReplyUnpacked: fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] - reply.validity_buffer = hk_data[129:] - reply.content_list = [ + validity_buffer = hk_data[129:] + content_list = [ init_err, init_raw_mag_x, init_raw_mag_y, @@ -272,17 +272,17 @@ def handle_self_test_data(hk_data: bytes) -> HkReplyUnpacked: fina_coil_y_temperature, fina_coil_z_temperature, ] - reply.num_of_vars = len(reply.hk_header) - return reply + num_of_vars = len(header_list) + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) -def handle_gps_data(hk_data: bytes) -> HkReplyUnpacked: +def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") reply = HkReplyUnpacked() var_index = 0 - header_array = [] - content_array = [] - reply.header_list = [ + header_list = [ "Latitude", "Longitude", "Altitude", @@ -304,7 +304,7 @@ def handle_gps_data(hk_data: bytes) -> HkReplyUnpacked: seconds = hk_data[32] date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" unix_seconds = struct.unpack("!I", hk_data[33:37])[0] - content_array = [ + content_list = [ latitude, longitude, altitude, @@ -326,27 +326,29 @@ def handle_gps_data(hk_data: bytes) -> HkReplyUnpacked: f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" ) - reply.header_list = header_array - reply.content_list = content_array - reply.validity_buffer = hk_data[37:39] - return reply + validity_buffer = hk_data[37:39] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) -def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked: - LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}") - reply = HkReplyUnpacked() +def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): if set_id == BpxSetIds.GET_HK_SET: - charge_current = struct.unpack("!H", hk_data[0:2])[0] - discharge_current = struct.unpack("!H", hk_data[2:4])[0] - heater_current = struct.unpack("!H", hk_data[4:6])[0] - batt_voltage = struct.unpack("!H", hk_data[6:8])[0] - batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0] - batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0] - batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0] - batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0] - reboot_cntr = struct.unpack("!I", hk_data[16:20])[0] - boot_cause = hk_data[20] - reply.header_list = [ + fmt_str = "!HHHHhhhhI" + inc_len = struct.calcsize(fmt_str) + ( + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ) = struct.unpack(fmt_str, hk_data[0:inc_len]) + header_list = [ "Charge Current", "Discharge Current", "Heater Current", @@ -358,7 +360,7 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked: "Reboot Counter", "Boot Cause", ] - reply.content_list = [ + content_list = [ charge_current, discharge_current, heater_current, @@ -370,19 +372,24 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked: reboot_cntr, boot_cause, ] - reply.validity_buffer = hk_data[21:] + validity_buffer = hk_data[inc_len:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) elif set_id == BpxSetIds.GET_CFG_SET: battheat_mode = hk_data[0] battheat_low = struct.unpack("!b", hk_data[1:2])[0] battheat_high = struct.unpack("!b", hk_data[2:3])[0] - reply.header_list = [ + header_list = [ "Battery Heater Mode", "Battery Heater Low Limit", "Battery Heater High Limit", ] - reply.content_list = [battheat_mode, battheat_low, battheat_high] - reply.validity_buffer = hk_data[3:] - return reply + content_list = [battheat_mode, battheat_low, battheat_high] + validity_buffer = hk_data[3:] + log_to_both(printer, str(header_list)) + log_to_both(printer, str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):