# -*- coding: utf-8 -*- """ @file imtq.py @brief Tests for the ISIS IMTQ (Magnettorquer) device handler @author J. Meier @date 25.03.2021 """ import struct from typing import List from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss.tc import PusTelecommand from tmtccmd.config.tmtc import ( tmtc_definitions_provider, OpCodeEntry, TmtcDefinitionWrapper, ) from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_3_fsfw_hk import ( make_sid, generate_one_diag_command, generate_one_hk_command, ) from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.util import ObjectIdU32 from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter class OpCodes: ON = ["on"] NORMAL = ["normal"] OFF = ["off"] SET_DIPOLE = ["set_dipole"] class ImtqSetIds: ENG_HK_SET = 1 CAL_MTM_SET = 2 RAW_MTM_SET = 3 POSITIVE_X_TEST = 4 NEGATIVE_X_TEST = 5 POSITIVE_Y_TEST = 6 NEGATIVE_Y_TEST = 7 POSITIVE_Z_TEST = 8 NEGATIVE_Z_TEST = 9 class ImtqActionIds: start_actuation_dipole = bytearray([0x0, 0x0, 0x0, 0x02]) get_commanded_dipole = bytearray([0x0, 0x0, 0x0, 0x03]) perform_positive_x_test = bytearray([0x0, 0x0, 0x0, 0x07]) perform_negative_x_test = bytearray([0x0, 0x0, 0x0, 0x08]) perform_positive_y_test = bytearray([0x0, 0x0, 0x0, 0x09]) perform_negative_y_test = bytearray([0x0, 0x0, 0x0, 0x0A]) perform_positive_z_test = bytearray([0x0, 0x0, 0x0, 0x0B]) perform_negative_z_test = bytearray([0x0, 0x0, 0x0, 0x0C]) # Initiates the reading of the last performed self test. After sending this command the results # can be downlinked via the housekeeping service by using the appropriate set ids listed above. read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D]) @tmtc_definitions_provider def add_imtq_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(OpCodes.OFF, "Mode Off") oce.add(OpCodes.ON, "Mode On") oce.add(OpCodes.NORMAL, "Mode Normal") oce.add("3", "IMTQ perform pos X self test") oce.add("4", "IMTQ perform neg X self test") oce.add("5", "IMTQ perform pos Y self test") oce.add("6", "IMTQ perform neg Y self test") oce.add("7", "IMTQ perform pos Z self test") oce.add("8", "IMTQ perform neg Z self test") oce.add(OpCodes.SET_DIPOLE, "IMTQ command dipole") oce.add("10", "IMTQ get commanded dipole") oce.add("11", "IMTQ get engineering hk set") oce.add("12", "IMTQ get calibrated MTM measurement one shot") oce.add("13", "IMTQ get raw MTM measurement one shot") defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce) def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str): q.add_log_cmd( f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}" ) if op_code in OpCodes.OFF: q.add_log_cmd("IMTQ: Set mode off") command = pack_mode_data(object_id.as_bytes, Modes.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if op_code in OpCodes.ON: q.add_log_cmd("IMTQ: Set mode on") command = pack_mode_data(object_id.as_bytes, Modes.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if op_code in OpCodes.NORMAL: q.add_log_cmd("IMTQ: Mode Normal") command = pack_mode_data(object_id.as_bytes, Modes.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if op_code == "3": q.add_log_cmd("IMTQ: Perform positive x self test") command = object_id.as_bytes + ImtqActionIds.perform_positive_x_test q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Initiate reading of positive x self test results") command = object_id.as_bytes + ImtqActionIds.read_self_test_results q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Request dataset with positive x self test results") sid = make_sid(object_id.as_bytes, ImtqSetIds.POSITIVE_X_TEST) q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "4": q.add_log_cmd("IMTQ: Perform negative x self test") command = object_id.as_bytes + ImtqActionIds.perform_negative_x_test q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Initiate reading of negative x self test results") command = object_id.as_bytes + ImtqActionIds.read_self_test_results q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Request dataset with negative x self test results") sid = make_sid(object_id.as_bytes, ImtqSetIds.NEGATIVE_X_TEST) q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "5": q.add_log_cmd("IMTQ: Perform positive y self test") command = object_id.as_bytes + ImtqActionIds.perform_positive_y_test q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Initiate reading of positive y self test results") command = object_id.as_bytes + ImtqActionIds.read_self_test_results q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Request dataset with positive y self test results") sid = make_sid(object_id.as_bytes, ImtqSetIds.POSITIVE_Y_TEST) q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "6": q.add_log_cmd("IMTQ: Perform negative y self test") command = object_id.as_bytes + ImtqActionIds.perform_negative_y_test q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Initiate reading of negative y self test results") command = object_id.as_bytes + ImtqActionIds.read_self_test_results q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Request dataset with negative y self test results") sid = make_sid(object_id.as_bytes, ImtqSetIds.NEGATIVE_Y_TEST) q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "7": q.add_log_cmd("IMTQ: Perform positive z self test") command = object_id.as_bytes + ImtqActionIds.perform_positive_z_test q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Initiate reading of positive z self test results") command = object_id.as_bytes + ImtqActionIds.read_self_test_results q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Request dataset with positive z self test results") sid = make_sid(object_id.as_bytes, ImtqSetIds.POSITIVE_Y_TEST) q.add_pus_tc(generate_one_hk_command(sid)) if op_code == "8": q.add_log_cmd("IMTQ: Perform negative z self test") command = object_id.as_bytes + ImtqActionIds.perform_negative_z_test q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Initiate reading of negative z self test results") command = object_id.as_bytes + ImtqActionIds.read_self_test_results q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_log_cmd("IMTQ: Request dataset with negative z self test results") sid = make_sid(object_id.as_bytes, ImtqSetIds.NEGATIVE_Z_TEST) q.add_pus_tc(generate_one_hk_command(sid)) if op_code in OpCodes.SET_DIPOLE: x_dipole = int(input("Specify X dipole [range [0, 2000] * 10^-4 * Am^2]: ")) y_dipole = int(input("Specify Y dipole [range [0, 2000] * 10^-4 * Am^2]: ")) z_dipole = int(input("Specify Z dipole [range [0, 2000] * 10^-4 * Am^2]: ")) duration = int( input( f"Specify torque duration [range [0, {pow(2, 16) - 1}, " f"0 for continuous generation until update]: " ) ) dur_str = "infinite" if duration == 0 else str(duration) q.add_log_cmd( f"IMTQ: Commanding dipole X={x_dipole}, Y={y_dipole}, Z={y_dipole}, duration={dur_str}" ) q.add_pus_tc( pack_dipole_command( object_id.as_bytes, x_dipole, y_dipole, z_dipole, duration ) ) if op_code == "10": q.add_log_cmd("IMTQ: Get commanded dipole") command = object_id.as_bytes + ImtqActionIds.get_commanded_dipole q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "11": q.add_log_cmd("IMTQ: Get engineering hk set") q.add_pus_tc( generate_one_diag_command( sid=make_sid(object_id=object_id.as_bytes, set_id=ImtqSetIds.ENG_HK_SET) ) ) if op_code == "12": q.add_log_cmd("IMTQ: Get calibrated MTM hk set") q.add_pus_tc( generate_one_diag_command( sid=make_sid( object_id=object_id.as_bytes, set_id=ImtqSetIds.CAL_MTM_SET ) ) ) if op_code == "13": q.add_log_cmd("IMTQ: Get raw MTM hk set") q.add_pus_tc( generate_one_diag_command( sid=make_sid( object_id=object_id.as_bytes, set_id=ImtqSetIds.RAW_MTM_SET ) ) ) def pack_dipole_command( object_id: bytes, x_dipole: int, y_dipole: int, z_dipole: int, duration: int ) -> PusTelecommand: """This function packs the command causing the ISIS IMTQ to generate a dipole. :param object_id: The object id of the IMTQ handler. :param x_dipole: The dipole of the x coil in 10^-4*Am^2 (max. 2000) :param y_dipole: The dipole of the y coil in 10^-4*Am^2 (max. 2000) :param z_dipole: The dipole of the z coil in 10^-4*Am^2 (max. 2000) :param duration: The duration in milliseconds the dipole will be generated by the coils. When set to 0, the dipole will be generated until a new dipole actuation command is sent. """ action_id = ImtqActionIds.start_actuation_dipole command = object_id + action_id x_dipole = int(round(x_dipole)) y_dipole = int(round(y_dipole)) z_dipole = int(round(z_dipole)) if x_dipole < -2000 or x_dipole > 2000: raise_dipole_error("X dipole", x_dipole) if y_dipole < -2000 or y_dipole > 2000: raise_dipole_error("Y dipole", y_dipole) if z_dipole < -2000 or z_dipole > 2000: raise_dipole_error("Z dipole", z_dipole) duration = int(round(duration)) if duration < 0 or duration > pow(2, 16) - 1: raise ValueError( f"Duration in ms of {duration} smaller than 0 or larger than allowed {pow(2, 16) - 1}" ) command += struct.pack("!h", x_dipole) command += struct.pack("!h", y_dipole) command += struct.pack("!h", z_dipole) command += struct.pack("!H", duration) command = PusTelecommand(service=8, subservice=128, app_data=command) return command def raise_dipole_error(dipole_str: str, value: int): raise ValueError( f"{dipole_str} {value} negative or larger than maximum allowed 2000 * 10^-4*Am^2" ) ENG_HK_HEADERS = [ "Digital Voltage [mV]", "Analog Voltage [mV]", "Digital Current [mA]", "Analog Current [mA]", "Coil Current X [mA]", "Coil Current Y [mA]", "Coil Current Z [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]", "MCU Temperature [°C]", ] def unpack_eng_hk(hk_data: bytes) -> List: digital_voltage = struct.unpack("!H", hk_data[0:2])[0] analog_voltage = struct.unpack("!H", hk_data[2:4])[0] digital_current = struct.unpack("!f", hk_data[4:8])[0] analog_current = struct.unpack("!f", hk_data[8:12])[0] coil_x_current = struct.unpack("!f", hk_data[12:16])[0] coil_y_current = struct.unpack("!f", hk_data[16:20])[0] coil_z_current = struct.unpack("!f", hk_data[20:24])[0] coil_x_temperature = struct.unpack("!h", hk_data[24:26])[0] coil_y_temperature = struct.unpack("!h", hk_data[26:28])[0] coil_z_temperature = struct.unpack("!h", hk_data[28:30])[0] mcu_temperature = struct.unpack("!h", hk_data[30:32])[0] content_list = [ digital_voltage, analog_voltage, digital_current, analog_current, coil_x_current, coil_y_current, coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, mcu_temperature, ] return content_list def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) content_list = unpack_eng_hk(hk_data) validity_buffer = hk_data[32:] num_of_vars = len(ENG_HK_HEADERS) pw.dlog(str(ENG_HK_HEADERS)) pw.dlog(str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) header_list = [ "Calibrated MTM X [nT]", "Calibrated MTM Y [nT]", "Calibrated MTM Z [nT]", "Coil actuation status", ] mtm_x = struct.unpack("!I", hk_data[0:4])[0] mtm_y = struct.unpack("!I", hk_data[4:8])[0] mtm_z = struct.unpack("!I", hk_data[8:12])[0] coil_actuation_status = hk_data[12] validity_buffer = hk_data[12:] content_list = [mtm_x, mtm_y, mtm_z, coil_actuation_status] num_of_vars = len(header_list) pw.dlog(str(header_list)) pw.dlog(str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) def handle_raw_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) header_list = [ "Raw MTM X [nT]", "Raw MTM Y [nT]", "Raw MTM Z [nT]", "Coil actuation status", ] mtm_x = struct.unpack("!f", hk_data[0:4])[0] mtm_y = struct.unpack("!f", hk_data[4:8])[0] mtm_z = struct.unpack("!f", hk_data[8:12])[0] coil_actuation_status = hk_data[12] validity_buffer = hk_data[13:] content_list = [mtm_x, mtm_y, mtm_z, coil_actuation_status] num_of_vars = 2 pw.dlog(str(header_list)) pw.dlog(str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) header_list = [ "Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]", "Init Cal Mag X [nT]", "Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]", "Init Coil X Current [mA]", "Init Coil Y Current [mA]", "Init Coil Z Current [mA]", "Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]", "Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]", "Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]", "Coil Y Current [mA]", "Coil Z Current [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]", "Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]", "Fina Cal Mag X [nT]", "Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]", "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]", "Fina Coil Z Current [mA]", "Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]", "Fina Coil Z Temperature [°C]", ] # INIT step (no coil actuation) init_err = hk_data[0] init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] # Actuation step err = hk_data[43] raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] coil_x_current = struct.unpack("!f", hk_data[68:72])[0] coil_y_current = struct.unpack("!f", hk_data[72:76])[0] coil_z_current = struct.unpack("!f", hk_data[76:80])[0] coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] # FINA step (no coil actuation) fina_err = hk_data[86] fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] validity_buffer = hk_data[129:] content_list = [ init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y, init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current, init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err, raw_mag_x, init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z, coil_x_current, coil_y_current, coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err, fina_raw_mag_x, fina_raw_mag_y, fina_raw_mag_z, fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z, fina_coil_x_current, fina_coil_y_current, fina_coil_z_current, fina_coil_x_temperature, fina_coil_y_temperature, fina_coil_z_temperature, ] num_of_vars = len(header_list) pw.dlog(str(header_list)) pw.dlog(str(content_list)) printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)