rework it

This commit is contained in:
Robin Müller 2023-05-23 09:54:51 +02:00
parent 6182369e4f
commit aab093cc0a
Signed by: muellerr
GPG Key ID: A649FB78196E3849
17 changed files with 106 additions and 94 deletions

View File

@ -1,19 +1,27 @@
import logging import logging
from typing import Optional
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
class PrintWrapper: class PrintWrapper:
def __init__(self, printer: FsfwTmTcPrinter): def __init__(self, file_logger: Optional[logging.Logger]):
self.printer = printer self.file_logger = file_logger
def dlog(self, string: str): def dlog(self, string: str):
print(string) print(string)
self.printer.file_logger.info(string) if self.file_logger:
self.file_logger.info(string)
def wlog(self, logger: logging.Logger, string: str):
logger.warning(string)
if self.file_logger:
self.file_logger.warning(string)
def ilog(self, logger: logging.Logger, string: str): def ilog(self, logger: logging.Logger, string: str):
logger.info(string) logger.info(string)
self.printer.file_logger.info(string) if self.file_logger:
self.file_logger.info(string)
def log_to_both(printer: FsfwTmTcPrinter, string: str): def log_to_both(printer: FsfwTmTcPrinter, string: str):

View File

@ -34,7 +34,7 @@ from eive_tmtc.tmtc.power.tm import (
from eive_tmtc.tmtc.acs.imtq import ( from eive_tmtc.tmtc.acs.imtq import (
handle_imtq_hk, handle_imtq_hk,
) )
from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter, PrintWrapper
from eive_tmtc.tmtc.core import handle_core_hk_data from eive_tmtc.tmtc.core import handle_core_hk_data
from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data
import eive_tmtc.config.object_ids as obj_ids import eive_tmtc.config.object_ids as obj_ids
@ -95,13 +95,14 @@ def handle_regular_hk_print(
objb = object_id.as_bytes objb = object_id.as_bytes
set_id = hk_packet.set_id set_id = hk_packet.set_id
packet_dt = tm.time_provider.as_date_time() packet_dt = tm.time_provider.as_date_time()
pw = PrintWrapper(printer.file_logger)
"""This function is called when a Service 3 Housekeeping packet is received.""" """This function is called when a Service 3 Housekeeping packet is received."""
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data(printer, object_id, set_id, hk_data) return handle_rw_hk_data(pw, object_id, set_id, hk_data)
elif objb == obj_ids.SYRLINKS_HANDLER_ID: elif objb == obj_ids.SYRLINKS_HANDLER_ID:
return handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_syrlinks_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.IMTQ_HANDLER_ID: elif objb == obj_ids.IMTQ_HANDLER_ID:
return handle_imtq_hk(printer=printer, hk_data=hk_data, set_id=set_id) return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.GPS_CONTROLLER: elif objb == obj_ids.GPS_CONTROLLER:
return handle_gps_data(printer=printer, hk_data=hk_data) return handle_gps_data(printer=printer, hk_data=hk_data)
elif objb == obj_ids.PCDU_HANDLER_ID: elif objb == obj_ids.PCDU_HANDLER_ID:

View File

@ -721,7 +721,7 @@ def handle_acs_ctrl_sus_raw_data(pw: PrintWrapper, hk_data: bytes):
sus_list_formatted = vec_fmt.format(*sus_list) sus_list_formatted = vec_fmt.format(*sus_list)
current_idx += length current_idx += length
pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}") pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=12) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=12)
def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes): def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
@ -757,7 +757,7 @@ def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
sun_ijk_model = vec_fmt.format(*sun_ijk_model) sun_ijk_model = vec_fmt.format(*sun_ijk_model)
current_idx += inc_len current_idx += inc_len
pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}") pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=15) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=15)
def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes): def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
@ -812,7 +812,7 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}") pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
current_idx += 1 current_idx += 1
assert current_idx == 61 assert current_idx == 61
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=6) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=6)
def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -866,7 +866,7 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
current_idx += inc_len current_idx += inc_len
if PERFORM_MGM_CALIBRATION: if PERFORM_MGM_CALIBRATION:
perform_mgm_calibration(pw, mgm_3) perform_mgm_calibration(pw, mgm_3)
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=8) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=8)
def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes): def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
@ -900,7 +900,7 @@ def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}") pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}")
pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}") pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}")
pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}") pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}")
pw.printer.print_validity_buffer(hk_data[current_idx:], 4) pw.printer.get_validity_buffer(hk_data[current_idx:], 4)
GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"] GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"]
@ -926,7 +926,7 @@ def handle_gyr_data_processed(pw: PrintWrapper, hk_data: bytes):
] ]
pw.dlog(f"GYR Vec Total: {gyr_vec_tot}") pw.dlog(f"GYR Vec Total: {gyr_vec_tot}")
current_idx += inc_len current_idx += inc_len
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=5) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -979,7 +979,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"GPS Altitude: {alt} [m]") pw.dlog(f"GPS Altitude: {alt} [m]")
pw.dlog(f"GPS Position: {pos} [m]") pw.dlog(f"GPS Position: {pos} [m]")
pw.dlog(f"GPS Velocity: {velo} [m/s]") pw.dlog(f"GPS Velocity: {velo} [m/s]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=5) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes): def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
@ -1023,7 +1023,7 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'MEKF Raw Status (key unknown)'.ljust(25)}: {status}") pw.dlog(f"{'MEKF Raw Status (key unknown)'.ljust(25)}: {status}")
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}") pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rates)}") pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rates)}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes): def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
@ -1091,7 +1091,7 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Control Values Error Quaternion: {err_quat}") pw.dlog(f"Control Values Error Quaternion: {err_quat}")
pw.dlog(f"Control Values Error Angle: {err_ang} [deg]") pw.dlog(f"Control Values Error Angle: {err_ang} [deg]")
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]") pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=5) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes): def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
@ -1130,7 +1130,7 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}") pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}")
pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}") pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}")
pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}") pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3) pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple): def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple):

View File

@ -124,4 +124,4 @@ def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
) )
pw.dlog(f"GNSS Date: {date_string}") pw.dlog(f"GNSS Date: {date_string}")
pw.dlog(f"Unix seconds {unix_seconds}") pw.dlog(f"Unix seconds {unix_seconds}")
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14) printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14)

View File

@ -426,11 +426,11 @@ ENG_HK_HEADERS = [
] ]
def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int): def handle_imtq_hk(pw: PrintWrapper, hk_data: bytes, set_id: int):
if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST): if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST):
return handle_self_test_data(printer, hk_data) return handle_self_test_data(pw, hk_data)
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE: elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
return handle_eng_set(printer, hk_data, False) return handle_eng_set(pw, hk_data, False)
elif set_id == ImtqSetId.ENG_HK_SET_WITH_TORQUE: elif set_id == ImtqSetId.ENG_HK_SET_WITH_TORQUE:
return handle_eng_set(printer, hk_data, True) return handle_eng_set(printer, hk_data, True)
elif set_id == ImtqSetId.CAL_MTM_SET: elif set_id == ImtqSetId.CAL_MTM_SET:
@ -444,8 +444,9 @@ def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
elif set_id == ImtqSetId.STATUS_SET: elif set_id == ImtqSetId.STATUS_SET:
return handle_status_set(printer, hk_data) return handle_status_set(printer, hk_data)
else: else:
_LOGGER.warning( pw.wlog(
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}" _LOGGER,
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}",
) )
@ -469,7 +470,7 @@ def handle_dipole_set(printer: FsfwTmTcPrinter, hk_data: bytes):
pw.dlog(f"Dipole Y: {dipole_y}") pw.dlog(f"Dipole Y: {dipole_y}")
pw.dlog(f"Dipole Z: {dipole_z}") pw.dlog(f"Dipole Z: {dipole_z}")
pw.dlog(f"Current torque duration: {current_torque_duration}") pw.dlog(f"Current torque duration: {current_torque_duration}")
pw.printer.print_validity_buffer(hk_data[fmt_len:], 2) pw.printer.get_validity_buffer(hk_data[fmt_len:], 2)
def unpack_eng_hk(hk_data: bytes) -> List: def unpack_eng_hk(hk_data: bytes) -> List:
@ -500,8 +501,7 @@ def unpack_eng_hk(hk_data: bytes) -> List:
return content_list return content_list
def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes, torque_on: bool): def handle_eng_set(pw: PrintWrapper, hk_data: bytes, torque_on: bool):
pw = PrintWrapper(printer)
pw.dlog(f"Found engineering HK. Torque Status: {torque_on}") pw.dlog(f"Found engineering HK. Torque Status: {torque_on}")
content_list = unpack_eng_hk(hk_data) content_list = unpack_eng_hk(hk_data)
validity_buffer = hk_data[32:] validity_buffer = hk_data[32:]
@ -509,22 +509,28 @@ def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes, torque_on: bool):
num_of_vars = len(ENG_HK_HEADERS) num_of_vars = len(ENG_HK_HEADERS)
for k, v in zip(ENG_HK_HEADERS, content_list): for k, v in zip(ENG_HK_HEADERS, content_list):
pw.dlog(f"{k.ljust(30)}: {v}") pw.dlog(f"{k.ljust(30)}: {v}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_status_set(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_status_set(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
content_list = unpack_status_set(hk_data) content_list = unpack_status_set(hk_data)
validity_buffer = hk_data[7:] validity_buffer = hk_data[7:]
num_of_vars = 4 num_of_vars = 4
for k, v in zip(STATUS_HEADERS, content_list): for k, v in zip(STATUS_HEADERS, content_list):
pw.dlog(f"{k.ljust(30)}: {v}") pw.dlog(f"{k.ljust(30)}: {v}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_calibrated_mtm_measurement(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [ header_list = [
"Calibrated MTM X [nT]", "Calibrated MTM X [nT]",
"Calibrated MTM Y [nT]", "Calibrated MTM Y [nT]",
@ -540,13 +546,14 @@ def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes):
num_of_vars = len(header_list) num_of_vars = len(header_list)
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_raw_mtm_measurement( def handle_raw_mtm_measurement(pw: PrintWrapper, hk_data: bytes, torque_status: bool):
printer: FsfwTmTcPrinter, hk_data: bytes, torque_status: bool
):
pw = PrintWrapper(printer)
pw.dlog(f"Found raw MTM measurement. Torque Status: {torque_status}") pw.dlog(f"Found raw MTM measurement. Torque Status: {torque_status}")
header_list = [ header_list = [
"Raw MTM X [nT]", "Raw MTM X [nT]",
@ -563,11 +570,14 @@ def handle_raw_mtm_measurement(
num_of_vars = 2 num_of_vars = 2
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_self_test_data(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [ header_list = [
"Init Err", "Init Err",
"Init Raw Mag X [nT]", "Init Raw Mag X [nT]",
@ -699,4 +709,8 @@ def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
num_of_vars = len(header_list) num_of_vars = len(header_list)
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)

View File

@ -288,10 +288,9 @@ def pack_set_speed_command(
def handle_rw_hk_data( def handle_rw_hk_data(
printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes pw: PrintWrapper, object_id: ObjectIdU32, set_id: int, hk_data: bytes
): ):
pw = PrintWrapper(printer)
current_idx = 0 current_idx = 0
if set_id == RwSetId.STATUS_SET_ID: if set_id == RwSetId.STATUS_SET_ID:
pw.dlog( pw.dlog(
@ -316,7 +315,7 @@ def handle_rw_hk_data(
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), " f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
f"1: High Current Mode (0.6 A)" f"1: High Current Mode (0.6 A)"
) )
printer.print_validity_buffer(hk_data[current_idx:], 5) pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5))
if set_id == RwSetId.LAST_RESET: if set_id == RwSetId.LAST_RESET:
pw.dlog( pw.dlog(
f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}" f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
@ -401,9 +400,11 @@ def handle_rw_hk_data(
f"{spi_total_num_errors}" f"{spi_total_num_errors}"
) )
if current_idx > 0: if current_idx > 0:
printer.print_validity_buffer( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27 validity_buffer=hk_data[current_idx:], num_vars=27
) )
)
def rw_speed_up_cmd_consec( def rw_speed_up_cmd_consec(

View File

@ -747,7 +747,7 @@ def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"CMOS Temperature: {cmos_temp}") pw.dlog(f"CMOS Temperature: {cmos_temp}")
pw.dlog(f"FPGA Temperature: {fpga_temp}") pw.dlog(f"FPGA Temperature: {fpga_temp}")
current_idx += fmt_len current_idx += fmt_len
pw.printer.print_validity_buffer(hk_data[current_idx:], 5) pw.printer.get_validity_buffer(hk_data[current_idx:], 5)
def handle_solution_set(hk_data: bytes, pw: PrintWrapper): def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
@ -821,7 +821,7 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
solution_strategy = hk_data[current_idx] solution_strategy = hk_data[current_idx]
pw.dlog(f"Solution strategy: {solution_strategy}") pw.dlog(f"Solution strategy: {solution_strategy}")
current_idx += 1 current_idx += 1
pw.printer.print_validity_buffer(hk_data[current_idx:], 23) pw.printer.get_validity_buffer(hk_data[current_idx:], 23)
@tmtc_definitions_provider @tmtc_definitions_provider

View File

@ -27,4 +27,4 @@ def handle_sus_hk(
pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)") pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)")
for idx, val in enumerate(channels): for idx, val in enumerate(channels):
pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5)) pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5))
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7) printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7)

View File

@ -254,31 +254,28 @@ def pack_syrlinks_command(
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_syrlinks_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.RX_REGISTERS_DATASET: if set_id == SetId.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(printer, hk_data) return handle_syrlinks_rx_registers_dataset(pw, hk_data)
elif set_id == SetId.TX_REGISTERS_DATASET: elif set_id == SetId.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(printer, hk_data) return handle_syrlinks_tx_registers_dataset(pw, hk_data)
elif set_id == SetId.TEMPERATURE_SET_ID: elif set_id == SetId.TEMPERATURE_SET_ID:
return handle_syrlinks_temp_dataset(printer, hk_data) return handle_syrlinks_temp_dataset(pw, hk_data)
else: else:
pw = PrintWrapper(printer)
pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}") pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}")
def handle_syrlinks_temp_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_syrlinks_temp_dataset(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
if len(hk_data) < 8: if len(hk_data) < 8:
raise ValueError("expected at least 8 bytes of HK data") raise ValueError("expected at least 8 bytes of HK data")
temp_power_amplifier = struct.unpack("!f", hk_data[0:4])[0] temp_power_amplifier = struct.unpack("!f", hk_data[0:4])[0]
temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0] temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0]
pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}") pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}")
pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}") pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}")
printer.print_validity_buffer(hk_data[8:], 2) pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[8:], 2))
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_syrlinks_rx_registers_dataset(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [ header_list = [
"RX Status", "RX Status",
"RX Sensitivity", "RX Sensitivity",
@ -338,7 +335,9 @@ def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: byte
validity_buffer = hk_data[22:] validity_buffer = hk_data[22:]
for header, content in zip(header_list, content_list): for header, content in zip(header_list, content_list):
pw.dlog(f"{header}: {content}") pw.dlog(f"{header}: {content}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
)
pw.dlog(f"Carrier Detect: {carrier_detect}") pw.dlog(f"Carrier Detect: {carrier_detect}")
pw.dlog(f"Carrier Lock: {carrier_lock}") pw.dlog(f"Carrier Lock: {carrier_lock}")
pw.dlog(f"Data Lock (data clock recovery loop lock status): {data_lock}") pw.dlog(f"Data Lock (data clock recovery loop lock status): {data_lock}")
@ -372,10 +371,9 @@ WAVEFORM_STRINGS = ["OFF", "CW", "QPSK", "0QPSK", "PCM/PM", "PSK/PM", "BPSK"]
def handle_syrlinks_tx_registers_dataset( def handle_syrlinks_tx_registers_dataset(
printer: FsfwTmTcPrinter, pw: PrintWrapper,
hk_data: bytes, hk_data: bytes,
): ):
pw = PrintWrapper(printer)
header_list = ["TX Status Raw", "TX Waveform", "TX AGC value"] header_list = ["TX Status Raw", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0] tx_status = hk_data[0]
""" """
@ -412,7 +410,9 @@ def handle_syrlinks_tx_registers_dataset(
validity_buffer = hk_data[4:] validity_buffer = hk_data[4:]
for header, content in zip(header_list, content_list): for header, content in zip(header_list, content_list):
pw.dlog(f"{header}: {content}") pw.dlog(f"{header}: {content}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
)
# pw.dlog(f"TX CONV: {tx_conv!r}") # pw.dlog(f"TX CONV: {tx_conv!r}")
# pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}") # pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}")
pw.dlog(f"TX Status: {tx_status_status!r}") pw.dlog(f"TX Status: {tx_status_status!r}")

View File

@ -594,7 +594,7 @@ def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"PL Voltage [mV] {pl_voltage}" f"PL Voltage [mV] {pl_voltage}"
) )
pw.dlog(printout) pw.dlog(printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3) printer.get_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3)
def handle_core_ctrl_action_replies( def handle_core_ctrl_action_replies(

View File

@ -524,8 +524,7 @@ def get_sequence_file() -> str:
return file return file
def handle_ploc_mpsoc_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int): def handle_ploc_mpsoc_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
pw = PrintWrapper(printer)
if set_id == SetId.HK_ID: if set_id == SetId.HK_ID:
fmt_str = "!IBBBBBBB" fmt_str = "!IBBBBBBB"
current_idx = 0 current_idx = 0
@ -640,10 +639,7 @@ class DirElement:
size: int size: int
def handle_mpsoc_data_reply( def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytearray):
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
pw = PrintWrapper(printer)
if action_id == ActionId.TM_MEM_READ_RPT: if action_id == ActionId.TM_MEM_READ_RPT:
header_list = [ header_list = [
"PLOC Memory Address", "PLOC Memory Address",
@ -655,20 +651,16 @@ def handle_mpsoc_data_reply(
struct.unpack("!H", custom_data[4:6])[0], struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(), "0x" + custom_data[6:10].hex(),
] ]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
elif action_id == ActionId.TM_CAM_CMD_RPT: elif action_id == ActionId.TM_CAM_CMD_RPT:
header_list = ["Camera reply string", "ACK"] header_list = ["Camera reply string", "ACK"]
content_list = [ content_list = [
custom_data[: len(custom_data) - 1].decode("utf-8"), custom_data[: len(custom_data) - 1].decode("utf-8"),
hex(custom_data[-1]), hex(custom_data[-1]),
] ]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
elif action_id == ActionId.TM_FLASH_DIRECTORY_CONTENT: elif action_id == ActionId.TM_FLASH_DIRECTORY_CONTENT:
if len(custom_data) < 16: if len(custom_data) < 16:
_LOGGER.warning( _LOGGER.warning(

View File

@ -752,7 +752,7 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter):
f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}" f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}"
) )
pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}") pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}")
pw.printer.print_validity_buffer(hk_data[current_idx:], 10) pw.printer.get_validity_buffer(hk_data[current_idx:], 10)
else: else:
pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}") pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}")
pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]") pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]")

View File

@ -116,4 +116,4 @@ def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
for idx, val in ain_dict.items(): for idx, val in ain_dict.items():
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}") pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
current_idx += inc_len current_idx += inc_len
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7) printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7)

View File

@ -158,7 +158,7 @@ def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
validity_buffer = hk_data[inc_len:] validity_buffer = hk_data[inc_len:]
pw.dlog(str(HEADER_LIST)) pw.dlog(str(HEADER_LIST))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) printer.get_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
elif set_id == BpxSetId.GET_CFG_SET: elif set_id == BpxSetId.GET_CFG_SET:
battheat_mode = hk_data[0] battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0] battheat_low = struct.unpack("!b", hk_data[1:2])[0]
@ -172,4 +172,4 @@ def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
validity_buffer = hk_data[3:] validity_buffer = hk_data[3:]
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) printer.get_validity_buffer(validity_buffer=validity_buffer, num_vars=10)

View File

@ -496,4 +496,4 @@ def handle_plpcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(ch_print) pw.dlog(ch_print)
for i in range(12): for i in range(12):
pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}") pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}")
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=3) printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=3)

View File

@ -276,7 +276,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | " temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | "
pw.dlog(temps) pw.dlog(temps)
pw.dlog(batt_info) pw.dlog(batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
if set_id == SetId.AUX: if set_id == SetId.AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0 current_idx = 0
@ -341,9 +341,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
"6:TempSens(BatPack)|7:TempSens(BatPack)" "6:TempSens(BatPack)|7:TempSens(BatPack)"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
printer.print_validity_buffer( printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=27)
validity_buffer=hk_data[current_idx:], num_vars=27
)
def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]: def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]:
@ -397,9 +395,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"Boot Count {bootcnt} | Uptime {uptime} sec | " f"Boot Count {bootcnt} | Uptime {uptime} sec | "
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
) )
printer.print_validity_buffer( printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=12)
validity_buffer=hk_data[current_idx:], num_vars=12
)
if set_id == SetId.AUX: if set_id == SetId.AUX:
current_idx = 0 current_idx = 0
fmt_str = "!BBB" fmt_str = "!BBB"
@ -434,7 +430,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"5:DAC|6:TempSens|7:Reserved" f"5:DAC|6:TempSens|7:Reserved"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8) printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8)
def handle_get_param_data_reply( def handle_get_param_data_reply(
@ -578,4 +574,4 @@ def handle_pcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(f"{name.ljust(25)}: {val}") pw.dlog(f"{name.ljust(25)}: {val}")
pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}") pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}")
pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}") pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}")
pw.printer.print_validity_buffer(hk_data[current_idx:], 4) pw.printer.get_validity_buffer(hk_data[current_idx:], 4)

View File

@ -162,7 +162,7 @@ def handle_rtd_hk(object_id: bytes, hk_data: bytes, printer: FsfwTmTcPrinter):
pw.dlog(f"RTD Value: {rtd_val}") pw.dlog(f"RTD Value: {rtd_val}")
pw.dlog(f"Error Byte: {error_byte}") pw.dlog(f"Error Byte: {error_byte}")
pw.dlog(f"Last Error Byte: {last_err_byte}") pw.dlog(f"Last Error Byte: {last_err_byte}")
pw.printer.print_validity_buffer(hk_data[fmt_len:], 4) pw.printer.get_validity_buffer(hk_data[fmt_len:], 4)
def prompt_rtd_idx(): def prompt_rtd_idx():