From 2bce7ffd858abc5e5edbd1939f76b1da8b929519 Mon Sep 17 00:00:00 2001 From: Markus Koller Date: Mon, 3 Oct 2022 15:08:08 +0200 Subject: [PATCH] Dublicates calibration method, added notes --- .idea/Python-PS2000B.iml | 6 +- .idea/misc.xml | 2 +- ...calibration.py => calibration_complete.py} | 0 src/calibration_simple.py | 399 +++++++++++++++ src/user_interface.py | 468 +++++++++++++++++- 5 files changed, 864 insertions(+), 11 deletions(-) rename src/{calibration.py => calibration_complete.py} (100%) create mode 100644 src/calibration_simple.py diff --git a/.idea/Python-PS2000B.iml b/.idea/Python-PS2000B.iml index 5ed0139..b7dda91 100644 --- a/.idea/Python-PS2000B.iml +++ b/.idea/Python-PS2000B.iml @@ -1,8 +1,10 @@ - - + + + + diff --git a/.idea/misc.xml b/.idea/misc.xml index d56657a..f39fd7a 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,4 @@ - + \ No newline at end of file diff --git a/src/calibration.py b/src/calibration_complete.py similarity index 100% rename from src/calibration.py rename to src/calibration_complete.py diff --git a/src/calibration_simple.py b/src/calibration_simple.py new file mode 100644 index 0000000..84f88ac --- /dev/null +++ b/src/calibration_simple.py @@ -0,0 +1,399 @@ +from math import pi, sqrt, sin, cos +import time +from datetime import datetime +from threading import Thread +import numpy as np +import scipy.optimize + +from src.utility import ui_print +from src.exceptions import DeviceBusy, DeviceAccessError +import src.globals as g +#from src.user_interface import CalibrateMagnetometer.mgm_to_helmholtz_cos_trans + + +class AmbientFieldCalibration(Thread): + """Varies the coil-generated fields until a configuration is reached which zeros the connected magnetometer. + The magnetometer does not need to be centered. The axes of the magnetometer must match the coil configuration!""" + + # Timeout/settling time for the calibration procedure. An acceptable duration for the PI is required + SETTLE_TIME = 45 + # PID controller time delta + TIME_DELTA = 0.5 + P_CONTROL = -7e3 # 0.2 A/s slew-rate at 40uT + I_CONTROL = 0 # -1e4 # 0.01A/s slew-rate for 1uTs + I_LIMIT = 1e-7 # uTs, Limit I to 0.025 A/s slew-rate to prevent wind-up + # D_CONTROL = Not implemented for now + + def __init__(self, view_queue): + Thread.__init__(self) + self.view_queue = view_queue + + # Axis currents. Incremented by PID loop + self.axis_currents = np.array([0, 0, 0], dtype=float) + # Used for I control + self.error_integral = np.array([0, 0, 0], dtype=float) + + # Hardware checks are done in the init method to allow for exception handling in main thread + # This means the run method should/must be called directly after Thread object creation. + + # Make sure we really have magnetometer data + if not g.MAGNETOMETER.connected: + ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") + raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") + + # Acquire cage device. This resource will only be released after the thread is ended. + try: + self.cage_dev = g.CAGE_DEVICE.request_proxy() + except DeviceBusy: + ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") + raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") + + def run(self): + try: + self.calibration_procedure() + self.put_message('finished', None) + except Exception as e: + self.put_message('failed', e) + finally: + self.cage_dev.close() + + def calibration_procedure(self): + raw_experiment_data = [] + + start_time = datetime.now() + target_time = 0 + current_time = datetime.now() + while (current_time - start_time).seconds < self.SETTLE_TIME: + # Each axis runs its own PID controller. They are slightly coupled by unorthogonality, which should + # hopefully not destabilize the feedback loop + for i in range(3): + # Error in tesla + dt = self.TIME_DELTA + e = g.MAGNETOMETER.field[i] + # Change in control current + du = e * self.P_CONTROL + self.error_integral[i] * self.I_CONTROL + self.axis_currents[i] += du*dt + + # Update integral + # Add increment + self.error_integral[i] += e*dt + # Clamp range + self.error_integral = np.clip(self.error_integral, -self.I_LIMIT, self.I_LIMIT) + + # Apply new field actuation + self.cage_dev.set_signed_currents(self.axis_currents) + + # Set new progress indicator for UI + self.put_message('progress', (current_time - start_time).seconds / self.SETTLE_TIME) + + # Sleep until next iteration + target_time += self.TIME_DELTA + sleep_time = ((start_time - current_time).total_seconds() + target_time) + if sleep_time > 0: + time.sleep(sleep_time) + current_time = datetime.now() + + coil_constants = np.array([g.CAGE_DEVICE.axes[i].coil_const for i in range(3)]) + for i, axis in zip(range(3), ['x', 'y', 'z']): + raw_experiment_data.append({'axis': axis, + 'cancellation_current': -self.axis_currents[i], + 'ambient_field': -self.axis_currents[i] * coil_constants[i], + 'residual_field': g.MAGNETOMETER.field[i]}) + + results = {'ambient': -self.axis_currents, + 'ambient_ut': -self.axis_currents * coil_constants * 1e6, + 'residual': g.MAGNETOMETER.field, + 'raw_data': raw_experiment_data} + self.put_message('ambient_data', results) + + # Put device into an off and ready state + self.cage_dev.idle() + + def put_message(self, command, arg): + self.view_queue.put({'cmd': command, 'arg': arg}) + + +class CoilConstantCalibration(Thread): + MEASUREMENT_RANGE = 3 # A. Will extend into negative and positive sign + MEASUREMENT_POINTS = 4 # Excludes zero. eg 0.5, 1, 1.5, 2 + SETTLE_TIME = 3 # Time until new measurement is ready after setting current + + def __init__(self, view_queue): + Thread.__init__(self) + self.view_queue = view_queue + + # Hardware checks are done in the init method to allow for exception handling in main thread + # This means the run method should/must be called directly after Thread object creation. + + # Make sure we really have magnetometer data + if not g.MAGNETOMETER.connected: + ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") + raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") + + # Acquire cage device. This resource will only be released after the thread is ended. + try: + self.cage_dev = g.CAGE_DEVICE.request_proxy() + except DeviceBusy: + ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") + raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") + + def run(self): + try: + self.calibration_procedure() + self.put_message('finished', None) + except Exception as e: + self.put_message('failed', e) + finally: + self.cage_dev.close() + + def calibration_procedure(self): + # All generated fields will be compared to this using a simple difference method + ambient_field = g.MAGNETOMETER.field + + # This generates linearly spaced current setpoints and excludes zero + currents = np.linspace(-self.MEASUREMENT_RANGE, self.MEASUREMENT_RANGE, self.MEASUREMENT_POINTS * 2 + 1) + currents = np.delete(currents, self.MEASUREMENT_POINTS) + + # Stores three vectors that correspond to the x,y,z actuation field directions. + axis_field_directions = [] + + # Result variables + coil_constants = np.zeros(3) + k_deviations = np.zeros(3) + raw_experiment_data = [] + + # The coil constant must be determined for every axis + for i in range(3): + k_samples = [] + for c_idx, c in enumerate(currents): + # Set current + c_vec = [0, 0, 0] + c_vec[i] = c + self.cage_dev.set_signed_currents(c_vec) + time.sleep(self.SETTLE_TIME) + + # Get coil constant + field = g.MAGNETOMETER.field + field_diff_mag = np.linalg.norm(field - ambient_field) + sign = 1 if field[i] - ambient_field[i] >= 0 else -1 + k = (field_diff_mag * sign) / c + k_samples.append(k) + + # Save vector as principal coil direction if it is the last sample with the largest positive current + if c_idx == currents.shape[0] - 1: + axis_field_directions.append(g.MAGNETOMETER.field - ambient_field) + + # Set new progress indicator for UI + self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3) + + # Save into raw data vector + raw_experiment_data.append({'axis': i, + 'I_x': c_vec[0], + 'I_y': c_vec[1], + 'I_z': c_vec[2], + 'delta_mag_B': field_diff_mag, + 'sign': sign, + 'K': k}) + + # Average samples for axis + coil_constants[i] = np.average(k_samples) + k_deviations[i], _ = self.calculate_standard_deviation(k_samples) + + # Put device into an off and ready state + self.cage_dev.idle() + + angles = {'xy': self.angle_between(axis_field_directions[0], axis_field_directions[1]) * 180 / pi, + 'yz': self.angle_between(axis_field_directions[1], axis_field_directions[2]) * 180 / pi, + 'xz': self.angle_between(axis_field_directions[0], axis_field_directions[2]) * 180 / pi} + self.put_message('coil_constant_results', {'k': coil_constants, + 'k_dev': k_deviations, + 'angle': angles, + 'raw_data': raw_experiment_data}) + + @staticmethod + def calculate_standard_deviation(data): + n = len(data) + average = 0 + for datapoint in data: + average += datapoint / n + std_dev = 0 + deviations = [] + for datapoint in data: + std_dev += (datapoint - average) ** 2 + deviations.append(datapoint - average) + std_dev = sqrt(std_dev / n) + return std_dev, deviations + + @staticmethod + def angle_between(v1, v2): + """ Returns the angle in radians between vectors 'v1' and 'v2'""" + v1_u = v1 / np.linalg.norm(v1) + v2_u = v2 / np.linalg.norm(v2) + return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0)) + + def put_message(self, command, arg): + self.view_queue.put({'cmd': command, 'arg': arg}) + + +class MagnetometerCalibration(Thread): + TEST_VECTOR_MAGNITUDE = 100e-6 # In Tesla. Chosen so it can be achieved with a 3A PSU. + + def __init__(self, view_queue, calibration_points, calibration_interval, mgm_to_helmholtz_cos_trans): + Thread.__init__(self) + self.view_queue = view_queue + self.calibration_points = calibration_points + self.calibration_interval = calibration_interval + self.matrix_trans_mgm_to_hh = [[x.get() for x in row] for row in mgm_to_helmholtz_cos_trans] + + # Hardware checks are done in the init method to allow for exception handling in main thread + # This means the run method should/must be called directly after Thread object creation. + + # Make sure we really have magnetometer data + if not g.MAGNETOMETER.connected: + ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") + raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") + + # Acquire cage device. This resource will only be released after the thread is ended. + try: + self.cage_dev = g.CAGE_DEVICE.request_proxy() + except DeviceBusy: + ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") + raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") + + def run(self): + try: + self.calibration_procedure() + self.put_message('finished', None) + except Exception as e: + self.put_message('failed', e) + finally: + self.cage_dev.close() + + def calibration_procedure(self): + # According to method outlined in: + # Zikmund, A. & Janosek, Michal. (2014). Calibration procedure for triaxial magnetometers without a compensating + # system or moving parts. + + # This contains the raw experiment data for exporting + # Each row is a dict containing the applied vector and measured vector + raw_data = [] + + # Find sensor offsets. They must be found prior to applying the chosen calibration algorithm + # This will be accurate if the cage was recently calibrated + self.cage_dev.set_field_compensated([0, 0, 0]) + # Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings + time.sleep(self.calibration_interval) + matrix_trans_mgm_to_hh_np = np.array(self.matrix_trans_mgm_to_hh) + # The offsets can easily be read from the magnetometer + offsets = matrix_trans_mgm_to_hh_np.dot(g.MAGNETOMETER.field) + # Save data point to raw_data list + raw_data.append({'applied_x': 0, 'applied_y': 0, 'applied_z': 0, + 'measured_x': offsets[0], 'measured_y': offsets[1], 'measured_z': offsets[2]}) + # Set new progress indicator for UI + self.set_progress(True, 0) + + # Generate our set of test vectors + test_vectors = self.fibonacci_sphere(self.calibration_points) + + # Holds the knowns for each row of our system of equations. These are M, B_x, B_y, B_z + # (B_E is constant for the test and not stored in the array) + # Each sensor axis has its own independent system of equations + samples = [[], [], []] + # Collect sensor data for each test vector + for vec_idx, test_vec in enumerate(test_vectors): + # Command output + applied_vec = test_vec * self.TEST_VECTOR_MAGNITUDE + self.cage_dev.set_field_raw(applied_vec) + + # Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings + time.sleep(self.calibration_interval) + + # Read output and save to array for solver later + raw_reading = matrix_trans_mgm_to_hh_np.dot(g.MAGNETOMETER.field) + reading = raw_reading - offsets + for i in range(3): + row = {'m': reading[i], 'b_x': applied_vec[0], 'b_y': applied_vec[1], 'b_z': applied_vec[2]} + # print("[Axis {}] {}".format(i, row)) + samples[i].append(row) + + # Save data point to raw_data list + raw_data.append({'applied_x': applied_vec[0], 'applied_y': applied_vec[1], 'applied_z': applied_vec[2], + 'measured_x': raw_reading[0], 'measured_y': raw_reading[1], 'measured_z': raw_reading[2]}) + + # Set new progress indicator for UI + self.set_progress(True, vec_idx + 1) + + # Put device into an off and ready state + self.cage_dev.idle() + + # Use collected data to build and solve system of equations + sensor_parameters = self.solve_system(samples, offsets) + + # Pass results to UI + self.put_message('calibration_data', {'results': sensor_parameters, 'raw_data': raw_data}) + + def set_progress(self, offset_complete, test_vec_index): + progress = int(offset_complete) * 0.2 + (test_vec_index / self.calibration_points) * 0.8 + self.put_message('progress', progress) + + def solve_system(self, samples, offset_data): + # Calculate magnitude of ambient field + b_e_x = g.CAGE_DEVICE.axes[0].ambient_field + b_e_y = g.CAGE_DEVICE.axes[1].ambient_field + b_e_z = g.CAGE_DEVICE.axes[2].ambient_field + b_e = sqrt(b_e_x**2 + b_e_y**2 + b_e_z**2) + + # Perform least squares optimization on all magnetometer axes + sensor_parameters = [] + for axis, axis_samples in enumerate(samples): + result = scipy.optimize.least_squares(self.residual_function, (1.0, pi/4, pi/4, pi/4), args=(b_e, axis_samples), gtol=1e-13) + s, alpha_e, alpha, beta = result.x + residual = np.max(np.abs(result.fun)) + sensor_parameters.append({'sensitivity': s, + 'offset': offset_data[axis], + 'alpha_e': alpha_e, + 'alpha': alpha, + 'beta': beta, + 'residual': residual}) + + return sensor_parameters + + # Function passed to scipy for the optimization + @staticmethod + def residual_function(x, b_e, samples): + # Unpack vector. These unknown parameters are described in the calibration paper + s, alpha_e, alpha, beta = x + # Residual vector + res = [] + for sample in samples: + # Unpack row coefficients: + m = sample['m'] + b_x = sample['b_x'] + b_y = sample['b_y'] + b_z = sample['b_z'] + res.append(m - s * (b_e*sin(alpha_e) + b_x*cos(alpha)*cos(beta) + b_y*cos(alpha)*sin(beta) + b_z*sin(alpha))) + return res + + @staticmethod + def fibonacci_sphere(samples): + """ + Algorithm to generate roughly equally spaced points on a sphere + From https://stackoverflow.com/a/26127012""" + points = [] + phi = pi * (3.0 - sqrt(5.0)) # golden angle in radians + + for i in range(samples): + y = 1 - (i / float(samples - 1)) * 2 # y goes from 1 to -1 + radius = sqrt(1 - y * y) # radius at y + + theta = phi * i # golden angle increment + + x = cos(theta) * radius + z = sin(theta) * radius + + points.append(np.array([x, y, z])) + + return points + + def put_message(self, command, arg): + self.view_queue.put({'cmd': command, 'arg': arg}) \ No newline at end of file diff --git a/src/user_interface.py b/src/user_interface.py index 9f54389..3a56d2d 100644 --- a/src/user_interface.py +++ b/src/user_interface.py @@ -25,7 +25,8 @@ import src.globals as g import src.csv_threading as csv_threading import src.config_handling as config import src.csv_logging as log -from src.calibration import AmbientFieldCalibration, CoilConstantCalibration, MagnetometerCalibration +from src.calibration_simple import AmbientFieldCalibration, CoilConstantCalibration, MagnetometerCalibration +from src.calibration_complete import AmbientFieldCalibration, CoilConstantCalibration, MagnetometerCalibration from src.exceptions import DeviceAccessError from src.utility import ui_print, save_dict_list_to_csv import src.helmholtz_cage_device as helmholtz_cage_device @@ -78,7 +79,8 @@ class HelmholtzGUI(Tk): for P in [ManualMode, HardwareConfiguration, CalibrateAmbientField, - CalibrateMagnetometer, + CalibrateMagnetometerSimple, + CalibrateMagnetometerComplete, ExecuteCSVMode, ConfigureLogging]: # do this for every mode page page = P(main_area, self) # initialize the page with the main_area frame as the parent @@ -106,7 +108,8 @@ class TopMenu: mode_selector.add_command(label="Static Manual Input", command=self.manual_mode) mode_selector.add_command(label="Execute CSV Sequence", command=self.execute_csv_mode) mode_selector.add_command(label="Calibrate Ambient Field", command=self.calibrate_ambient) - mode_selector.add_command(label="Calibrate Magnetometer", command=self.calibrate_magnetometer) + mode_selector.add_command(label="Calibrate Magnetometer Simple", command=self.calibrate_magnetometer_simple) + mode_selector.add_command(label="Calibrate Magnetometer Complete", command=self.calibrate_magnetometer_complete) mode_selector.add_separator() mode_selector.add_command(label="Configure Data Logging", command=self.logging) mode_selector.add_command(label="Settings...", command=self.configuration) @@ -120,8 +123,11 @@ class TopMenu: def calibrate_ambient(self): self.window.show_frame(CalibrateAmbientField) - def calibrate_magnetometer(self): - self.window.show_frame(CalibrateMagnetometer) + def calibrate_magnetometer_simple(self): + self.window.show_frame(CalibrateMagnetometerSimple) + + def calibrate_magnetometer_complete(self): + self.window.show_frame(CalibrateMagnetometerComplete) def execute_csv_mode(self): # switch to the CSV execution page self.window.show_frame(ExecuteCSVMode) @@ -940,7 +946,7 @@ class CalibrateAmbientField(Frame): self.update() -class CalibrateMagnetometer(Frame): +class CalibrateMagnetometerSimple(Frame): def __init__(self, parent, controller): Frame.__init__(self, parent) self.parent = parent @@ -986,7 +992,447 @@ class CalibrateMagnetometer(Frame): row_counter = 0 # Create headline - header = Label(self.left_column, text="Magnetometer Calibration", font=HEADER_FONT) + header = Label(self.left_column, text="Magnetometer Calibration\n-simlified method-", font=HEADER_FONT) + header.grid(row=row_counter, column=0, columnspan=2, padx=100, pady=20, sticky="nw") + row_counter += 1 + + # Magnetometer connected indicator + connected_status_frame = Frame(self.left_column) + connected_status_frame.grid(row=row_counter, column=0, sticky="nw") + connected_label = Label(connected_status_frame, text="Magnetometer state:", font=SUB_HEADER_FONT) + connected_label.grid(row=0, column=0, padx=10, pady=20, sticky="nw") + self.connected_state_label = Label(connected_status_frame, textvariable=self.connected_state_var, fg="red") + self.connected_state_label.grid(row=0, column=1, padx=10, pady=20, sticky="nw") + row_counter += 1 + + # Magnetometer field data grid + field_data_frame = Frame(self.left_column) + field_data_frame.grid(row=row_counter, column=0, sticky="nw") + field_data_label = Label(field_data_frame, text="Field data:", font=SUB_HEADER_FONT) + field_data_label.grid(row=0, column=0, padx=10, pady=3, sticky="nw") + axis_labels = ['X:', 'Y:', 'Z:'] + for i in range(3): + field_data_axis_label = Label(field_data_frame, text=axis_labels[i]) + field_data_axis_label.grid(row=i, column=1, padx=10, pady=3) + + field_data_axis_data = Label(field_data_frame, textvariable=self.field_value_vars[i]) + field_data_axis_data.grid(row=i, column=2, padx=(20, 0), pady=3) + + field_data_axis_units = Label(field_data_frame, text="\u03BCT") + field_data_axis_units.grid(row=i, column=3, padx=5, pady=3) + row_counter += 1 + + # Centered controls + controls_frame = Frame(self.left_column) + controls_frame.grid(row=row_counter, column=0, sticky="sw") + # Number of calibration points + calibration_point_nr_label = Label(controls_frame, text="# of calibration points") + calibration_point_nr_label.grid(row=0, column=0, pady=5, sticky="w") + calibration_point_nr_entry = Entry(controls_frame, textvariable=self.calibration_points_var) + calibration_point_nr_entry.grid(row=0, column=1, pady=5, sticky="w") + # Measurement interval + calibration_point_nr_label = Label(controls_frame, text="Measurement interval [s]") + calibration_point_nr_label.grid(row=1, column=0, pady=5, sticky="w") + calibration_point_nr_entry = Entry(controls_frame, textvariable=self.calibration_interval_var) + calibration_point_nr_entry.grid(row=1, column=1, pady=5, sticky="w") + # Calibration start buttons + start_button_frame = Frame(controls_frame) + start_button_frame.grid(row=2, column=0, columnspan=2) + self.start_calibration_button = Button(start_button_frame, text="Start Calibration", + command=self.start_calibration_procedure, + pady=5, padx=5, font=SMALL_BUTTON_FONT) + self.start_calibration_button.grid(row=0, column=0, padx=10, pady=(30, 10)) + # Calibration progress bar + progress_bar_frame = Frame(controls_frame) + progress_bar_frame.grid(row=3, column=0, columnspan=2) + calibration_procedure_progress_label = Label(progress_bar_frame, text="Progress:") + calibration_procedure_progress_label.grid(row=0, column=0, padx=10, pady=10) + calibration_procedure_progress = ttk.Progressbar(progress_bar_frame, + length=240, + variable=self.calibration_procedure_progress_var) + calibration_procedure_progress.grid(row=0, column=1, padx=10, pady=10, sticky="we") + row_counter += 1 + + # CENTER COLUMN + # Magnetometer calibration results + row_counter = 0 + calibration_results_frame = LabelFrame(self.right_column, text="Magnetometer Results") + calibration_results_frame.grid(row=row_counter, column=1, padx=(100, 0), pady=20, sticky="nw") + for i, label in enumerate(['X', 'Y', 'Z']): + axis_label = Label(calibration_results_frame, text=label) + axis_label.grid(row=0, column=i + 1, padx=5, pady=5, sticky="nw") + # Axis sensitivities + sensitivity_results_label = Label(calibration_results_frame, text="Sensitivity:") + sensitivity_results_label.grid(row=1, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(calibration_results_frame, + textvariable=self.sensitivity_result_vars[i], + width=15, + state='readonly') + axis_data.grid(row=1, column=i + 1, padx=5, pady=5, sticky="nw") + sensitivity_results_unit = Label(calibration_results_frame, text="-") + sensitivity_results_unit.grid(row=1, column=4, padx=5, pady=5, sticky="nw") + # Axis offsets + offset_results_label = Label(calibration_results_frame, text="Offset:") + offset_results_label.grid(row=2, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(calibration_results_frame, + textvariable=self.offset_result_vars[i], + width=15, + state='readonly') + axis_data.grid(row=2, column=i + 1, padx=5, pady=5, sticky="nw") + offset_results_unit = Label(calibration_results_frame, text="\u03BCT") + offset_results_unit.grid(row=2, column=4, padx=5, pady=5, sticky="nw") + # Angle to XY coil plane + angle_to_plane_label = Label(calibration_results_frame, text="Angle to XY plane:") + angle_to_plane_label.grid(row=3, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(calibration_results_frame, + textvariable=self.angle_to_plane_result_vars[i], + width=15, + state='readonly') + axis_data.grid(row=3, column=i + 1, padx=5, pady=5, sticky="nw") + angle_to_plane_unit = Label(calibration_results_frame, text="°") + angle_to_plane_unit.grid(row=3, column=4, padx=5, pady=5, sticky="nw") + # Angle in XY coil plane + angle_in_plane_label = Label(calibration_results_frame, text="Angle in XY plane:") + angle_in_plane_label.grid(row=4, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(calibration_results_frame, + textvariable=self.angle_in_plane_result_vars[i], + width=15, + state='readonly') + axis_data.grid(row=4, column=i + 1, padx=5, pady=5, sticky="nw") + angle_in_plane_unit = Label(calibration_results_frame, text="°") + angle_in_plane_unit.grid(row=4, column=4, padx=5, pady=5, sticky="nw") + # Residual in system of equations + residual_label = Label(calibration_results_frame, text="Residual:") + residual_label.grid(row=5, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(calibration_results_frame, + textvariable=self.residual_result_vars[i], + width=15, + state='readonly') + axis_data.grid(row=5, column=i + 1, padx=5, pady=5, sticky="nw") + residual_unit = Label(calibration_results_frame, text="\u03BCT") + residual_unit.grid(row=5, column=4, padx=5, pady=5, sticky="nw") + # Save calibration buttons + save_calibration_results_frame = Frame(calibration_results_frame) + save_calibration_results_frame.grid(row=6, column=0, columnspan=5) + # Notes on the calibration method + sensitivity_results_label = Label(calibration_results_frame, text="Sensitivity:") + sensitivity_results_label.grid(row=1, column=0, padx=5, pady=5, sticky="nw") + + # Save and apply + self.export_calibration_button = Button(save_calibration_results_frame, text="Export raw to CSV", + command=self.export_csv_calibration_raw_results, + state="disabled", + pady=5, padx=5) + self.export_calibration_button.grid(row=0, column=0, padx=5, pady=5) + self.copy_calibration_button = Button(save_calibration_results_frame, text="Copy to clipboard", + command=self.copy_to_clipboard, + state="disabled", + pady=5, padx=5) + self.copy_calibration_button.grid(row=0, column=1, padx=5, pady=5) + row_counter += 1 + # Notes on the calibration method + calibration_method_notes_frame = LabelFrame(self.right_column, text="Calibration method notes:") + calibration_method_notes_frame.grid(row=row_counter, column=1, padx=(100, 0), pady=20, sticky="nw") + label = "-Implementation according to Zikmund et al. [DOI: 10.1109/I2MTC.2014.6860790]\n-Points created by Fibonacci sphere\n-Only accounts for hard-iron offset and MGM scaling errors!" + calibration_method_notes = Label(calibration_method_notes_frame, anchor='w', justify='left', text=label) + calibration_method_notes.grid(row=3, column=0, padx=5, pady=5, sticky="nw") + # FLAG + + # RIGHT COLUMN + # Input coordinate system conversion matrix + row_counter = 0 + input_cos_frame = LabelFrame(self.right_column, text="Input MGM to Helmholtz COS Transformation Matrix") + input_cos_frame.grid(row=row_counter, column=2, padx=(100, 0), pady=20, sticky="nw") + for i, label in enumerate(['X', 'Y', 'Z']): + axis_label = Label(input_cos_frame, text=label) + axis_label.grid(row=0, column=i + 1, padx=5, pady=5, sticky="nw") + # Axis sensitivities + sensitivity_results_label = Label(input_cos_frame, text="X") + sensitivity_results_label.grid(row=1, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(input_cos_frame, + textvariable=self.mgm_to_helmholtz_cos_trans[0][i], + width=15) + axis_data.grid(row=1, column=i + 1, padx=5, pady=5, sticky="nw") + sensitivity_results_unit = Label(input_cos_frame, text="-") + sensitivity_results_unit.grid(row=1, column=4, padx=5, pady=5, sticky="nw") + # Axis offsets + offset_results_label = Label(input_cos_frame, text="Y") + offset_results_label.grid(row=2, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(input_cos_frame, + textvariable=self.mgm_to_helmholtz_cos_trans[1][i], + width=15) + axis_data.grid(row=2, column=i + 1, padx=5, pady=5, sticky="nw") + offset_results_unit = Label(input_cos_frame, text="-") + offset_results_unit.grid(row=2, column=4, padx=5, pady=5, sticky="nw") + # Angle to XY coil plane + angle_to_plane_label = Label(input_cos_frame, text="Z") + angle_to_plane_label.grid(row=3, column=0, padx=5, pady=5, sticky="nw") + for i in range(3): + axis_data = Entry(input_cos_frame, + textvariable=self.mgm_to_helmholtz_cos_trans[2][i], + width=15) + axis_data.grid(row=3, column=i + 1, padx=5, pady=5, sticky="nw") + angle_to_plane_unit = Label(input_cos_frame, text="-") + angle_to_plane_unit.grid(row=3, column=4, padx=5, pady=5, sticky="nw") + # Note on input + label = "Note:" + axis_note = Label(input_cos_frame, text=label) + axis_note.grid(row=4, column=0, padx=5, pady=5, sticky="nw") + label = "-Transfers fields value of Helmholtz cage to COS of MGM\n-B_mgm = mgm_T_hh * B_hh" + axis_note = Label(input_cos_frame, anchor='w', justify='left', text=label) + axis_note.grid(row=4, column=1, padx=5, pady=5, columnspan=4, sticky="nw") + # Save calibration buttons + save_input_cos_frame = Frame(input_cos_frame) + save_input_cos_frame.grid(row=6, column=0, columnspan=5) + # Save and apply + self.export_cos_trans_button = Button(save_input_cos_frame, text="Export to CSV", + command=self.export_csv_cos_trans_matrix, + state="normal", + pady=5, padx=5) + self.export_cos_trans_button.grid(row=0, column=0, padx=5, pady=5) + self.copy_cos_trans_matrix_button = Button(save_input_cos_frame, text="Copy to clipboard", + command=self.copy_to_clipboard_cos_trans_matrix, + state="normal", + pady=5, padx=5) + self.copy_cos_trans_matrix_button.grid(row=0, column=1, padx=5, pady=5) + self.normalize_matrix_button = Button(save_input_cos_frame, text="Orthonormalize matrix", + command=self.matrix_normalize, + state="normal", + pady=5, padx=5) + self.normalize_matrix_button.grid(row=0, column=2, padx=5, pady=5) + + row_counter += 1 + + # This starts an endless polling loop + self.update_view() + + def page_switch(self): + # every class in the UI needs this, even if it doesn't do anything + pass + + def update_view(self): + # Get new connected status + if g.MAGNETOMETER.connected: + self.connected_state_var.set("connected") + self.connected_state_label.configure(fg="green") + else: + self.connected_state_var.set("Not connected") + self.connected_state_label.configure(fg="red") + + # Get new field data + new_field = g.MAGNETOMETER.field + for i in range(3): + # Display in uT + self.field_value_vars[i].set("{:.3f}".format(new_field[i] * 1e6)) + + # Get mpi messages from calibration procedures + try: + while True: + msg = self.view_mpi_queue.get(block=False) + cmd = msg['cmd'] + arg = msg['arg'] + if cmd == 'finished': + self.reactivate_buttons() + elif cmd == 'failed': + messagebox.showerror("Calibration error", "Error occured during calibration:\n{}".format(arg)) + self.reactivate_buttons() + elif cmd == 'progress': + self.calibration_procedure_progress_var.set(min(int(arg * 100), 100)) + elif cmd == 'calibration_data': + self.display_calibration_results(arg) + else: + ui_print("Error: Unexpected mpi command '{}' in CalibrationTool".format(cmd)) + except queue.Empty: + pass + + self.controller.after(500, self.update_view) + + def reactivate_buttons(self): + self.start_calibration_button.configure(text="Start Calibration", state=NORMAL) + self.calibration_procedure_progress_var.set(0) + + def deactivate_buttons(self): + self.start_calibration_button.configure(text="Running...", state=DISABLED) + + def display_calibration_results(self, results): + # Cache raw experiment data for saving later + self.calibration_raw_results = results['raw_data'] + + # Unpack the dict + results = results['results'] + + # Display calibration in GUI + for i in range(3): + self.sensitivity_result_vars[i].set("{:.3f}".format(results[i]['sensitivity'])) + self.offset_result_vars[i].set("{:.3f}".format(results[i]['offset'] * 1e6)) + self.angle_to_plane_result_vars[i].set("{:.3f}".format(results[i]['alpha'] * 180 / pi)) + self.angle_in_plane_result_vars[i].set("{:.3f}".format(results[i]['beta'] * 180 / pi)) + self.residual_result_vars[i].set("{:.3e}".format(results[i]['residual'] * 1e6)) + + # Populate clipboard string + self.clipboard = "\tX\tY\tZ\n" + self.clipboard += "Sensitivity [-]" + for i in range(3): + self.clipboard += "\t{:.3f}".format(results[i]['sensitivity']) + self.clipboard += "\nOffset [uT]" + for i in range(3): + self.clipboard += "\t{:.3f}".format(results[i]['offset'] * 1e6) + self.clipboard += "\nAngle to XY Plane [deg]" + for i in range(3): + self.clipboard += "\t{:.3f}".format(results[i]['alpha'] * 180 / pi) + self.clipboard += "\nAngle in XY Plane [deg]" + for i in range(3): + self.clipboard += "\t{:.3f}".format(results[i]['beta'] * 180 / pi) + self.clipboard += "\nResidual [uT]" + for i in range(3): + self.clipboard += "\t{:.3e}".format(results[i]['residual'] * 1e6) + + # Enable save buttons + self.export_calibration_button.configure(state="normal") + self.copy_calibration_button.configure(state="normal") + # self.export_mgm_button.configure(state="normal") + + def start_calibration_procedure(self): + try: + calibration_points = self.calibration_points_var.get() + calibration_interval = self.calibration_interval_var.get() + self.calibration_thread = MagnetometerCalibration(self.view_mpi_queue, + calibration_points, + calibration_interval, + self.mgm_to_helmholtz_cos_trans) + self.calibration_thread.start() + self.deactivate_buttons() + except (DeviceAccessError, TclError) as e: + messagebox.showwarning("Calibration failed", "Failed to start calibration:\n{}".format(e)) + + def export_csv_calibration_raw_results(self): + if self.calibration_raw_results is None: + ui_print("Error: Failed to export non-existent calibration data.") + return + save_dict_list_to_csv('magnetometer_calibration.csv', self.calibration_raw_results, query_path=True) + ui_print("Saved calibration results to magnetometer_calibration.csv.") + + def export_csv_cos_trans_matrix(self): + cos_trans_matrix = [ + {'XX': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[0][0].get()), + 'XY': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[0][1].get()), + 'XZ': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[0][2].get()), + 'YX': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[1][0].get()), + 'YY': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[1][1].get()), + 'YZ': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[1][2].get()), + 'ZX': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[2][0].get()), + 'ZY': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[2][1].get()), + 'ZZ': "{:.5f}".format(self.mgm_to_helmholtz_cos_trans[2][2].get())} + ] + if cos_trans_matrix is None: + ui_print("Error: Failed to export non-existent coordinate transformation matrix.") + return + save_dict_list_to_csv('magnetometer_cos_trans_matrix.csv', cos_trans_matrix, query_path=True) + ui_print("Saved MGM to Helmholtz coordinate transformation matrix to magnetometer_cos_trans_matrix.csv.") + + def copy_to_clipboard(self): + self.clipboard_clear() + self.clipboard_append(self.clipboard) + self.update() + + def copy_to_clipboard_cos_trans_matrix(self): + # Populate clipboard for coordinate transformation matrix + self.cos_trans_matrix_clipboard = "\tX\tY\tZ\n" + self.cos_trans_matrix_clipboard += "X\t{:.5f}".format(self.mgm_to_helmholtz_cos_trans[0][0].get()) + self.cos_trans_matrix_clipboard += "\t{:.5f}".format(self.mgm_to_helmholtz_cos_trans[0][1].get()) + self.cos_trans_matrix_clipboard += "\t{:.5f}\n".format(self.mgm_to_helmholtz_cos_trans[0][2].get()) + self.cos_trans_matrix_clipboard += "Y\t{:.5f}".format(self.mgm_to_helmholtz_cos_trans[1][0].get()) + self.cos_trans_matrix_clipboard += "\t{:.5f}".format(self.mgm_to_helmholtz_cos_trans[1][1].get()) + self.cos_trans_matrix_clipboard += "\t{:.5f}\n".format(self.mgm_to_helmholtz_cos_trans[1][2].get()) + self.cos_trans_matrix_clipboard += "Z\t{:.5f}".format(self.mgm_to_helmholtz_cos_trans[2][0].get()) + self.cos_trans_matrix_clipboard += "\t{:.5f}".format(self.mgm_to_helmholtz_cos_trans[2][1].get()) + self.cos_trans_matrix_clipboard += "\t{:.5f}\n".format(self.mgm_to_helmholtz_cos_trans[2][2].get()) + self.clipboard_clear() + self.clipboard_append(self.cos_trans_matrix_clipboard) + self.update() + + def matrix_normalize(self): + try: + ui_print("Input matrix to be normalized:") + # Normalize Matrix + matrix = [[x.get() for x in row] for row in self.mgm_to_helmholtz_cos_trans] + matrix = np.array(matrix) + ui_print(matrix) + #matrix_max = matrix.max() + #matrix_min = matrix.min() + #matrix = (matrix - matrix_min) / (matrix_max - matrix_min) + + def gram_schmidt_columns(X): + Q, R = np.linalg.qr(X) + return Q + + matrix = gram_schmidt_columns(matrix) + ui_print("Normalized matrix (Gram-Schmidt):") + ui_print(matrix) + for i in range(3): + for j in range(3): + self.mgm_to_helmholtz_cos_trans[i][j].set(matrix[i][j]) + except: + # Couldn't compute matrix -> use unity matrix + ui_print("Could not normalize matrix, reverted to unity matrix!") + self.mgm_to_helmholtz_cos_trans = [[DoubleVar(value=1), DoubleVar(value=0), DoubleVar(value=0)], + [DoubleVar(value=0), DoubleVar(value=1), DoubleVar(value=0)], + [DoubleVar(value=0), DoubleVar(value=0), DoubleVar(value=1)]] + +class CalibrateMagnetometerComplete(Frame): + def __init__(self, parent, controller): + Frame.__init__(self, parent) + self.parent = parent + self.controller = controller + + # To center window + # self.columnconfigure(0, weight=1) + self.rowconfigure(0, weight=1) + self.left_column = Frame(self) + self.left_column.grid(row=0, column=0, sticky="nsew") + self.right_column = Frame(self) + self.right_column.grid(row=0, column=1, sticky="nsew") + self.left_column.rowconfigure(3, weight=1) + + # Thread variables + self.calibration_thread = None + self.view_mpi_queue = Queue() # Receives status information from calibration procedure threads. + + # UI variables + self.connected_state_var = StringVar(value="Not connected") + self.field_value_vars = [StringVar(value="No data"), + StringVar(value="No data"), + StringVar(value="No data")] + self.calibration_procedure_progress_var = IntVar(value=0) + # Calibration parameters + self.calibration_points_var = IntVar(value=8) + self.calibration_interval_var = DoubleVar(value=5) + # Calibration results + self.sensitivity_result_vars = [StringVar(), StringVar(), StringVar()] + self.offset_result_vars = [StringVar(), StringVar(), StringVar()] + self.angle_to_plane_result_vars = [StringVar(), StringVar(), StringVar()] + self.angle_in_plane_result_vars = [StringVar(), StringVar(), StringVar()] + self.residual_result_vars = [StringVar(), StringVar(), StringVar()] + self.calibration_raw_results = None # Cached raw experiment data to allow for saving to csv. + self.clipboard = "" # Clipboard string containing results + self.cos_trans_matrix_clipboard = "" # Clipboard string containing coordinate transformation matrix + + self.mgm_to_helmholtz_cos_trans = [[DoubleVar(value=1), DoubleVar(value=0), DoubleVar(value=0)], + [DoubleVar(value=0), DoubleVar(value=1), DoubleVar(value=0)], + [DoubleVar(value=0), DoubleVar(value=0), DoubleVar(value=1)]] + + # UI Elements + row_counter = 0 + + # Create headline + header = Label(self.left_column, text="Magnetometer Calibration\n-ellipsoid fitting method-", font=HEADER_FONT) header.grid(row=row_counter, column=0, columnspan=2, padx=100, pady=20, sticky="nw") row_counter += 1 @@ -1125,6 +1571,13 @@ class CalibrateMagnetometer(Frame): pady=5, padx=5) self.copy_calibration_button.grid(row=0, column=1, padx=5, pady=5) row_counter += 1 + # Notes on the calibration method + calibration_method_notes_frame = LabelFrame(self.right_column, text="Calibration method notes:") + calibration_method_notes_frame.grid(row=row_counter, column=1, padx=(100, 0), pady=20, sticky="nw") + label = "-Implementation of calibration according to Kok et al. [ISBN: 978-0-9824438-5-9]\n-Implementation of ellipsoid fit according to Li et al. [DOI: 10.1109/GMAP.2004.1290055]\n-Points created by Fibonacci sphere\n-Accounts for soft-iron and hard-iron effects!" + calibration_method_notes = Label(calibration_method_notes_frame, anchor='w', justify='left', text=label) + calibration_method_notes.grid(row=1, column=0, padx=5, pady=5, sticky="nw") + #FLAG # RIGHT COLUMN # Input coordinate system conversion matrix @@ -1193,7 +1646,6 @@ class CalibrateMagnetometer(Frame): row_counter += 1 - # This starts an endless polling loop self.update_view()