Added calibration method for magnetometers.

This commit is contained in:
2021-08-20 13:28:06 +02:00
parent 4d1f5a7437
commit 244aaa8e89
4 changed files with 331 additions and 9 deletions
+3 -2
View File
@@ -1,5 +1,6 @@
numpy==1.19.3 # bug in numpy 1.19.4, 1.19.3 used as workaround
numpy==1.19.3
pyserial~=3.5
future~=0.18.2
pandas~=1.1.5
matplotlib~=3.3.2
matplotlib~=3.3.2
scipy
+153 -5
View File
@@ -1,9 +1,9 @@
from math import pi, sqrt
from math import pi, sqrt, sin, cos
import time
from datetime import datetime
from threading import Thread
import numpy as np
import scipy.optimize
from src.utility import ui_print
from src.exceptions import DeviceBusy, DeviceAccessError
@@ -155,9 +155,6 @@ class CoilConstantCalibration(Thread):
for i in range(3):
k_samples = []
for c_idx, c in enumerate(currents):
# Set new progress indicator for UI
self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3)
# Set current
c_vec = [0, 0, 0]
c_vec[i] = c
@@ -174,6 +171,9 @@ class CoilConstantCalibration(Thread):
if c_idx == currents.shape[0] - 1:
axis_field_directions.append(g.MAGNETOMETER.field - ambient_field)
# Set new progress indicator for UI
self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3)
# Average samples for axis
coil_constants[i] = np.average(k_samples)
k_deviations[i], _ = self.calculate_standard_deviation(k_samples)
@@ -209,5 +209,153 @@ class CoilConstantCalibration(Thread):
v2_u = v2 / np.linalg.norm(v2)
return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
def put_message(self, command, arg):
self.view_queue.put({'cmd': command, 'arg': arg})
class MagnetometerCalibration(Thread):
TEST_VECTOR_MAGNITUDE = 100e-6 # In Tesla. Chosen so it can be achieved with a 3A PSU.
def __init__(self, view_queue, calibration_points, calibration_interval):
Thread.__init__(self)
self.view_queue = view_queue
self.calibration_points = calibration_points
self.calibration_interval = calibration_interval
# Hardware checks are done in the init method to allow for exception handling in main thread
# This means the run method should/must be called directly after Thread object creation.
# Make sure we really have magnetometer data
if not g.MAGNETOMETER.connected:
ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.")
raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.")
# Acquire cage device. This resource will only be released after the thread is ended.
try:
self.cage_dev = g.CAGE_DEVICE.request_proxy()
except DeviceBusy:
ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.")
raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.")
def run(self):
try:
self.calibration_procedure()
self.put_message('finished', None)
except Exception as e:
self.put_message('failed', e)
finally:
self.cage_dev.close()
def calibration_procedure(self):
# According to method outlined in:
# Zikmund, A. & Janosek, Michal. (2014). Calibration procedure for triaxial magnetometers without a compensating
# system or moving parts.
# Find sensor offsets. They must be found prior to applying the chosen calibration algorithm
# This will be accurate if the cage was recently calibrated
self.cage_dev.set_field_compensated([0, 0, 0])
# Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings
time.sleep(self.calibration_interval)
# The offsets can easily be read from the magnetometer
offsets = g.MAGNETOMETER.field
# Set new progress indicator for UI
self.set_progress(True, 0)
# Generate our set of test vectors
test_vectors = self.fibonacci_sphere(self.calibration_points)
# Holds the knowns for each row of our system of equations. These are M, B_x, B_y, B_z
# (B_E is constant for the test and not stored in the array)
# Each sensor axis has its own independent system of equations
samples = [[], [], []]
# Collect sensor data for each test vector
for vec_idx, test_vec in enumerate(test_vectors):
# Command output
applied_vec = test_vec * self.TEST_VECTOR_MAGNITUDE
self.cage_dev.set_field_raw(applied_vec)
# Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings
time.sleep(self.calibration_interval)
# Read output and save to array for solver later
reading = g.MAGNETOMETER.field - offsets
for i in range(3):
row = {'m': reading[i], 'b_x': applied_vec[0], 'b_y': applied_vec[1], 'b_z': applied_vec[2]}
samples[i].append(row)
# Set new progress indicator for UI
self.set_progress(True, vec_idx + 1)
# Put device into an off and ready state
self.cage_dev.idle()
# Use collected data to build and solve system of equations
sensor_parameters = self.solve_system(samples)
# Pass results to UI
self.put_message('calibration_data', sensor_parameters)
def set_progress(self, offset_complete, test_vec_index):
progress = int(offset_complete) * 0.2 + (test_vec_index / self.calibration_points) * 0.8
self.put_message('progress', progress)
def solve_system(self, samples):
# Calculate magnitude of ambient field
b_e_x = g.CAGE_DEVICE.axes[0].ambient_field
b_e_y = g.CAGE_DEVICE.axes[0].ambient_field
b_e_z = g.CAGE_DEVICE.axes[0].ambient_field
b_e = sqrt(b_e_x**2 + b_e_y**2 + b_e_z**2)
# Perform least squares optimization on all magnetometer axes
sensor_parameters = []
for axis, axis_samples in enumerate(samples):
result = scipy.optimize.least_squares(self.residual_function, (0, 0, 0, 0), args=(b_e, axis_samples))
s, alpha_e, alpha, beta = result.x
residual = result.cost
sensor_parameters.append({'sensitivity': s,
'alpha_e': alpha_e,
'alpha': alpha,
'beta': beta,
'residual': residual})
return sensor_parameters
# Function passed to scipy for the optimization
@staticmethod
def residual_function(x, b_e, samples):
# Unpack vector. These unknown parameters are described in the calibration paper
s, alpha_e, alpha, beta = x
# Residual vector
res = []
for sample in samples:
# Unpack row coefficients:
m = sample['m']
b_x = sample['b_x']
b_y = sample['b_y']
b_z = sample['b_z']
res.append(m - s * (b_e*sin(alpha_e) + b_x*cos(alpha)*cos(beta) + b_y*cos(alpha)*sin(beta) + b_z*sin(alpha)))
return res
@staticmethod
def fibonacci_sphere(samples):
"""
Algorithm to generate roughly equally spaced points on a sphere
From https://stackoverflow.com/a/26127012"""
points = []
phi = pi * (3.0 - sqrt(5.0)) # golden angle in radians
for i in range(samples):
y = 1 - (i / float(samples - 1)) * 2 # y goes from 1 to -1
radius = sqrt(1 - y * y) # radius at y
theta = phi * i # golden angle increment
x = cos(theta) * radius
z = sin(theta) * radius
points.append(np.array([x, y, z]))
return points
def put_message(self, command, arg):
self.view_queue.put({'cmd': command, 'arg': arg})
+7
View File
@@ -40,6 +40,13 @@ import src.helmholtz_cage_device as helmholtz_cage_device
# This function can be called before declare_api_version.
# Please dont put
#
# magnetometer_field [X comp.] [Y comp.] [Z comp.]
# Returns: 1
# Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it.
# The field units are Tesla
# Sets the state of an a virtual magnetometer object which mirrors a physical sensor providing data by means of
# this command.
#
# declare_api_version [version]
# Returns: 0 or 1 (terminated with newline)
# Declare the api version the client application was programmed for. It must be compatible with the current
+168 -2
View File
@@ -23,7 +23,7 @@ import src.globals as g
import src.csv_threading as csv
import src.config_handling as config
import src.csv_logging as log
from src.calibration import AmbientFieldCalibration, CoilConstantCalibration
from src.calibration import AmbientFieldCalibration, CoilConstantCalibration, MagnetometerCalibration
from src.exceptions import DeviceAccessError
from src.utility import ui_print
import src.helmholtz_cage_device as helmholtz_cage_device
@@ -63,6 +63,7 @@ class HelmholtzGUI(Tk):
for P in [ManualMode,
HardwareConfiguration,
CalibrateAmbientField,
CalibrateMagnetometer,
ExecuteCSVMode,
ConfigureLogging]: # do this for every mode page
page = P(main_area, self) # initialize the page with the main_area frame as the parent
@@ -97,12 +98,13 @@ class TopMenu:
menu = Menu(window) # initialize Menu object
window.config(menu=menu) # put menu at the top of the window
mode_selector = Menu(menu) # create a submenu object
mode_selector = Menu(menu, tearoff=0) # create a submenu object
menu.add_cascade(label="Menu", menu=mode_selector) # add a dropdown with the submenu object
# create the different options in the dropdown:
mode_selector.add_command(label="Static Manual Input", command=self.manual_mode)
mode_selector.add_command(label="Execute CSV Sequence", command=self.execute_csv_mode)
mode_selector.add_command(label="Calibrate Ambient Field", command=self.calibrate_ambient)
mode_selector.add_command(label="Calibrate Magnetometer", command=self.calibrate_magnetometer)
mode_selector.add_separator()
mode_selector.add_command(label="Configure Data Logging", command=self.logging)
mode_selector.add_command(label="Settings...", command=self.configuration)
@@ -116,6 +118,9 @@ class TopMenu:
def calibrate_ambient(self):
self.window.show_frame(CalibrateAmbientField)
def calibrate_magnetometer(self):
self.window.show_frame(CalibrateMagnetometer)
def execute_csv_mode(self): # switch to the CSV execution page
self.window.show_frame(ExecuteCSVMode)
@@ -789,6 +794,167 @@ class CalibrateAmbientField(Frame):
messagebox.showwarning("Calibration failed", "Failed to start calibration:\n{}".format(e))
class CalibrateMagnetometer(Frame):
def __init__(self, parent, controller):
Frame.__init__(self, parent)
self.parent = parent
self.controller = controller
# To center window
# self.columnconfigure(0, weight=1)
self.rowconfigure(0, weight=1)
self.left_column = Frame(self)
self.left_column.grid(row=0, column=0, sticky="nsew")
self.right_column = Frame(self)
self.right_column.grid(row=0, column=1, sticky="nsew")
self.left_column.rowconfigure(3, weight=1)
# Thread variables
self.calibration_thread = None
self.view_mpi_queue = Queue() # Receives status information from calibration procedure threads.
# UI variables
self.connected_state_var = StringVar(value="Not connected")
self.field_value_vars = [StringVar(value="No data"),
StringVar(value="No data"),
StringVar(value="No data")]
self.calibration_procedure_progress_var = IntVar(value=0)
# Calibration parameters
self.calibration_points_var = IntVar(value=8)
self.calibration_interval_var = DoubleVar(value=5)
# UI Elements
row_counter = 0
# Create headline
header = Label(self.left_column, text="Magnetometer Calibration", font=HEADER_FONT)
header.grid(row=row_counter, column=0, columnspan=2, padx=100, pady=20, sticky="nw")
row_counter += 1
# Magnetometer connected indicator
connected_status_frame = Frame(self.left_column)
connected_status_frame.grid(row=row_counter, column=0, sticky="nw")
connected_label = Label(connected_status_frame, text="Magnetometer state:", font=SUB_HEADER_FONT)
connected_label.grid(row=0, column=0, padx=10, pady=20, sticky="nw")
self.connected_state_label = Label(connected_status_frame, textvariable=self.connected_state_var, fg="red")
self.connected_state_label.grid(row=0, column=1, padx=10, pady=20, sticky="nw")
row_counter += 1
# Magnetometer field data grid
field_data_frame = Frame(self.left_column)
field_data_frame.grid(row=row_counter, column=0, sticky="nw")
field_data_label = Label(field_data_frame, text="Field data:", font=SUB_HEADER_FONT)
field_data_label.grid(row=0, column=0, padx=10, pady=3, sticky="nw")
axis_labels = ['X:', 'Y:', 'Z:']
for i in range(3):
field_data_axis_label = Label(field_data_frame, text=axis_labels[i])
field_data_axis_label.grid(row=i, column=1, padx=10, pady=3)
field_data_axis_data = Label(field_data_frame, textvariable=self.field_value_vars[i])
field_data_axis_data.grid(row=i, column=2, padx=(20, 0), pady=3)
field_data_axis_units = Label(field_data_frame, text="\u03BCT")
field_data_axis_units.grid(row=i, column=3, padx=5, pady=3)
row_counter += 1
# Centered controls
controls_frame = Frame(self.left_column)
controls_frame.grid(row=row_counter, column=0, sticky="sw")
# Number of calibration points
calibration_point_nr_label = Label(controls_frame, text="# of calibration points")
calibration_point_nr_label.grid(row=0, column=0, pady=5, sticky="w")
calibration_point_nr_entry = Entry(controls_frame, textvariable=self.calibration_points_var)
calibration_point_nr_entry.grid(row=0, column=1, pady=5, sticky="w")
# Measurement interval
calibration_point_nr_label = Label(controls_frame, text="Measurement interval [s]")
calibration_point_nr_label.grid(row=1, column=0, pady=5, sticky="w")
calibration_point_nr_entry = Entry(controls_frame, textvariable=self.calibration_interval_var)
calibration_point_nr_entry.grid(row=1, column=1, pady=5, sticky="w")
# Calibration start buttons
start_button_frame = Frame(controls_frame)
start_button_frame.grid(row=2, column=0, columnspan=2)
self.start_calibration_button = Button(start_button_frame, text="Start Calibration",
command=self.start_calibration_procedure,
pady=5, padx=5, font=SMALL_BUTTON_FONT)
self.start_calibration_button.grid(row=0, column=0, padx=10, pady=(30, 10))
# Calibration progress bar
progress_bar_frame = Frame(controls_frame)
progress_bar_frame.grid(row=3, column=0, columnspan=2)
calibration_procedure_progress_label = Label(progress_bar_frame, text="Progress:")
calibration_procedure_progress_label.grid(row=0, column=0, padx=10, pady=10)
calibration_procedure_progress = ttk.Progressbar(progress_bar_frame,
length=240,
variable=self.calibration_procedure_progress_var)
calibration_procedure_progress.grid(row=0, column=1, padx=10, pady=10, sticky="we")
row_counter += 1
# This starts an endless polling loop
self.update_view()
def page_switch(self):
# every class in the UI needs this, even if it doesn't do anything
pass
def update_view(self):
# Get new connected status
if g.MAGNETOMETER.connected:
self.connected_state_var.set("connected")
self.connected_state_label.configure(fg="green")
else:
self.connected_state_var.set("Not connected")
self.connected_state_label.configure(fg="red")
# Get new field data
new_field = g.MAGNETOMETER.field
for i in range(3):
# Display in uT
self.field_value_vars[i].set("{:.3f}".format(new_field[i] * 1e6))
# Get mpi messages from calibration procedures
try:
while True:
msg = self.view_mpi_queue.get(block=False)
cmd = msg['cmd']
arg = msg['arg']
if cmd == 'finished':
self.reactivate_buttons()
elif cmd == 'failed':
messagebox.showerror("Calibration error", "Error occured during calibration:\n{}".format(arg))
self.reactivate_buttons()
elif cmd == 'progress':
self.calibration_procedure_progress_var.set(min(int(arg*100), 100))
elif cmd == 'calibration_data':
self.display_calibration_results(arg)
else:
ui_print("Error: Unexpected mpi command '{}' in CalibrationTool".format(cmd))
except queue.Empty:
pass
self.controller.after(500, self.update_view)
def reactivate_buttons(self):
self.start_calibration_button.configure(text="Start Calibration", state=NORMAL)
self.calibration_procedure_progress_var.set(0)
def deactivate_buttons(self):
self.start_calibration_button.configure(text="Running...", state=DISABLED)
def display_calibration_results(self, results):
pass
def start_calibration_procedure(self):
try:
calibration_points = self.calibration_points_var.get()
calibration_interval = self.calibration_interval_var.get()
self.calibration_thread = MagnetometerCalibration(self.view_mpi_queue,
calibration_points,
calibration_interval)
self.calibration_thread.start()
self.deactivate_buttons()
except (DeviceAccessError, TclError) as e:
messagebox.showwarning("Calibration failed", "Failed to start calibration:\n{}".format(e))
class HardwareConfiguration(Frame):
"""Settings window to set program constants"""