from math import pi, sqrt, sin, cos import time from datetime import datetime from threading import Thread import numpy as np import scipy.optimize from src.utility import ui_print from src.exceptions import DeviceBusy, DeviceAccessError import src.globals as g class AmbientFieldCalibration(Thread): """Varies the coil-generated fields until a configuration is reached which zeros the connected magnetometer. The magnetometer does not need to be centered. The axes of the magnetometer must match the coil configuration!""" # Timeout/settling time for the calibration procedure. An acceptable duration for the PI is required SETTLE_TIME = 45 # PID controller time delta TIME_DELTA = 0.5 P_CONTROL = -7e3 # 0.2 A/s slew-rate at 40uT I_CONTROL = 0 # -1e4 # 0.01A/s slew-rate for 1uTs I_LIMIT = 1e-7 # uTs, Limit I to 0.025 A/s slew-rate to prevent wind-up # D_CONTROL = Not implemented for now def __init__(self, view_queue): Thread.__init__(self) self.view_queue = view_queue # Axis currents. Incremented by PID loop self.axis_currents = np.array([0, 0, 0], dtype=float) # Used for I control self.error_integral = np.array([0, 0, 0], dtype=float) # Hardware checks are done in the init method to allow for exception handling in main thread # This means the run method should/must be called directly after Thread object creation. # Make sure we really have magnetometer data if not g.MAGNETOMETER.connected: ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") # Acquire cage device. This resource will only be released after the thread is ended. try: self.cage_dev = g.CAGE_DEVICE.request_proxy() except DeviceBusy: ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") def run(self): try: self.calibration_procedure() self.put_message('finished', None) except Exception as e: self.put_message('failed', e) finally: self.cage_dev.close() def calibration_procedure(self): raw_experiment_data = [] start_time = datetime.now() target_time = 0 current_time = datetime.now() while (current_time - start_time).seconds < self.SETTLE_TIME: # Each axis runs its own PID controller. They are slightly coupled by unorthogonality, which should # hopefully not destabilize the feedback loop for i in range(3): # Error in tesla dt = self.TIME_DELTA e = g.MAGNETOMETER.field[i] # Change in control current du = e * self.P_CONTROL + self.error_integral[i] * self.I_CONTROL self.axis_currents[i] += du*dt # Update integral # Add increment self.error_integral[i] += e*dt # Clamp range self.error_integral = np.clip(self.error_integral, -self.I_LIMIT, self.I_LIMIT) # Apply new field actuation self.cage_dev.set_signed_currents(self.axis_currents) # Set new progress indicator for UI self.put_message('progress', (current_time - start_time).seconds / self.SETTLE_TIME) # Sleep until next iteration target_time += self.TIME_DELTA sleep_time = ((start_time - current_time).total_seconds() + target_time) if sleep_time > 0: time.sleep(sleep_time) current_time = datetime.now() coil_constants = np.array([g.CAGE_DEVICE.axes[i].coil_const for i in range(3)]) for i, axis in zip(range(3), ['x', 'y', 'z']): raw_experiment_data.append({'axis': axis, 'cancellation_current': -self.axis_currents[i], 'ambient_field': -self.axis_currents[i] * coil_constants[i], 'residual_field': g.MAGNETOMETER.field[i]}) results = {'ambient': -self.axis_currents, 'ambient_ut': -self.axis_currents * coil_constants * 1e6, 'residual': g.MAGNETOMETER.field, 'raw_data': raw_experiment_data} self.put_message('ambient_data', results) # Put device into an off and ready state self.cage_dev.idle() def put_message(self, command, arg): self.view_queue.put({'cmd': command, 'arg': arg}) class CoilConstantCalibration(Thread): MEASUREMENT_RANGE = 3 # A. Will extend into negative and positive sign MEASUREMENT_POINTS = 4 # Excludes zero. eg 0.5, 1, 1.5, 2 SETTLE_TIME = 3 # Time until new measurement is ready after setting current def __init__(self, view_queue): Thread.__init__(self) self.view_queue = view_queue # Hardware checks are done in the init method to allow for exception handling in main thread # This means the run method should/must be called directly after Thread object creation. # Make sure we really have magnetometer data if not g.MAGNETOMETER.connected: ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") # Acquire cage device. This resource will only be released after the thread is ended. try: self.cage_dev = g.CAGE_DEVICE.request_proxy() except DeviceBusy: ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") def run(self): try: self.calibration_procedure() self.put_message('finished', None) except Exception as e: self.put_message('failed', e) finally: self.cage_dev.close() def calibration_procedure(self): # All generated fields will be compared to this using a simple difference method ambient_field = g.MAGNETOMETER.field # This generates linearly spaced current setpoints and excludes zero currents = np.linspace(-self.MEASUREMENT_RANGE, self.MEASUREMENT_RANGE, self.MEASUREMENT_POINTS * 2 + 1) currents = np.delete(currents, self.MEASUREMENT_POINTS) # Stores three vectors that correspond to the x,y,z actuation field directions. axis_field_directions = [] # Result variables coil_constants = np.zeros(3) k_deviations = np.zeros(3) raw_experiment_data = [] # The coil constant must be determined for every axis for i in range(3): k_samples = [] for c_idx, c in enumerate(currents): # Set current c_vec = [0, 0, 0] c_vec[i] = c self.cage_dev.set_signed_currents(c_vec) time.sleep(self.SETTLE_TIME) # Get coil constant field = g.MAGNETOMETER.field field_diff_mag = np.linalg.norm(field - ambient_field) sign = 1 if field[i] - ambient_field[i] >= 0 else -1 k =(field_diff_mag * sign) / c k_samples.append(k) # Save vector as principal coil direction if it is the last sample with the largest positive current if c_idx == currents.shape[0] - 1: axis_field_directions.append(g.MAGNETOMETER.field - ambient_field) # Set new progress indicator for UI self.put_message('progress', ((c_idx / (self.MEASUREMENT_POINTS * 2)) + i) / 3) # Save into raw data vector raw_experiment_data.append({'axis': i, 'I_x': c_vec[0], 'I_y': c_vec[1], 'I_z': c_vec[2], 'delta_mag_B': field_diff_mag, 'sign': sign, 'K': k}) # Average samples for axis coil_constants[i] = np.average(k_samples) k_deviations[i], _ = self.calculate_standard_deviation(k_samples) # Put device into an off and ready state self.cage_dev.idle() angles = {'xy': self.angle_between(axis_field_directions[0], axis_field_directions[1]) * 180 / pi, 'yz': self.angle_between(axis_field_directions[1], axis_field_directions[2]) * 180 / pi, 'xz': self.angle_between(axis_field_directions[0], axis_field_directions[2]) * 180 / pi} self.put_message('coil_constant_results', {'k': coil_constants, 'k_dev': k_deviations, 'angle': angles, 'raw_data': raw_experiment_data}) @staticmethod def calculate_standard_deviation(data): n = len(data) average = 0 for datapoint in data: average += datapoint / n std_dev = 0 deviations = [] for datapoint in data: std_dev += (datapoint - average) ** 2 deviations.append(datapoint - average) std_dev = sqrt(std_dev / n) return std_dev, deviations @staticmethod def angle_between(v1, v2): """ Returns the angle in radians between vectors 'v1' and 'v2'""" v1_u = v1 / np.linalg.norm(v1) v2_u = v2 / np.linalg.norm(v2) return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0)) def put_message(self, command, arg): self.view_queue.put({'cmd': command, 'arg': arg}) class MagnetometerCalibration(Thread): TEST_VECTOR_MAGNITUDE = 100e-6 # In Tesla. Chosen so it can be achieved with a 3A PSU. def __init__(self, view_queue, calibration_points, calibration_interval): Thread.__init__(self) self.view_queue = view_queue self.calibration_points = calibration_points self.calibration_interval = calibration_interval # Hardware checks are done in the init method to allow for exception handling in main thread # This means the run method should/must be called directly after Thread object creation. # Make sure we really have magnetometer data if not g.MAGNETOMETER.connected: ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.") raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.") # Acquire cage device. This resource will only be released after the thread is ended. try: self.cage_dev = g.CAGE_DEVICE.request_proxy() except DeviceBusy: ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.") raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.") def run(self): try: self.calibration_procedure() self.put_message('finished', None) except Exception as e: self.put_message('failed', e) finally: self.cage_dev.close() def calibration_procedure(self): # According to method outlined in: # Zikmund, A. & Janosek, Michal. (2014). Calibration procedure for triaxial magnetometers without a compensating # system or moving parts. # Find sensor offsets. They must be found prior to applying the chosen calibration algorithm # This will be accurate if the cage was recently calibrated self.cage_dev.set_field_compensated([0, 0, 0]) # Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings time.sleep(self.calibration_interval) # The offsets can easily be read from the magnetometer offsets = g.MAGNETOMETER.field # Set new progress indicator for UI self.set_progress(True, 0) # Generate our set of test vectors test_vectors = self.fibonacci_sphere(self.calibration_points) # Holds the knowns for each row of our system of equations. These are M, B_x, B_y, B_z # (B_E is constant for the test and not stored in the array) # Each sensor axis has its own independent system of equations samples = [[], [], []] # Collect sensor data for each test vector for vec_idx, test_vec in enumerate(test_vectors): # Command output applied_vec = test_vec * self.TEST_VECTOR_MAGNITUDE self.cage_dev.set_field_raw(applied_vec) # Sleep for a certain duration to allow psu to stabilize output and magnetometer to supply readings time.sleep(self.calibration_interval) # Read output and save to array for solver later reading = g.MAGNETOMETER.field - offsets for i in range(3): row = {'m': reading[i], 'b_x': applied_vec[0], 'b_y': applied_vec[1], 'b_z': applied_vec[2]} # print("[Axis {}] {}".format(i, row)) samples[i].append(row) # Set new progress indicator for UI self.set_progress(True, vec_idx + 1) # Put device into an off and ready state self.cage_dev.idle() # Use collected data to build and solve system of equations sensor_parameters = self.solve_system(samples, offsets) # Pass results to UI self.put_message('calibration_data', sensor_parameters) def set_progress(self, offset_complete, test_vec_index): progress = int(offset_complete) * 0.2 + (test_vec_index / self.calibration_points) * 0.8 self.put_message('progress', progress) def solve_system(self, samples, offset_data): # Calculate magnitude of ambient field b_e_x = g.CAGE_DEVICE.axes[0].ambient_field b_e_y = g.CAGE_DEVICE.axes[1].ambient_field b_e_z = g.CAGE_DEVICE.axes[2].ambient_field b_e = sqrt(b_e_x**2 + b_e_y**2 + b_e_z**2) # Perform least squares optimization on all magnetometer axes sensor_parameters = [] for axis, axis_samples in enumerate(samples): result = scipy.optimize.least_squares(self.residual_function, (1.0, pi/4, pi/4, pi/4), args=(b_e, axis_samples), gtol=1e-13) s, alpha_e, alpha, beta = result.x residual = np.max(np.abs(result.fun)) sensor_parameters.append({'sensitivity': s, 'offset': offset_data[axis], 'alpha_e': alpha_e, 'alpha': alpha, 'beta': beta, 'residual': residual}) return sensor_parameters # Function passed to scipy for the optimization @staticmethod def residual_function(x, b_e, samples): # Unpack vector. These unknown parameters are described in the calibration paper s, alpha_e, alpha, beta = x # Residual vector res = [] for sample in samples: # Unpack row coefficients: m = sample['m'] b_x = sample['b_x'] b_y = sample['b_y'] b_z = sample['b_z'] res.append(m - s * (b_e*sin(alpha_e) + b_x*cos(alpha)*cos(beta) + b_y*cos(alpha)*sin(beta) + b_z*sin(alpha))) return res @staticmethod def fibonacci_sphere(samples): """ Algorithm to generate roughly equally spaced points on a sphere From https://stackoverflow.com/a/26127012""" points = [] phi = pi * (3.0 - sqrt(5.0)) # golden angle in radians for i in range(samples): y = 1 - (i / float(samples - 1)) * 2 # y goes from 1 to -1 radius = sqrt(1 - y * y) # radius at y theta = phi * i # golden angle increment x = cos(theta) * radius z = sin(theta) * radius points.append(np.array([x, y, z])) return points def put_message(self, command, arg): self.view_queue.put({'cmd': command, 'arg': arg})