rework it
This commit is contained in:
@ -426,11 +426,11 @@ ENG_HK_HEADERS = [
|
||||
]
|
||||
|
||||
|
||||
def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
|
||||
def handle_imtq_hk(pw: PrintWrapper, hk_data: bytes, set_id: int):
|
||||
if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST):
|
||||
return handle_self_test_data(printer, hk_data)
|
||||
return handle_self_test_data(pw, hk_data)
|
||||
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
|
||||
return handle_eng_set(printer, hk_data, False)
|
||||
return handle_eng_set(pw, hk_data, False)
|
||||
elif set_id == ImtqSetId.ENG_HK_SET_WITH_TORQUE:
|
||||
return handle_eng_set(printer, hk_data, True)
|
||||
elif set_id == ImtqSetId.CAL_MTM_SET:
|
||||
@ -444,8 +444,9 @@ def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
|
||||
elif set_id == ImtqSetId.STATUS_SET:
|
||||
return handle_status_set(printer, hk_data)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}"
|
||||
pw.wlog(
|
||||
_LOGGER,
|
||||
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}",
|
||||
)
|
||||
|
||||
|
||||
@ -469,7 +470,7 @@ def handle_dipole_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw.dlog(f"Dipole Y: {dipole_y}")
|
||||
pw.dlog(f"Dipole Z: {dipole_z}")
|
||||
pw.dlog(f"Current torque duration: {current_torque_duration}")
|
||||
pw.printer.print_validity_buffer(hk_data[fmt_len:], 2)
|
||||
pw.printer.get_validity_buffer(hk_data[fmt_len:], 2)
|
||||
|
||||
|
||||
def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
@ -500,8 +501,7 @@ def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
return content_list
|
||||
|
||||
|
||||
def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes, torque_on: bool):
|
||||
pw = PrintWrapper(printer)
|
||||
def handle_eng_set(pw: PrintWrapper, hk_data: bytes, torque_on: bool):
|
||||
pw.dlog(f"Found engineering HK. Torque Status: {torque_on}")
|
||||
content_list = unpack_eng_hk(hk_data)
|
||||
validity_buffer = hk_data[32:]
|
||||
@ -509,22 +509,28 @@ def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes, torque_on: bool):
|
||||
num_of_vars = len(ENG_HK_HEADERS)
|
||||
for k, v in zip(ENG_HK_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def handle_status_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
def handle_status_set(pw: PrintWrapper, hk_data: bytes):
|
||||
content_list = unpack_status_set(hk_data)
|
||||
validity_buffer = hk_data[7:]
|
||||
|
||||
num_of_vars = 4
|
||||
for k, v in zip(STATUS_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
def handle_calibrated_mtm_measurement(pw: PrintWrapper, hk_data: bytes):
|
||||
header_list = [
|
||||
"Calibrated MTM X [nT]",
|
||||
"Calibrated MTM Y [nT]",
|
||||
@ -540,13 +546,14 @@ def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
num_of_vars = len(header_list)
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def handle_raw_mtm_measurement(
|
||||
printer: FsfwTmTcPrinter, hk_data: bytes, torque_status: bool
|
||||
):
|
||||
pw = PrintWrapper(printer)
|
||||
def handle_raw_mtm_measurement(pw: PrintWrapper, hk_data: bytes, torque_status: bool):
|
||||
pw.dlog(f"Found raw MTM measurement. Torque Status: {torque_status}")
|
||||
header_list = [
|
||||
"Raw MTM X [nT]",
|
||||
@ -563,11 +570,14 @@ def handle_raw_mtm_measurement(
|
||||
num_of_vars = 2
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
def handle_self_test_data(pw: PrintWrapper, hk_data: bytes):
|
||||
header_list = [
|
||||
"Init Err",
|
||||
"Init Raw Mag X [nT]",
|
||||
@ -699,4 +709,8 @@ def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
num_of_vars = len(header_list)
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
)
|
||||
|
Reference in New Issue
Block a user