Files
Helmholtz_Test_Bench/src/calibration_complete.py
T

399 lines
18 KiB
Python

from math import pi, sqrt, sin, cos
import time
from datetime import datetime
from threading import Thread
import numpy as np
import scipy.optimize
from src.utility import ui_print
from src.exceptions import DeviceBusy, DeviceAccessError
import src.globals as g
#from src.user_interface import CalibrateMagnetometer.mgm_to_helmholtz_cos_trans
class AmbientFieldCalibration(Thread):
"""Varies the coil-generated fields until a configuration is reached which zeros the connected magnetometer.
The magnetometer does not need to be centered. The axes of the magnetometer must match the coil configuration!"""
# Timeout/settling time for the calibration procedure. An acceptable duration for the PI is required
SETTLE_TIME = 45
# PID controller time delta
TIME_DELTA = 0.5
P_CONTROL = -7e3 # 0.2 A/s slew-rate at 40uT
I_CONTROL = 0 # -1e4 # 0.01A/s slew-rate for 1uTs
I_LIMIT = 1e-7 # uTs, Limit I to 0.025 A/s slew-rate to prevent wind-up
# D_CONTROL = Not implemented for now
def __init__(self, view_queue):
Thread.__init__(self)
self.view_queue = view_queue
# Axis currents. Incremented by PID loop
self.axis_currents = np.array([0, 0, 0], dtype=float)
# Used for I control
self.error_integral = np.array([0, 0, 0], dtype=float)
# Hardware checks are done in the init method to allow for exception handling in main thread
# This means the run method should/must be called directly after Thread object creation.
# Make sure we really have magnetometer data
if not g.MAGNETOMETER.connected:
ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.")
raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.")
# Acquire cage device. This resource will only be released after the thread is ended.
try:
self.cage_dev = g.CAGE_DEVICE.request_proxy()
except DeviceBusy:
ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.")
raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.")
def run(self):
try:
self.calibration_procedure()
self.put_message('finished', None)
except Exception as e:
self.put_message('failed', e)
finally:
self.cage_dev.close()
def calibration_procedure(self):
raw_experiment_data = []
start_time = datetime.now()
target_time = 0
current_time = datetime.now()
while (current_time - start_time).seconds < self.SETTLE_TIME:
# Each axis runs its own PID controller. They are slightly coupled by unorthogonality, which should
# hopefully not destabilize the feedback loop
for i in range(3):
# Error in tesla
dt = self.TIME_DELTA
e = g.MAGNETOMETER.field[i]
# Change in control current
du = e * self.P_CONTROL + self.error_integral[i] * self.I_CONTROL
self.axis_currents[i] += du*dt
# Update integral
# Add increment
self.error_integral[i] += e*dt
# Clamp range
self.error_integral = np.clip(self.error_integral, -self.I_LIMIT, self.I_LIMIT)
# Apply new field actuation
self.cage_dev.set_signed_currents(self.axis_currents)
# Set new progress indicator for UI
self.put_message('progress', (current_time - start_time).seconds / self.SETTLE_TIME)
# Sleep until next iteration
target_time += self.TIME_DELTA
sleep_time = ((start_time - current_time).total_seconds() + target_time)
if sleep_time > 0:
time.sleep(sleep_time)
current_time = datetime.now()
coil_constants = np.array([g.CAGE_DEVICE.axes[i].coil_const for i in range(3)])
for i, axis in zip(range(3), ['x', 'y', 'z']):
raw_experiment_data.append({'axis': axis,
'cancellation_current': -self.axis_currents[i],
'ambient_field': -self.axis_currents[i] * coil_constants[i],
'residual_field': g.MAGNETOMETER.field[i]})
results = {'ambient': -self.axis_currents,
'ambient_ut': -self.axis_currents * coil_constants * 1e6,
'residual': g.MAGNETOMETER.field,
'raw_data': raw_experiment_data}
self.put_message('ambient_data', results)
# Put device into an off and ready state
self.cage_dev.idle()
def put_message(self, command, arg):
self.view_queue.put({'cmd': command, 'arg': arg})
class CoilConstantCalibration(Thread):
MEASUREMENT_RANGE = 3 # A. Will extend into negative and positive sign
MEASUREMENT_POINTS = 4 # Excludes zero. eg 0.5, 1, 1.5, 2
SETTLE_TIME = 3 # Time until new measurement is ready after setting current
def __init__(self, view_queue):
Thread.__init__(self)
self.view_queue = view_queue
# Hardware checks are done in the init method to allow for exception handling in main thread
# This means the run method should/must be called directly after Thread object creation.
# Make sure we really have magnetometer data
if not g.MAGNETOMETER.connected:
ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.")
raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.")
# Acquire cage device. This resource will only be released after the thread is ended.
try:
self.cage_dev = g.CAGE_DEVICE.request_proxy()
except DeviceBusy:
ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.")
raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.")
def run(self):
try:
self.calibration_procedure()
self.put_message('finished', None)
except Exception as e:
self.put_message('failed', e)
finally:
self.cage_dev.close()
def calibration_procedure(self):
# All generated fields will be compared to this using a simple difference method
ambient_field = g.MAGNETOMETER.field
# This generates linearly spaced current setpoints and excludes zero
currents = np.linspace(-self.MEASUREMENT_RANGE, self.MEASUREMENT_RANGE, self.MEASUREMENT_POINTS * 2 + 1)
currents = np.delete(currents, self.MEASUREMENT_POINTS)
# Stores three vectors that correspond to the x,y,z actuation field directions.
axis_field_directions = []
# Result variables
coil_constants = np.zeros(3)
k_deviations = np.zeros(3)
raw_experiment_data = []
# The coil constant must be determined for every axis
for i in range(3):
k_samples = []
for c_idx, c in enumerate(currents):
# Set current
c_vec = [0, 0, 0]
c_vec[i] = c
self.cage_dev.set_signed_currents(c_vec)
time.sleep(self.SETTLE_TIME)
# Get coil constant
field = g.MAGNETOMETER.field
field_diff_mag = np.linalg.norm(field - ambient_field)
sign = 1 if field[i] - ambient_field[i] >= 0 else -1
k = (field_diff_mag * sign) / c
k_samples.append(k)
# Save vector as principal coil direction if it is the last sample with the largest positive current
if c_idx == currents.shape[0] - 1:
axis_field_directions.append(g.MAGNETOMETER.field - ambient_field)
# Set new progress indicator for UI
self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3)
# Save into raw data vector
raw_experiment_data.append({'axis': i,
'I_x': c_vec[0],
'I_y': c_vec[1],
'I_z': c_vec[2],
'delta_mag_B': field_diff_mag,
'sign': sign,
'K': k})
# Average samples for axis
coil_constants[i] = np.average(k_samples)
k_deviations[i], _ = self.calculate_standard_deviation(k_samples)
# Put device into an off and ready state
self.cage_dev.idle()
angles = {'xy': self.angle_between(axis_field_directions[0], axis_field_directions[1]) * 180 / pi,
'yz': self.angle_between(axis_field_directions[1], axis_field_directions[2]) * 180 / pi,
'xz': self.angle_between(axis_field_directions[0], axis_field_directions[2]) * 180 / pi}
self.put_message('coil_constant_results', {'k': coil_constants,
'k_dev': k_deviations,
'angle': angles,
'raw_data': raw_experiment_data})
@staticmethod
def calculate_standard_deviation(data):
n = len(data)
average = 0
for datapoint in data:
average += datapoint / n
std_dev = 0
deviations = []
for datapoint in data:
std_dev += (datapoint - average) ** 2
deviations.append(datapoint - average)
std_dev = sqrt(std_dev / n)
return std_dev, deviations
@staticmethod
def angle_between(v1, v2):
""" Returns the angle in radians between vectors 'v1' and 'v2'"""
v1_u = v1 / np.linalg.norm(v1)
v2_u = v2 / np.linalg.norm(v2)
return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
def put_message(self, command, arg):
self.view_queue.put({'cmd': command, 'arg': arg})
class MagnetometerCalibration(Thread):
TEST_VECTOR_MAGNITUDE = 100e-6 # In Tesla. Chosen so it can be achieved with a 3A PSU.
def __init__(self, view_queue, calibration_points, calibration_interval, mgm_to_helmholtz_cos_trans):
Thread.__init__(self)
self.view_queue = view_queue
self.calibration_points = calibration_points
self.calibration_interval = calibration_interval
self.matrix_trans_mgm_to_hh = [[x.get() for x in row] for row in mgm_to_helmholtz_cos_trans]
# Hardware checks are done in the init method to allow for exception handling in main thread
# This means the run method should/must be called directly after Thread object creation.
# Make sure we really have magnetometer data
if not g.MAGNETOMETER.connected:
ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.")
raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.")
# Acquire cage device. This resource will only be released after the thread is ended.
try:
self.cage_dev = g.CAGE_DEVICE.request_proxy()
except DeviceBusy:
ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.")
raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.")
def run(self):
try:
self.calibration_procedure()
self.put_message('finished', None)
except Exception as e:
self.put_message('failed', e)
finally:
self.cage_dev.close()
def calibration_procedure(self):
# According to method outlined in:
# Zikmund, A. & Janosek, Michal. (2014). Calibration procedure for triaxial magnetometers without a compensating
# system or moving parts.
# This contains the raw experiment data for exporting
# Each row is a dict containing the applied vector and measured vector
raw_data = []
# Find sensor offsets. They must be found prior to applying the chosen calibration algorithm
# This will be accurate if the cage was recently calibrated
self.cage_dev.set_field_compensated([0, 0, 0])
# Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings
time.sleep(self.calibration_interval)
matrix_trans_mgm_to_hh_np = np.array(self.matrix_trans_mgm_to_hh)
# The offsets can easily be read from the magnetometer
offsets = matrix_trans_mgm_to_hh_np.dot(g.MAGNETOMETER.field)
# Save data point to raw_data list
raw_data.append({'applied_x': 0, 'applied_y': 0, 'applied_z': 0,
'measured_x': offsets[0], 'measured_y': offsets[1], 'measured_z': offsets[2]})
# Set new progress indicator for UI
self.set_progress(True, 0)
# Generate our set of test vectors
test_vectors = self.fibonacci_sphere(self.calibration_points)
# Holds the knowns for each row of our system of equations. These are M, B_x, B_y, B_z
# (B_E is constant for the test and not stored in the array)
# Each sensor axis has its own independent system of equations
samples = [[], [], []]
# Collect sensor data for each test vector
for vec_idx, test_vec in enumerate(test_vectors):
# Command output
applied_vec = test_vec * self.TEST_VECTOR_MAGNITUDE
self.cage_dev.set_field_raw(applied_vec)
# Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings
time.sleep(self.calibration_interval)
# Read output and save to array for solver later
raw_reading = matrix_trans_mgm_to_hh_np.dot(g.MAGNETOMETER.field)
reading = raw_reading - offsets
for i in range(3):
row = {'m': reading[i], 'b_x': applied_vec[0], 'b_y': applied_vec[1], 'b_z': applied_vec[2]}
# print("[Axis {}] {}".format(i, row))
samples[i].append(row)
# Save data point to raw_data list
raw_data.append({'applied_x': applied_vec[0], 'applied_y': applied_vec[1], 'applied_z': applied_vec[2],
'measured_x': raw_reading[0], 'measured_y': raw_reading[1], 'measured_z': raw_reading[2]})
# Set new progress indicator for UI
self.set_progress(True, vec_idx + 1)
# Put device into an off and ready state
self.cage_dev.idle()
# Use collected data to build and solve system of equations
sensor_parameters = self.solve_system(samples, offsets)
# Pass results to UI
self.put_message('calibration_data', {'results': sensor_parameters, 'raw_data': raw_data})
def set_progress(self, offset_complete, test_vec_index):
progress = int(offset_complete) * 0.2 + (test_vec_index / self.calibration_points) * 0.8
self.put_message('progress', progress)
def solve_system(self, samples, offset_data):
# Calculate magnitude of ambient field
b_e_x = g.CAGE_DEVICE.axes[0].ambient_field
b_e_y = g.CAGE_DEVICE.axes[1].ambient_field
b_e_z = g.CAGE_DEVICE.axes[2].ambient_field
b_e = sqrt(b_e_x**2 + b_e_y**2 + b_e_z**2)
# Perform least squares optimization on all magnetometer axes
sensor_parameters = []
for axis, axis_samples in enumerate(samples):
result = scipy.optimize.least_squares(self.residual_function, (1.0, pi/4, pi/4, pi/4), args=(b_e, axis_samples), gtol=1e-13)
s, alpha_e, alpha, beta = result.x
residual = np.max(np.abs(result.fun))
sensor_parameters.append({'sensitivity': s,
'offset': offset_data[axis],
'alpha_e': alpha_e,
'alpha': alpha,
'beta': beta,
'residual': residual})
return sensor_parameters
# Function passed to scipy for the optimization
@staticmethod
def residual_function(x, b_e, samples):
# Unpack vector. These unknown parameters are described in the calibration paper
s, alpha_e, alpha, beta = x
# Residual vector
res = []
for sample in samples:
# Unpack row coefficients:
m = sample['m']
b_x = sample['b_x']
b_y = sample['b_y']
b_z = sample['b_z']
res.append(m - s * (b_e*sin(alpha_e) + b_x*cos(alpha)*cos(beta) + b_y*cos(alpha)*sin(beta) + b_z*sin(alpha)))
return res
@staticmethod
def fibonacci_sphere(samples):
"""
Algorithm to generate roughly equally spaced points on a sphere
From https://stackoverflow.com/a/26127012"""
points = []
phi = pi * (3.0 - sqrt(5.0)) # golden angle in radians
for i in range(samples):
y = 1 - (i / float(samples - 1)) * 2 # y goes from 1 to -1
radius = sqrt(1 - y * y) # radius at y
theta = phi * i # golden angle increment
x = cos(theta) * radius
z = sin(theta) * radius
points.append(np.array([x, y, z]))
return points
def put_message(self, command, arg):
self.view_queue.put({'cmd': command, 'arg': arg})