Refactored IMTQ handling #136
@ -397,9 +397,9 @@ def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
if op_code in OpCode.MGT_FT:
|
||||
key = KAI.MGT_FT[0]
|
||||
imtq_pairs = [
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.ENG_HK_SET),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.ENG_HK_NO_TORQUE),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.CAL_MTM_SET),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.RAW_MTM_SET),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.RAW_MTM_NO_TORQUE),
|
||||
]
|
||||
diag_list = [
|
||||
True,
|
||||
@ -444,9 +444,9 @@ def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
(oids.GPS_CONTROLLER, GpsSetIds.HK),
|
||||
]
|
||||
imtq_pairs = [
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.ENG_HK_SET),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.ENG_HK_NO_TORQUE),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.CAL_MTM_SET),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.RAW_MTM_SET),
|
||||
(oids.IMTQ_HANDLER_ID, ImtqSetId.RAW_MTM_NO_TORQUE),
|
||||
]
|
||||
d_side_and_imtq_pairs = a_side_pairs + b_side_pairs + imtq_pairs
|
||||
diag_list = [
|
||||
|
@ -115,9 +115,7 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
|
||||
if info.name == "CHANGING_MODE":
|
||||
mode = event_def.param1
|
||||
submode = event_def.param2
|
||||
pw.dlog(
|
||||
f"Mode Number {mode}, Submode: {submode}"
|
||||
)
|
||||
pw.dlog(f"Mode Number {mode}, Submode: {submode}")
|
||||
if not specific_handler:
|
||||
additional_event_info = f"Additional info: {info.info} | P1: {event_def.param1} | P2: {event_def.param2}"
|
||||
pw.dlog(additional_event_info)
|
||||
|
@ -101,11 +101,11 @@ def handle_regular_hk_print(
|
||||
set_id <= ImtqSetId.NEGATIVE_Z_TEST
|
||||
):
|
||||
return handle_self_test_data(printer, hk_data)
|
||||
elif set_id == ImtqSetId.ENG_HK_SET:
|
||||
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
|
||||
return handle_eng_set(printer, hk_data)
|
||||
elif set_id == ImtqSetId.CAL_MTM_SET:
|
||||
return handle_calibrated_mtm_measurement(printer, hk_data)
|
||||
elif set_id == ImtqSetId.RAW_MTM_SET:
|
||||
elif set_id == ImtqSetId.RAW_MTM_NO_TORQUE:
|
||||
return handle_raw_mtm_measurement(printer, hk_data)
|
||||
else:
|
||||
_LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id")
|
||||
|
@ -35,15 +35,20 @@ class OpCode:
|
||||
|
||||
|
||||
class ImtqSetId:
|
||||
ENG_HK_SET = 1
|
||||
CAL_MTM_SET = 2
|
||||
RAW_MTM_SET = 3
|
||||
POSITIVE_X_TEST = 4
|
||||
NEGATIVE_X_TEST = 5
|
||||
POSITIVE_Y_TEST = 6
|
||||
NEGATIVE_Y_TEST = 7
|
||||
POSITIVE_Z_TEST = 8
|
||||
NEGATIVE_Z_TEST = 9
|
||||
ENG_HK_NO_TORQUE = 1
|
||||
RAW_MTM_NO_TORQUE = 2
|
||||
ENG_HK_SET_WITH_TORQUE = 3
|
||||
RAW_MTM_WITH_TORQUE = 4
|
||||
STATUS_SET = 5
|
||||
DIPOLES = 6
|
||||
|
||||
CAL_MTM_SET = 9
|
||||
POSITIVE_X_TEST = 10
|
||||
NEGATIVE_X_TEST = 11
|
||||
POSITIVE_Y_TEST = 12
|
||||
NEGATIVE_Y_TEST = 13
|
||||
POSITIVE_Z_TEST = 14
|
||||
NEGATIVE_Z_TEST = 15
|
||||
|
||||
|
||||
class ImtqActionId:
|
||||
@ -199,7 +204,9 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
q.add_log_cmd("IMTQ: Get engineering hk set")
|
||||
q.add_pus_tc(
|
||||
generate_one_diag_command(
|
||||
sid=make_sid(object_id=object_id.as_bytes, set_id=ImtqSetId.ENG_HK_SET)
|
||||
sid=make_sid(
|
||||
object_id=object_id.as_bytes, set_id=ImtqSetId.ENG_HK_NO_TORQUE
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -215,7 +222,9 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
q.add_log_cmd("IMTQ: Get raw MTM hk set")
|
||||
q.add_pus_tc(
|
||||
generate_one_diag_command(
|
||||
sid=make_sid(object_id=object_id.as_bytes, set_id=ImtqSetId.RAW_MTM_SET)
|
||||
sid=make_sid(
|
||||
object_id=object_id.as_bytes, set_id=ImtqSetId.RAW_MTM_NO_TORQUE
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -262,11 +271,15 @@ def raise_dipole_error(dipole_str: str, value: int):
|
||||
)
|
||||
|
||||
|
||||
ENG_HK_HEADERS = [
|
||||
STATUS_HEADERS = [
|
||||
"Status Byte Mode",
|
||||
"Status Byte Error",
|
||||
"Status Byte Config",
|
||||
"Status Byte Uptime [s]",
|
||||
]
|
||||
|
||||
|
||||
ENG_HK_HEADERS = [
|
||||
"Digital Voltage [mV]",
|
||||
"Analog Voltage [mV]",
|
||||
"Digital Current [mA]",
|
||||
@ -281,27 +294,27 @@ ENG_HK_HEADERS = [
|
||||
]
|
||||
|
||||
|
||||
def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
def unpack_status_set(hk_data: bytes) -> List:
|
||||
status_mode = hk_data[0]
|
||||
status_error = hk_data[1]
|
||||
status_conf = hk_data[2]
|
||||
status_uptime = struct.unpack("!I", hk_data[3:7])[0]
|
||||
digital_voltage = struct.unpack("!H", hk_data[7:9])[0]
|
||||
analog_voltage = struct.unpack("!H", hk_data[9:11])[0]
|
||||
digital_current = struct.unpack("!f", hk_data[11:15])[0]
|
||||
analog_current = struct.unpack("!f", hk_data[15:19])[0]
|
||||
coil_x_current = struct.unpack("!f", hk_data[19:23])[0]
|
||||
coil_y_current = struct.unpack("!f", hk_data[23:27])[0]
|
||||
coil_z_current = struct.unpack("!f", hk_data[27:31])[0]
|
||||
coil_x_temperature = struct.unpack("!h", hk_data[31:33])[0]
|
||||
coil_y_temperature = struct.unpack("!h", hk_data[33:35])[0]
|
||||
coil_z_temperature = struct.unpack("!h", hk_data[35:37])[0]
|
||||
mcu_temperature = struct.unpack("!h", hk_data[37:39])[0]
|
||||
return [status_mode, status_error, status_conf, status_uptime]
|
||||
|
||||
|
||||
def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
digital_voltage = struct.unpack("!H", hk_data[0:2])[0]
|
||||
analog_voltage = struct.unpack("!H", hk_data[2:4])[0]
|
||||
digital_current = struct.unpack("!f", hk_data[4:8])[0]
|
||||
analog_current = struct.unpack("!f", hk_data[8:12])[0]
|
||||
coil_x_current = struct.unpack("!f", hk_data[12:16])[0]
|
||||
coil_y_current = struct.unpack("!f", hk_data[16:20])[0]
|
||||
coil_z_current = struct.unpack("!f", hk_data[20:24])[0]
|
||||
coil_x_temperature = struct.unpack("!h", hk_data[24:26])[0]
|
||||
coil_y_temperature = struct.unpack("!h", hk_data[26:28])[0]
|
||||
coil_z_temperature = struct.unpack("!h", hk_data[28:30])[0]
|
||||
mcu_temperature = struct.unpack("!h", hk_data[30:32])[0]
|
||||
content_list = [
|
||||
status_mode,
|
||||
status_error,
|
||||
status_conf,
|
||||
status_uptime,
|
||||
digital_voltage,
|
||||
analog_voltage,
|
||||
digital_current,
|
||||
@ -320,7 +333,7 @@ def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
content_list = unpack_eng_hk(hk_data)
|
||||
validity_buffer = hk_data[39:]
|
||||
validity_buffer = hk_data[32:]
|
||||
|
||||
num_of_vars = len(ENG_HK_HEADERS)
|
||||
for k, v in zip(ENG_HK_HEADERS, content_list):
|
||||
@ -328,6 +341,17 @@ def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
|
||||
|
||||
def handle_status_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
content_list = unpack_status_set(hk_data)
|
||||
validity_buffer = hk_data[7:]
|
||||
|
||||
num_of_vars = 4
|
||||
for k, v in zip(STATUS_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
|
||||
|
||||
def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
header_list = [
|
||||
|
@ -84,7 +84,10 @@ class TmStoreSelect(enum.IntEnum):
|
||||
|
||||
STORE_DICT = {
|
||||
TmStoreSelect.OK_TM_STORE: (OK_TM_STORE, "OK Store (Verification)"),
|
||||
TmStoreSelect.NOT_OK_TM_STORE: (NOT_OK_TM_STORE, "NOT OK Store (Events, Verification Failures..)"),
|
||||
TmStoreSelect.NOT_OK_TM_STORE: (
|
||||
NOT_OK_TM_STORE,
|
||||
"NOT OK Store (Events, Verification Failures..)",
|
||||
),
|
||||
TmStoreSelect.MISC_TM_STORE: (MISC_TM_STORE, "Miscellaneous Store"),
|
||||
TmStoreSelect.HK_TM_STORE: (HK_TM_STORE, "HK TM Store"),
|
||||
TmStoreSelect.CFDP_TM_STORE: (CFDP_TM_STORE, "CFDP TM Store"),
|
||||
|
Loading…
x
Reference in New Issue
Block a user