From 244aaa8e891e68389dc5f146088e481fbf4ed911 Mon Sep 17 00:00:00 2001 From: Leon Teichroeb Date: Fri, 20 Aug 2021 13:28:06 +0200 Subject: [PATCH] Added calibration method for magnetometers. --- requirements.txt | 5 +- src/calibration.py | 158 +++++++++++++++++++++++++++++++++++++-- src/socket_control.py | 7 ++ src/user_interface.py | 170 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 331 insertions(+), 9 deletions(-) diff --git a/requirements.txt b/requirements.txt index 088bdfa..d2477a6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ -numpy==1.19.3 # bug in numpy 1.19.4, 1.19.3 used as workaround +numpy==1.19.3 pyserial~=3.5 future~=0.18.2 pandas~=1.1.5 -matplotlib~=3.3.2 \ No newline at end of file +matplotlib~=3.3.2 +scipy \ No newline at end of file diff --git a/src/calibration.py b/src/calibration.py index e5e411d..a501302 100644 --- a/src/calibration.py +++ b/src/calibration.py @@ -1,9 +1,9 @@ -from math import pi, sqrt +from math import pi, sqrt, sin, cos import time from datetime import datetime from threading import Thread - import numpy as np +import scipy.optimize from src.utility import ui_print from src.exceptions import DeviceBusy, DeviceAccessError @@ -155,9 +155,6 @@ class CoilConstantCalibration(Thread): for i in range(3): k_samples = [] for c_idx, c in enumerate(currents): - # Set new progress indicator for UI - self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3) - # Set current c_vec = [0, 0, 0] c_vec[i] = c @@ -174,6 +171,9 @@ class CoilConstantCalibration(Thread): if c_idx == currents.shape[0] - 1: axis_field_directions.append(g.MAGNETOMETER.field - ambient_field) + # Set new progress indicator for UI + self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3) + # Average samples for axis coil_constants[i] = np.average(k_samples) k_deviations[i], _ = self.calculate_standard_deviation(k_samples) @@ -209,5 +209,153 @@ class CoilConstantCalibration(Thread): v2_u = v2 / np.linalg.norm(v2) return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0)) + def put_message(self, command, arg): + self.view_queue.put({'cmd': command, 'arg': arg}) + + +class MagnetometerCalibration(Thread): + TEST_VECTOR_MAGNITUDE = 100e-6 # In Tesla. Chosen so it can be achieved with a 3A PSU. + + def __init__(self, view_queue, calibration_points, calibration_interval): + Thread.__init__(self) + self.view_queue = view_queue + self.calibration_points = calibration_points + self.calibration_interval = calibration_interval + + # Hardware checks are done in the init method to allow for exception handling in main thread + # This means the run method should/must be called directly after Thread object creation. + + # Make sure we really have magnetometer data + if not g.MAGNETOMETER.connected: + ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") + raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") + + # Acquire cage device. This resource will only be released after the thread is ended. + try: + self.cage_dev = g.CAGE_DEVICE.request_proxy() + except DeviceBusy: + ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") + raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") + + def run(self): + try: + self.calibration_procedure() + self.put_message('finished', None) + except Exception as e: + self.put_message('failed', e) + finally: + self.cage_dev.close() + + def calibration_procedure(self): + # According to method outlined in: + # Zikmund, A. & Janosek, Michal. (2014). Calibration procedure for triaxial magnetometers without a compensating + # system or moving parts. + + # Find sensor offsets. They must be found prior to applying the chosen calibration algorithm + # This will be accurate if the cage was recently calibrated + self.cage_dev.set_field_compensated([0, 0, 0]) + # Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings + time.sleep(self.calibration_interval) + # The offsets can easily be read from the magnetometer + offsets = g.MAGNETOMETER.field + # Set new progress indicator for UI + self.set_progress(True, 0) + + # Generate our set of test vectors + test_vectors = self.fibonacci_sphere(self.calibration_points) + + # Holds the knowns for each row of our system of equations. These are M, B_x, B_y, B_z + # (B_E is constant for the test and not stored in the array) + # Each sensor axis has its own independent system of equations + samples = [[], [], []] + # Collect sensor data for each test vector + for vec_idx, test_vec in enumerate(test_vectors): + # Command output + applied_vec = test_vec * self.TEST_VECTOR_MAGNITUDE + self.cage_dev.set_field_raw(applied_vec) + + # Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings + time.sleep(self.calibration_interval) + + # Read output and save to array for solver later + reading = g.MAGNETOMETER.field - offsets + for i in range(3): + row = {'m': reading[i], 'b_x': applied_vec[0], 'b_y': applied_vec[1], 'b_z': applied_vec[2]} + samples[i].append(row) + + # Set new progress indicator for UI + self.set_progress(True, vec_idx + 1) + + # Put device into an off and ready state + self.cage_dev.idle() + + # Use collected data to build and solve system of equations + sensor_parameters = self.solve_system(samples) + + # Pass results to UI + self.put_message('calibration_data', sensor_parameters) + + def set_progress(self, offset_complete, test_vec_index): + progress = int(offset_complete) * 0.2 + (test_vec_index / self.calibration_points) * 0.8 + self.put_message('progress', progress) + + def solve_system(self, samples): + # Calculate magnitude of ambient field + b_e_x = g.CAGE_DEVICE.axes[0].ambient_field + b_e_y = g.CAGE_DEVICE.axes[0].ambient_field + b_e_z = g.CAGE_DEVICE.axes[0].ambient_field + b_e = sqrt(b_e_x**2 + b_e_y**2 + b_e_z**2) + + # Perform least squares optimization on all magnetometer axes + sensor_parameters = [] + for axis, axis_samples in enumerate(samples): + result = scipy.optimize.least_squares(self.residual_function, (0, 0, 0, 0), args=(b_e, axis_samples)) + s, alpha_e, alpha, beta = result.x + residual = result.cost + sensor_parameters.append({'sensitivity': s, + 'alpha_e': alpha_e, + 'alpha': alpha, + 'beta': beta, + 'residual': residual}) + + return sensor_parameters + + # Function passed to scipy for the optimization + @staticmethod + def residual_function(x, b_e, samples): + # Unpack vector. These unknown parameters are described in the calibration paper + s, alpha_e, alpha, beta = x + # Residual vector + res = [] + for sample in samples: + # Unpack row coefficients: + m = sample['m'] + b_x = sample['b_x'] + b_y = sample['b_y'] + b_z = sample['b_z'] + res.append(m - s * (b_e*sin(alpha_e) + b_x*cos(alpha)*cos(beta) + b_y*cos(alpha)*sin(beta) + b_z*sin(alpha))) + return res + + @staticmethod + def fibonacci_sphere(samples): + """ + Algorithm to generate roughly equally spaced points on a sphere + From https://stackoverflow.com/a/26127012""" + points = [] + phi = pi * (3.0 - sqrt(5.0)) # golden angle in radians + + for i in range(samples): + y = 1 - (i / float(samples - 1)) * 2 # y goes from 1 to -1 + radius = sqrt(1 - y * y) # radius at y + + theta = phi * i # golden angle increment + + x = cos(theta) * radius + z = sin(theta) * radius + + points.append(np.array([x, y, z])) + + return points + def put_message(self, command, arg): self.view_queue.put({'cmd': command, 'arg': arg}) \ No newline at end of file diff --git a/src/socket_control.py b/src/socket_control.py index 8609c42..0c501e3 100644 --- a/src/socket_control.py +++ b/src/socket_control.py @@ -40,6 +40,13 @@ import src.helmholtz_cage_device as helmholtz_cage_device # This function can be called before declare_api_version. # Please dont put # +# magnetometer_field [X comp.] [Y comp.] [Z comp.] +# Returns: 1 +# Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it. +# The field units are Tesla +# Sets the state of an a virtual magnetometer object which mirrors a physical sensor providing data by means of +# this command. +# # declare_api_version [version] # Returns: 0 or 1 (terminated with newline) # Declare the api version the client application was programmed for. It must be compatible with the current diff --git a/src/user_interface.py b/src/user_interface.py index 21a43f3..22396e5 100644 --- a/src/user_interface.py +++ b/src/user_interface.py @@ -23,7 +23,7 @@ import src.globals as g import src.csv_threading as csv import src.config_handling as config import src.csv_logging as log -from src.calibration import AmbientFieldCalibration, CoilConstantCalibration +from src.calibration import AmbientFieldCalibration, CoilConstantCalibration, MagnetometerCalibration from src.exceptions import DeviceAccessError from src.utility import ui_print import src.helmholtz_cage_device as helmholtz_cage_device @@ -63,6 +63,7 @@ class HelmholtzGUI(Tk): for P in [ManualMode, HardwareConfiguration, CalibrateAmbientField, + CalibrateMagnetometer, ExecuteCSVMode, ConfigureLogging]: # do this for every mode page page = P(main_area, self) # initialize the page with the main_area frame as the parent @@ -97,12 +98,13 @@ class TopMenu: menu = Menu(window) # initialize Menu object window.config(menu=menu) # put menu at the top of the window - mode_selector = Menu(menu) # create a submenu object + mode_selector = Menu(menu, tearoff=0) # create a submenu object menu.add_cascade(label="Menu", menu=mode_selector) # add a dropdown with the submenu object # create the different options in the dropdown: mode_selector.add_command(label="Static Manual Input", command=self.manual_mode) mode_selector.add_command(label="Execute CSV Sequence", command=self.execute_csv_mode) mode_selector.add_command(label="Calibrate Ambient Field", command=self.calibrate_ambient) + mode_selector.add_command(label="Calibrate Magnetometer", command=self.calibrate_magnetometer) mode_selector.add_separator() mode_selector.add_command(label="Configure Data Logging", command=self.logging) mode_selector.add_command(label="Settings...", command=self.configuration) @@ -116,6 +118,9 @@ class TopMenu: def calibrate_ambient(self): self.window.show_frame(CalibrateAmbientField) + def calibrate_magnetometer(self): + self.window.show_frame(CalibrateMagnetometer) + def execute_csv_mode(self): # switch to the CSV execution page self.window.show_frame(ExecuteCSVMode) @@ -789,6 +794,167 @@ class CalibrateAmbientField(Frame): messagebox.showwarning("Calibration failed", "Failed to start calibration:\n{}".format(e)) +class CalibrateMagnetometer(Frame): + def __init__(self, parent, controller): + Frame.__init__(self, parent) + self.parent = parent + self.controller = controller + + # To center window + # self.columnconfigure(0, weight=1) + self.rowconfigure(0, weight=1) + self.left_column = Frame(self) + self.left_column.grid(row=0, column=0, sticky="nsew") + self.right_column = Frame(self) + self.right_column.grid(row=0, column=1, sticky="nsew") + self.left_column.rowconfigure(3, weight=1) + + # Thread variables + self.calibration_thread = None + self.view_mpi_queue = Queue() # Receives status information from calibration procedure threads. + + # UI variables + self.connected_state_var = StringVar(value="Not connected") + self.field_value_vars = [StringVar(value="No data"), + StringVar(value="No data"), + StringVar(value="No data")] + self.calibration_procedure_progress_var = IntVar(value=0) + # Calibration parameters + self.calibration_points_var = IntVar(value=8) + self.calibration_interval_var = DoubleVar(value=5) + + # UI Elements + row_counter = 0 + + # Create headline + header = Label(self.left_column, text="Magnetometer Calibration", font=HEADER_FONT) + header.grid(row=row_counter, column=0, columnspan=2, padx=100, pady=20, sticky="nw") + row_counter += 1 + + # Magnetometer connected indicator + connected_status_frame = Frame(self.left_column) + connected_status_frame.grid(row=row_counter, column=0, sticky="nw") + connected_label = Label(connected_status_frame, text="Magnetometer state:", font=SUB_HEADER_FONT) + connected_label.grid(row=0, column=0, padx=10, pady=20, sticky="nw") + self.connected_state_label = Label(connected_status_frame, textvariable=self.connected_state_var, fg="red") + self.connected_state_label.grid(row=0, column=1, padx=10, pady=20, sticky="nw") + row_counter += 1 + + # Magnetometer field data grid + field_data_frame = Frame(self.left_column) + field_data_frame.grid(row=row_counter, column=0, sticky="nw") + field_data_label = Label(field_data_frame, text="Field data:", font=SUB_HEADER_FONT) + field_data_label.grid(row=0, column=0, padx=10, pady=3, sticky="nw") + axis_labels = ['X:', 'Y:', 'Z:'] + for i in range(3): + field_data_axis_label = Label(field_data_frame, text=axis_labels[i]) + field_data_axis_label.grid(row=i, column=1, padx=10, pady=3) + + field_data_axis_data = Label(field_data_frame, textvariable=self.field_value_vars[i]) + field_data_axis_data.grid(row=i, column=2, padx=(20, 0), pady=3) + + field_data_axis_units = Label(field_data_frame, text="\u03BCT") + field_data_axis_units.grid(row=i, column=3, padx=5, pady=3) + row_counter += 1 + + # Centered controls + controls_frame = Frame(self.left_column) + controls_frame.grid(row=row_counter, column=0, sticky="sw") + # Number of calibration points + calibration_point_nr_label = Label(controls_frame, text="# of calibration points") + calibration_point_nr_label.grid(row=0, column=0, pady=5, sticky="w") + calibration_point_nr_entry = Entry(controls_frame, textvariable=self.calibration_points_var) + calibration_point_nr_entry.grid(row=0, column=1, pady=5, sticky="w") + # Measurement interval + calibration_point_nr_label = Label(controls_frame, text="Measurement interval [s]") + calibration_point_nr_label.grid(row=1, column=0, pady=5, sticky="w") + calibration_point_nr_entry = Entry(controls_frame, textvariable=self.calibration_interval_var) + calibration_point_nr_entry.grid(row=1, column=1, pady=5, sticky="w") + # Calibration start buttons + start_button_frame = Frame(controls_frame) + start_button_frame.grid(row=2, column=0, columnspan=2) + self.start_calibration_button = Button(start_button_frame, text="Start Calibration", + command=self.start_calibration_procedure, + pady=5, padx=5, font=SMALL_BUTTON_FONT) + self.start_calibration_button.grid(row=0, column=0, padx=10, pady=(30, 10)) + # Calibration progress bar + progress_bar_frame = Frame(controls_frame) + progress_bar_frame.grid(row=3, column=0, columnspan=2) + calibration_procedure_progress_label = Label(progress_bar_frame, text="Progress:") + calibration_procedure_progress_label.grid(row=0, column=0, padx=10, pady=10) + calibration_procedure_progress = ttk.Progressbar(progress_bar_frame, + length=240, + variable=self.calibration_procedure_progress_var) + calibration_procedure_progress.grid(row=0, column=1, padx=10, pady=10, sticky="we") + row_counter += 1 + + # This starts an endless polling loop + self.update_view() + + def page_switch(self): + # every class in the UI needs this, even if it doesn't do anything + pass + + def update_view(self): + # Get new connected status + if g.MAGNETOMETER.connected: + self.connected_state_var.set("connected") + self.connected_state_label.configure(fg="green") + else: + self.connected_state_var.set("Not connected") + self.connected_state_label.configure(fg="red") + + # Get new field data + new_field = g.MAGNETOMETER.field + for i in range(3): + # Display in uT + self.field_value_vars[i].set("{:.3f}".format(new_field[i] * 1e6)) + + # Get mpi messages from calibration procedures + try: + while True: + msg = self.view_mpi_queue.get(block=False) + cmd = msg['cmd'] + arg = msg['arg'] + if cmd == 'finished': + self.reactivate_buttons() + elif cmd == 'failed': + messagebox.showerror("Calibration error", "Error occured during calibration:\n{}".format(arg)) + self.reactivate_buttons() + elif cmd == 'progress': + self.calibration_procedure_progress_var.set(min(int(arg*100), 100)) + elif cmd == 'calibration_data': + self.display_calibration_results(arg) + else: + ui_print("Error: Unexpected mpi command '{}' in CalibrationTool".format(cmd)) + except queue.Empty: + pass + + self.controller.after(500, self.update_view) + + def reactivate_buttons(self): + self.start_calibration_button.configure(text="Start Calibration", state=NORMAL) + self.calibration_procedure_progress_var.set(0) + + def deactivate_buttons(self): + self.start_calibration_button.configure(text="Running...", state=DISABLED) + + def display_calibration_results(self, results): + pass + + def start_calibration_procedure(self): + try: + calibration_points = self.calibration_points_var.get() + calibration_interval = self.calibration_interval_var.get() + self.calibration_thread = MagnetometerCalibration(self.view_mpi_queue, + calibration_points, + calibration_interval) + self.calibration_thread.start() + self.deactivate_buttons() + except (DeviceAccessError, TclError) as e: + messagebox.showwarning("Calibration failed", "Failed to start calibration:\n{}".format(e)) + + class HardwareConfiguration(Frame): """Settings window to set program constants"""