forked from zietzm/Helmholtz_Test_Bench
Implemented ambient field calibration tool.
This commit is contained in:
@@ -10,6 +10,7 @@ from src.helmholtz_cage_device import HelmholtzCageDevice
|
||||
import src.globals as g
|
||||
import src.config_handling as config
|
||||
import src.csv_logging as log
|
||||
from src.magnetometer import MagnetometerProxy
|
||||
from src.user_interface import HelmholtzGUI, ExecuteCSVMode
|
||||
from src.socket_control import SocketInterfaceThread
|
||||
from src.utility import ui_print
|
||||
@@ -49,7 +50,10 @@ try: # start normal operations
|
||||
config.CONFIG_OBJECT = config.get_config_from_file(config.CONFIG_FILE) # read configuration data from config file
|
||||
|
||||
print("Starting setup...")
|
||||
g.CAGE_DEVICE = HelmholtzCageDevice() # initiate communication with devices and initialize all major program objects
|
||||
# initiate communication with devices and initialize all major program objects
|
||||
g.CAGE_DEVICE = HelmholtzCageDevice()
|
||||
# Mostly a data structure to hold field data broadcast by connected tcp client with HW access.
|
||||
g.MAGNETOMETER = MagnetometerProxy()
|
||||
|
||||
print("\nOpening User Interface...")
|
||||
|
||||
|
||||
@@ -33,7 +33,10 @@ class ArduinoDevice(Arduino):
|
||||
"""Returns a bool indicating whether the axis polarity is reversed (True)."""
|
||||
return self.digitalRead(self.pins[idx]) # pin is HIGH --> relay is switched
|
||||
|
||||
def shutdown(self):
|
||||
def idle(self):
|
||||
"""Sets relay switching pins to low to de-power most of the electronics box"""
|
||||
for pin in self.pins:
|
||||
self.digitalWrite(pin, "LOW")
|
||||
|
||||
def shutdown(self):
|
||||
self.idle()
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
import time
|
||||
from datetime import datetime
|
||||
from threading import Thread
|
||||
|
||||
import numpy as np
|
||||
|
||||
from src.utility import ui_print
|
||||
from src.exceptions import DeviceBusy, DeviceAccessError
|
||||
import src.globals as g
|
||||
|
||||
|
||||
class AmbientFieldCalibration(Thread):
|
||||
"""Varies the coil-generated fields until a configuration is reached which zeros the connected magnetometer.
|
||||
The magnetometer does not need to be centered. The axes of the magnetometer must match the coil configuration!"""
|
||||
|
||||
# Timeout/settling time for the calibration procedure. An acceptable duration for the PI is required
|
||||
SETTLE_TIME = 15
|
||||
# PID controller time delta
|
||||
TIME_DELTA = 0.25
|
||||
P_CONTROL = -1e4 # 0.4 A/s slew-rate at 40uT
|
||||
I_CONTROL = -2.5e4 # 0.025 A/s slew-rate for 1uTs
|
||||
I_LIMIT = 1e-6 # uTs, Limit I to 0.025 A/s slew-rate to prevent wind-up
|
||||
# D_CONTROL = Not implemented for now
|
||||
|
||||
def __init__(self, view_queue):
|
||||
Thread.__init__(self)
|
||||
self.view_queue = view_queue
|
||||
|
||||
# Axis currents. Incremented by PID loop
|
||||
self.axis_currents = np.array([0, 0, 0], dtype=float)
|
||||
# Used for I control
|
||||
self.error_integral = np.array([0, 0, 0], dtype=float)
|
||||
|
||||
# Hardware checks are done in the init method to allow for exception handling in main thread
|
||||
# This means the run method should/must be called directly after Thread object creation.
|
||||
|
||||
# Make sure we really have magnetometer data
|
||||
if not g.MAGNETOMETER.connected:
|
||||
ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.")
|
||||
raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.")
|
||||
|
||||
# Acquire cage device. This resource will only be released after the thread is ended.
|
||||
try:
|
||||
self.cage_dev = g.CAGE_DEVICE.request_proxy()
|
||||
except DeviceBusy:
|
||||
ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.")
|
||||
raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.")
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self.calibration_procedure()
|
||||
self.put_message('finished', None)
|
||||
except Exception as e:
|
||||
self.put_message('failed', e)
|
||||
finally:
|
||||
self.cage_dev.close()
|
||||
|
||||
def calibration_procedure(self):
|
||||
start_time = datetime.now()
|
||||
target_time = 0
|
||||
current_time = datetime.now()
|
||||
while (current_time - start_time).seconds < self.SETTLE_TIME:
|
||||
# Each axis runs its own PID controller. They are slightly coupled by unorthogonality, which should
|
||||
# hopefully not destabilize the feedback loop
|
||||
for i in range(3):
|
||||
# Error in tesla
|
||||
dt = self.TIME_DELTA
|
||||
e = g.MAGNETOMETER.field[i]
|
||||
# Change in control current
|
||||
du = e * self.P_CONTROL + self.error_integral[i] * self.I_CONTROL
|
||||
self.axis_currents[i] += du*dt
|
||||
|
||||
# Update integral
|
||||
# Add increment
|
||||
self.error_integral[i] += e*dt
|
||||
# Clamp range
|
||||
self.error_integral = np.clip(self.error_integral, -self.I_LIMIT, self.I_LIMIT)
|
||||
|
||||
# Apply new field actuation
|
||||
self.cage_dev.set_signed_currents(self.axis_currents)
|
||||
|
||||
# Set new progress indicator for UI
|
||||
self.put_message('progress', (current_time - start_time).seconds / self.SETTLE_TIME)
|
||||
|
||||
# Sleep until next iteration
|
||||
target_time += self.TIME_DELTA
|
||||
sleep_time = ((start_time - current_time).total_seconds() + target_time)
|
||||
if sleep_time > 0:
|
||||
time.sleep(sleep_time)
|
||||
current_time = datetime.now()
|
||||
|
||||
coil_constants = np.array([g.CAGE_DEVICE.axes[i].coil_const for i in range(3)])
|
||||
results = {'ambient': -self.axis_currents,
|
||||
'ambient_ut': -self.axis_currents * coil_constants * 1e6,
|
||||
'residual': g.MAGNETOMETER.field}
|
||||
self.put_message('ambient_data', results)
|
||||
|
||||
# Put device into an off and ready state
|
||||
self.cage_dev.idle()
|
||||
|
||||
def put_message(self, command, arg):
|
||||
self.view_queue.put({'cmd': command, 'arg': arg})
|
||||
|
||||
|
||||
class CoilConstantCalibration(Thread):
|
||||
def __init__(self, view_queue):
|
||||
Thread.__init__(self)
|
||||
self.view_queue = view_queue
|
||||
|
||||
# Hardware checks are done in the init method to allow for exception handling in main thread
|
||||
# This means the run method should/must be called directly after Thread object creation.
|
||||
|
||||
# Make sure we really have magnetometer data
|
||||
if not g.MAGNETOMETER.connected:
|
||||
ui_print("\nError: The magnetometer is not connected. Required for ambient field calibration.")
|
||||
raise DeviceAccessError("The magnetometer is not connected. Required for ambient field calibration.")
|
||||
|
||||
# Acquire cage device. This resource will only be released after the thread is ended.
|
||||
try:
|
||||
self.cage_dev = g.CAGE_DEVICE.request_proxy()
|
||||
except DeviceBusy:
|
||||
ui_print("\nError: Failed to acquire coil control. Required for ambient field calibration.")
|
||||
raise DeviceAccessError("Failed to acquire coil control. Required for ambient field calibration.")
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self.calibration_procedure()
|
||||
self.put_message('finished', None)
|
||||
except Exception as e:
|
||||
self.put_message('failed', e)
|
||||
finally:
|
||||
self.cage_dev.close()
|
||||
|
||||
def calibration_procedure(self):
|
||||
# All generated fields will be compared to this using a simple difference method
|
||||
ambient_field = g.MAGNETOMETER.field
|
||||
# The coil constant must be determined for every axis
|
||||
for i in range(3):
|
||||
pass
|
||||
|
||||
def put_message(self, command, arg):
|
||||
self.view_queue.put({'cmd': command, 'arg': arg})
|
||||
@@ -0,0 +1,13 @@
|
||||
class ProxyNotOwnedException(Exception):
|
||||
"""Should not occur in correct operation. For whatever reason, this means the proxy was invalidated."""
|
||||
pass
|
||||
|
||||
|
||||
class DeviceAccessError(Exception):
|
||||
"""General error indicating that HW access failed."""
|
||||
pass
|
||||
|
||||
|
||||
class DeviceBusy(DeviceAccessError):
|
||||
"""Error thrown when the HW proxy (i.e. access) cannot be acquired"""
|
||||
pass
|
||||
+4
-6
@@ -9,6 +9,9 @@ app = None
|
||||
# The main access point for all hardware commands
|
||||
CAGE_DEVICE = None
|
||||
|
||||
# Magnetometer proxy object providing access to mag. data from an external client per tcp interface
|
||||
MAGNETOMETER = None
|
||||
|
||||
# list with the names of each axis, used mainly for printing functions
|
||||
AXIS_NAMES = ["X-Axis", "Y-Axis", "Z-Axis"]
|
||||
|
||||
@@ -45,9 +48,4 @@ default_psu_config = {
|
||||
|
||||
# Configuration for socket interface
|
||||
SOCKET_PORT = 6677
|
||||
SOCKET_MAX_CONNECTIONS = 5
|
||||
|
||||
|
||||
# Exception used globally throughout the application
|
||||
class DeviceNotConnected(Exception):
|
||||
pass
|
||||
SOCKET_MAX_CONNECTIONS = 5
|
||||
@@ -1,25 +1,16 @@
|
||||
import traceback
|
||||
from threading import RLock, Thread, Event
|
||||
from tkinter import messagebox
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
|
||||
from src.arduino_device import ArduinoDevice
|
||||
from src.psu_device import PSUDevicePS2000B, PSUDeviceQL355TP
|
||||
from src.utility import ui_print
|
||||
from src.exceptions import DeviceBusy, ProxyNotOwnedException
|
||||
import src.config_handling as config_handling
|
||||
import src.globals as g
|
||||
|
||||
|
||||
class ProxyNotOwnedException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DeviceBusy(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class HelmholtzCageDevice:
|
||||
"""This is the central object for controlling all the test bench related HW. This way, access can be
|
||||
synchronized and exclusive to a single controller at once. This device always exists, irrespective of
|
||||
@@ -142,10 +133,16 @@ class HelmholtzCageDevice:
|
||||
ui_print("Error creating PSU device:\n{}".format(e))
|
||||
|
||||
# Zero and activate channels. This is a sort of "armed" state so that we can send commands later
|
||||
if self.psu1 is not None:
|
||||
self.psu1.idle()
|
||||
if self.psu2 is not None:
|
||||
self.psu2.idle()
|
||||
self.idle()
|
||||
|
||||
def idle(self):
|
||||
""" Zero and activate channels """
|
||||
if self.psu1 is not None:
|
||||
self.psu1.idle()
|
||||
if self.psu2 is not None:
|
||||
self.psu2.idle()
|
||||
if self.arduino is not None:
|
||||
self.arduino.idle()
|
||||
|
||||
def request_proxy(self):
|
||||
"""Returns a new HelmholtzCageProxy or None, depending on if access is available"""
|
||||
@@ -166,16 +163,20 @@ class HelmholtzCageDevice:
|
||||
|
||||
def release_proxy(self, proxy_obj):
|
||||
"""Releases the proxy to free access for other controllers. Should only be called when proxy is destroyed"""
|
||||
with self.proxy_lock:
|
||||
if self.proxy_valid(proxy_obj):
|
||||
# This only frees the interface if it really was the active proxy
|
||||
if id(proxy_obj) == self.proxy_id:
|
||||
self.proxy_id = None
|
||||
# Otherwise do nothing, this case requires no behaviour
|
||||
self.proxy_id = None
|
||||
# Otherwise do nothing, this case requires no behaviour
|
||||
|
||||
def __exit__(self, *args):
|
||||
"""Enables: with g.CAGE_DEVICE as dev:"""
|
||||
self.release_proxy(self.context_manager_proxy)
|
||||
|
||||
def proxy_valid(self, proxy_obj):
|
||||
"""Returns True if the proxy currently owns the device."""
|
||||
with self.proxy_lock:
|
||||
return id(proxy_obj) == self.proxy_id
|
||||
|
||||
def subscribe_status_updates(self, callback):
|
||||
# List containing all interested subscribers.
|
||||
# We won't check if a callback is added twice. Not our responsibility
|
||||
@@ -216,6 +217,8 @@ class HelmholtzCageDevice:
|
||||
self._set_field_raw(command_arg)
|
||||
elif command_string == "set_field_compensated":
|
||||
self._set_field_compensated(command_arg)
|
||||
elif command_string == 'idle':
|
||||
self.idle()
|
||||
else:
|
||||
raise Exception("Command unknown!")
|
||||
except Exception as e:
|
||||
@@ -523,9 +526,19 @@ class HelmholtzCageProxy:
|
||||
def set_field_compensated(self, vector):
|
||||
self.cage_device.queue_command(self, {'command': 'set_field_compensated', 'arg': vector})
|
||||
|
||||
def __del__(self):
|
||||
def idle(self):
|
||||
"""Puts the helmholtz cage into an idle state with zeroed fields"""
|
||||
self.cage_device.queue_command(self, {'command': 'idle', 'arg': None})
|
||||
|
||||
def close(self):
|
||||
self.cage_device.release_proxy(self)
|
||||
|
||||
def __del__(self):
|
||||
# This is a fallback method and should not be relied on. Call 'close' manually
|
||||
if self.cage_device.proxy_valid(self):
|
||||
self.cage_device.release_proxy(self)
|
||||
ui_print("Warning: Proxy implicitly released. Use close() instead.")
|
||||
|
||||
|
||||
def value_in_limits(axis, key, value):
|
||||
"""Check if value is within safe limits (set in globals.py)"""
|
||||
@@ -539,4 +552,4 @@ def value_in_limits(axis, key, value):
|
||||
elif float(value) < float(min_value): # value is too low
|
||||
return 'LOW'
|
||||
else: # value is within limits
|
||||
return 'OK'
|
||||
return 'OK'
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
from threading import RLock
|
||||
import numpy as np
|
||||
|
||||
|
||||
class MagnetometerProxy:
|
||||
"""This class facilitates magnetometer data access, which is provided by a tcp client."""
|
||||
|
||||
def __init__(self):
|
||||
self.connected = False
|
||||
self._field_lock = RLock()
|
||||
self._field = np.array([0, 0, 0])
|
||||
|
||||
def set_connection_closed(self):
|
||||
"""Used by the TCP connection manager to indicate that the client has disconnected and that no new data
|
||||
will arrive."""
|
||||
self.connected = False
|
||||
|
||||
@property
|
||||
def field(self):
|
||||
with self._field_lock:
|
||||
return self._field
|
||||
|
||||
@field.setter
|
||||
def field(self, val):
|
||||
# If we receive data, we can assume we are connected
|
||||
self.connected = True
|
||||
with self._field_lock:
|
||||
self._field = val
|
||||
+33
-18
@@ -47,7 +47,7 @@ import src.helmholtz_cage_device as helmholtz_cage_device
|
||||
# This function must be called before sending HW commands.
|
||||
|
||||
|
||||
SOCKET_INTERFACE_API_VERSION = "1"
|
||||
SOCKET_INTERFACE_API_VERSION = "2"
|
||||
|
||||
|
||||
class ClientConnectionThread(Thread):
|
||||
@@ -56,14 +56,12 @@ class ClientConnectionThread(Thread):
|
||||
self.client_socket = client_socket
|
||||
self.client_address = address
|
||||
|
||||
# Throws DeviceBusy exception if it couldn't be acquired
|
||||
try:
|
||||
self.cage_dev = g.CAGE_DEVICE.request_proxy()
|
||||
except helmholtz_cage_device.DeviceBusy as e:
|
||||
self.client_socket.sendall("err".encode('utf-8'))
|
||||
# Bubble up to connection manager
|
||||
raise e
|
||||
# Indicates whether this thread was providing magnetometer data. If yes, set the magnetometer proxy object as
|
||||
# disconnected when the socket is closed.
|
||||
self.magnetometer_connection = False
|
||||
|
||||
# Holds proxy model to cage device if required. Is initialized lazily to prevent always blocking interface
|
||||
self._cage_dev = None
|
||||
self.api_compat = False # Indicates whether the client has a compatible API version
|
||||
|
||||
def run(self):
|
||||
@@ -73,8 +71,9 @@ class ClientConnectionThread(Thread):
|
||||
# Check for end of stream
|
||||
if raw_msg == "":
|
||||
self.client_socket.close()
|
||||
# TODO: This should be done explicitly instead of relying on __del__...
|
||||
self.cage_dev = None
|
||||
if self._cage_dev:
|
||||
self._cage_dev.close()
|
||||
g.MAGNETOMETER.connected = False
|
||||
return
|
||||
# Process message
|
||||
for char in raw_msg:
|
||||
@@ -130,6 +129,16 @@ class ClientConnectionThread(Thread):
|
||||
current_vec = np.array([x, y, z], dtype=np.float32)
|
||||
self.cage_dev.set_signed_currents(current_vec)
|
||||
return "1"
|
||||
elif tokens[0] == "magnetometer_field":
|
||||
"""The client is sending us information about the magnetometer state. This is used for some
|
||||
calibration procedures for example."""
|
||||
x = float(tokens[1])
|
||||
y = float(tokens[2])
|
||||
z = float(tokens[3])
|
||||
field = np.array([x, y, z], dtype=np.float32)
|
||||
g.MAGNETOMETER.field = field
|
||||
self.magnetometer_connection = True
|
||||
return "1"
|
||||
else:
|
||||
# The message given is unknown. The programmer probably did not intend for this, so display an error
|
||||
# even if is not inherently problematic.
|
||||
@@ -137,6 +146,17 @@ class ClientConnectionThread(Thread):
|
||||
else:
|
||||
raise Exception("The command '{}' may not be called before 'declare_api_version'".format(tokens[0]))
|
||||
|
||||
@property
|
||||
def cage_dev(self):
|
||||
if self._cage_dev is None:
|
||||
try:
|
||||
self._cage_dev = g.CAGE_DEVICE.request_proxy()
|
||||
except helmholtz_cage_device.DeviceBusy:
|
||||
# Return none. This will cause an error and show up as "err" on the client
|
||||
# A more helpful error message is shown on application side
|
||||
ui_print("Socket client attempted to acquire busy device.")
|
||||
return self._cage_dev
|
||||
|
||||
|
||||
class SocketInterfaceThread(Thread):
|
||||
def __init__(self):
|
||||
@@ -149,14 +169,9 @@ class SocketInterfaceThread(Thread):
|
||||
def run(self):
|
||||
while True:
|
||||
(client_socket, address) = self.server_socket.accept()
|
||||
try:
|
||||
new_thread = ClientConnectionThread(client_socket, address)
|
||||
new_thread.start()
|
||||
ui_print("Accepted connection from {}".format(address))
|
||||
except helmholtz_cage_device.DeviceBusy:
|
||||
ui_print("Denied connection from {}. Device is busy.".format(address))
|
||||
client_socket.close()
|
||||
|
||||
new_thread = ClientConnectionThread(client_socket, address)
|
||||
new_thread.start()
|
||||
ui_print("Accepted connection from {}".format(address))
|
||||
|
||||
def configure_tcp_port(self):
|
||||
# Creates and configures the listening port
|
||||
|
||||
+240
-26
@@ -3,7 +3,7 @@
|
||||
|
||||
# ToDo: optimize layout for smaller screen (like on IRS clean room PC)
|
||||
# import packages for user interface:
|
||||
|
||||
import queue
|
||||
from queue import Queue, Empty
|
||||
from tkinter import *
|
||||
from tkinter import ttk
|
||||
@@ -23,6 +23,8 @@ import src.globals as g
|
||||
import src.csv_threading as csv
|
||||
import src.config_handling as config
|
||||
import src.csv_logging as log
|
||||
from src.calibration import AmbientFieldCalibration
|
||||
from src.exceptions import DeviceAccessError
|
||||
from src.utility import ui_print
|
||||
import src.helmholtz_cage_device as helmholtz_cage_device
|
||||
|
||||
@@ -47,19 +49,23 @@ class HelmholtzGUI(Tk):
|
||||
|
||||
self.Menu = TopMenu(self) # display dropdown menu bar at the top (see TopMenu class for details)
|
||||
|
||||
mainArea = Frame(self, padx=10, pady=10) # create main area Frame where controls of each mode are displayed
|
||||
mainArea.pack(side="top", fill="both", expand=True) # pack main area at the top of the window
|
||||
main_area = Frame(self, padx=10, pady=10) # create main area Frame where controls of each mode are displayed
|
||||
main_area.pack(side="top", fill="both", expand=True) # pack main area at the top of the window
|
||||
|
||||
mainArea.grid_rowconfigure(0, weight=1) # configure rows and columns of the Tkinter grid to expand with window
|
||||
mainArea.grid_columnconfigure(0, weight=1)
|
||||
main_area.grid_rowconfigure(0, weight=1) # configure rows and columns of the Tkinter grid to expand with window
|
||||
main_area.grid_columnconfigure(0, weight=1)
|
||||
|
||||
# initialize the GUI pages for the different modes and setup switching between them
|
||||
# see https://pythonprogramming.net/change-show-new-frame-tkinter/ for explanation
|
||||
# switching between pages is done with show_frame() method
|
||||
|
||||
self.pages = {} # dictionary for storing all pages (different modes, displayed in main area)
|
||||
for P in [ManualMode, HardwareConfiguration, ExecuteCSVMode, ConfigureLogging]: # do this for every mode page
|
||||
page = P(mainArea, self) # initialize the page with the mainArea frame as the parent
|
||||
for P in [ManualMode,
|
||||
HardwareConfiguration,
|
||||
CalibrateAmbientField,
|
||||
ExecuteCSVMode,
|
||||
ConfigureLogging]: # do this for every mode page
|
||||
page = P(main_area, self) # initialize the page with the main_area frame as the parent
|
||||
self.pages[P] = page # add the page to the dictionary
|
||||
page.grid(row=0, column=0, sticky="nsew") # place all pages in the same place in the GUI
|
||||
|
||||
@@ -87,33 +93,34 @@ class HelmholtzGUI(Tk):
|
||||
class TopMenu:
|
||||
# the menu bar at the top of the window
|
||||
def __init__(self, window):
|
||||
self.window = window
|
||||
menu = Menu(window) # initialize Menu object
|
||||
window.config(menu=menu) # put menu at the top of the window
|
||||
|
||||
ModeSelector = Menu(menu) # create a submenu object
|
||||
menu.add_cascade(label="Menu", menu=ModeSelector) # add a dropdown with the submenu object
|
||||
mode_selector = Menu(menu) # create a submenu object
|
||||
menu.add_cascade(label="Menu", menu=mode_selector) # add a dropdown with the submenu object
|
||||
# create the different options in the dropdown:
|
||||
ModeSelector.add_command(label="Static Manual Input", command=lambda: self.manual_mode(window))
|
||||
ModeSelector.add_command(label="Execute CSV Sequence", command=lambda: self.execute_csv_mode(window))
|
||||
ModeSelector.add_separator()
|
||||
ModeSelector.add_command(label="Configure Data Logging", command=lambda: self.logging(window))
|
||||
ModeSelector.add_command(label="Settings...", command=lambda: self.configuration(window))
|
||||
mode_selector.add_command(label="Static Manual Input", command=self.manual_mode)
|
||||
mode_selector.add_command(label="Execute CSV Sequence", command=self.execute_csv_mode)
|
||||
mode_selector.add_command(label="Calibrate Ambient Field", command=self.calibrate_ambient)
|
||||
mode_selector.add_separator()
|
||||
mode_selector.add_command(label="Configure Data Logging", command=self.logging)
|
||||
mode_selector.add_command(label="Settings...", command=self.configuration)
|
||||
|
||||
@staticmethod
|
||||
def manual_mode(window): # switch to the manual mode page
|
||||
window.show_frame(ManualMode)
|
||||
def manual_mode(self): # switch to the manual mode page
|
||||
self.window.show_frame(ManualMode)
|
||||
|
||||
@staticmethod
|
||||
def configuration(window): # switch to the settings page
|
||||
window.show_frame(HardwareConfiguration)
|
||||
def configuration(self): # switch to the settings page
|
||||
self.window.show_frame(HardwareConfiguration)
|
||||
|
||||
@staticmethod
|
||||
def execute_csv_mode(window): # switch to the CSV execution page
|
||||
window.show_frame(ExecuteCSVMode)
|
||||
def calibrate_ambient(self):
|
||||
self.window.show_frame(CalibrateAmbientField)
|
||||
|
||||
@staticmethod
|
||||
def logging(window): # switch to the logging settings page
|
||||
window.show_frame(ConfigureLogging)
|
||||
def execute_csv_mode(self): # switch to the CSV execution page
|
||||
self.window.show_frame(ExecuteCSVMode)
|
||||
|
||||
def logging(self): # switch to the logging settings page
|
||||
self.window.show_frame(ConfigureLogging)
|
||||
|
||||
|
||||
class ManualMode(Frame):
|
||||
@@ -514,6 +521,212 @@ class ExecuteCSVMode(Frame):
|
||||
plotCanvas.get_tk_widget().grid(row=0, column=0, sticky="nesw") # place canvas in the UI
|
||||
|
||||
|
||||
class CalibrateAmbientField(Frame):
|
||||
def __init__(self, parent, controller):
|
||||
Frame.__init__(self, parent)
|
||||
self.parent = parent
|
||||
self.controller = controller
|
||||
|
||||
# To center window
|
||||
# self.columnconfigure(0, weight=1)
|
||||
self.rowconfigure(0, weight=1)
|
||||
self.left_column = Frame(self)
|
||||
self.left_column.grid(row=0, column=0, sticky="nsew")
|
||||
self.right_column = Frame(self)
|
||||
self.right_column.grid(row=0, column=1, sticky="nsew")
|
||||
self.left_column.rowconfigure(3, weight=1)
|
||||
|
||||
# Thread variables
|
||||
self.calibration_ambient_thread = None
|
||||
self.calibration_coil_constants_thread = None
|
||||
self.view_mpi_queue = Queue() # Receives status information from calibration procedure threads.
|
||||
|
||||
# UI variables
|
||||
self.connected_state_var = StringVar(value="Not connected")
|
||||
self.field_value_vars = [StringVar(value="No data"),
|
||||
StringVar(value="No data"),
|
||||
StringVar(value="No data")]
|
||||
self.calibration_procedure_progress_var = IntVar(value=0)
|
||||
self.ambient_field_result_vars = [StringVar(), StringVar(), StringVar()]
|
||||
self.ambient_field_ut_result_vars = [StringVar(), StringVar(), StringVar()]
|
||||
self.ambient_field_residual_vars = [StringVar(), StringVar(), StringVar()]
|
||||
|
||||
row_counter = 0
|
||||
|
||||
# Create headline
|
||||
header = Label(self.left_column, text="Ambient Field Calibration", font=HEADER_FONT)
|
||||
header.grid(row=row_counter, column=0, columnspan=2, padx=100, pady=20, sticky="nw")
|
||||
row_counter += 1
|
||||
|
||||
# Magnetometer connected indicator
|
||||
connected_status_frame = Frame(self.left_column)
|
||||
connected_status_frame.grid(row=row_counter, column=0, sticky="nw")
|
||||
connected_label = Label(connected_status_frame, text="Magnetometer state:", font=SUB_HEADER_FONT)
|
||||
connected_label.grid(row=0, column=0, padx=10, pady=20, sticky="nw")
|
||||
self.connected_state_label = Label(connected_status_frame, textvariable=self.connected_state_var, fg="red")
|
||||
self.connected_state_label.grid(row=0, column=1, padx=10, pady=20, sticky="nw")
|
||||
row_counter += 1
|
||||
|
||||
# Magnetometer field data grid
|
||||
field_data_frame = Frame(self.left_column)
|
||||
field_data_frame.grid(row=row_counter, column=0, sticky="nw")
|
||||
field_data_label = Label(field_data_frame, text="Field data:", font=SUB_HEADER_FONT)
|
||||
field_data_label.grid(row=0, column=0, padx=10, pady=3, sticky="nw")
|
||||
axis_labels = ['X:', 'Y:', 'Z:']
|
||||
for i in range(3):
|
||||
field_data_axis_label = Label(field_data_frame, text=axis_labels[i])
|
||||
field_data_axis_label.grid(row=i, column=1, padx=10, pady=3)
|
||||
|
||||
field_data_axis_data = Label(field_data_frame, textvariable=self.field_value_vars[i])
|
||||
field_data_axis_data.grid(row=i, column=2, padx=(20, 0), pady=3)
|
||||
|
||||
field_data_axis_units = Label(field_data_frame, text="\u03BCT")
|
||||
field_data_axis_units.grid(row=i, column=3, padx=5, pady=3)
|
||||
row_counter += 1
|
||||
|
||||
# Calibration start buttons
|
||||
start_button_frame = Frame(self.left_column)
|
||||
start_button_frame.grid(row=row_counter, column=0, sticky="sw")
|
||||
self.start_ambient_calibration_button = Button(start_button_frame, text="Calibrate Ambient Field",
|
||||
command=self.calibration_procedure_ambient,
|
||||
pady=5, padx=5, font=SMALL_BUTTON_FONT)
|
||||
self.start_ambient_calibration_button.grid(row=0, column=0, padx=10, pady=(30, 10))
|
||||
self.start_k_calibration_button = Button(start_button_frame, text="Calibrate Coil Constants",
|
||||
command=self.calibration_procedure_coil_constants,
|
||||
pady=5, padx=5, font=SMALL_BUTTON_FONT)
|
||||
self.start_k_calibration_button.grid(row=0, column=1, padx=10, pady=(30, 10))
|
||||
row_counter += 1
|
||||
|
||||
# Calibration progress bar
|
||||
progress_bar_frame = Frame(self.left_column)
|
||||
progress_bar_frame.grid(row=row_counter, column=0, sticky="swe")
|
||||
calibration_procedure_progress_label = Label(progress_bar_frame, text="Progress:")
|
||||
calibration_procedure_progress_label.grid(row=0, column=0, padx=10, pady=10)
|
||||
calibration_procedure_progress = ttk.Progressbar(progress_bar_frame,
|
||||
length=240,
|
||||
variable=self.calibration_procedure_progress_var)
|
||||
calibration_procedure_progress.grid(row=0, column=1, padx=10, pady=10, sticky="we")
|
||||
row_counter += 1
|
||||
|
||||
# Ambient field calibration results
|
||||
row_counter = 0
|
||||
ambient_field_results_frame = LabelFrame(self.right_column, text="Ambient Field Results")
|
||||
ambient_field_results_frame.grid(row=row_counter, column=1, padx=(100, 0), pady=20, sticky="nw")
|
||||
for i, label in enumerate(['X', 'Y', 'Z']):
|
||||
axis_label = Label(ambient_field_results_frame, text=label)
|
||||
axis_label.grid(row=0, column=i+1, padx=5, pady=5, sticky="nw")
|
||||
# Ambient field value (A)
|
||||
ambient_field_results_label = Label(ambient_field_results_frame, text="Ambient Field:")
|
||||
ambient_field_results_label.grid(row=1, column=0, padx=5, pady=5, sticky="nw")
|
||||
for i in range(3):
|
||||
axis_data = Entry(ambient_field_results_frame,
|
||||
textvariable=self.ambient_field_result_vars[i],
|
||||
width=15,
|
||||
state='readonly')
|
||||
axis_data.grid(row=1, column=i+1, padx=5, pady=5, sticky="nw")
|
||||
ambient_field_results_unit = Label(ambient_field_results_frame, text="A")
|
||||
ambient_field_results_unit.grid(row=1, column=4, padx=5, pady=5, sticky="nw")
|
||||
# Ambient field value (microtesla)
|
||||
ambient_field_results_ut_label = Label(ambient_field_results_frame, text="Ambient Field:")
|
||||
ambient_field_results_ut_label.grid(row=2, column=0, padx=5, pady=5, sticky="nw")
|
||||
for i in range(3):
|
||||
axis_data = Entry(ambient_field_results_frame,
|
||||
textvariable=self.ambient_field_ut_result_vars[i],
|
||||
width=15,
|
||||
state='readonly')
|
||||
axis_data.grid(row=2, column=i + 1, padx=5, pady=5, sticky="nw")
|
||||
ambient_field_results_ut_unit = Label(ambient_field_results_frame, text="\u03BCT")
|
||||
ambient_field_results_ut_unit.grid(row=2, column=4, padx=5, pady=5, sticky="nw")
|
||||
# Residuals
|
||||
ambient_field_residual_label = Label(ambient_field_results_frame, text="Residual Field:")
|
||||
ambient_field_residual_label.grid(row=3, column=0, padx=5, pady=5, sticky="nw")
|
||||
for i in range(3):
|
||||
axis_data = Entry(ambient_field_results_frame,
|
||||
textvariable=self.ambient_field_residual_vars[i],
|
||||
width=15,
|
||||
state='readonly')
|
||||
axis_data.grid(row=3, column=i+1, padx=5, pady=5, sticky="nw")
|
||||
ambient_field_residual_unit = Label(ambient_field_results_frame, text="\u03BCT")
|
||||
ambient_field_residual_unit.grid(row=3, column=4, padx=5, pady=5, sticky="nw")
|
||||
|
||||
# This starts an endless polling loop
|
||||
self.update_view()
|
||||
|
||||
def page_switch(self):
|
||||
# every class in the UI needs this, even if it doesn't do anything
|
||||
pass
|
||||
|
||||
def update_view(self):
|
||||
# Get new connected status
|
||||
if g.MAGNETOMETER.connected:
|
||||
self.connected_state_var.set("connected")
|
||||
self.connected_state_label.configure(fg="green")
|
||||
else:
|
||||
self.connected_state_var.set("Not connected")
|
||||
self.connected_state_label.configure(fg="red")
|
||||
|
||||
# Get new field data
|
||||
new_field = g.MAGNETOMETER.field
|
||||
for i in range(3):
|
||||
# Display in uT
|
||||
self.field_value_vars[i].set("{:.3f}".format(new_field[i] * 1e6))
|
||||
|
||||
# Get mpi messages from calibration procedures
|
||||
try:
|
||||
while True:
|
||||
msg = self.view_mpi_queue.get(block=False)
|
||||
cmd = msg['cmd']
|
||||
arg = msg['arg']
|
||||
if cmd == 'finished':
|
||||
self.reactivate_buttons()
|
||||
elif cmd == 'failed':
|
||||
messagebox.showerror("Calibration error", "Error occured during calibration:\n{}".format(arg))
|
||||
self.reactivate_buttons()
|
||||
elif cmd == 'progress':
|
||||
self.calibration_procedure_progress_var.set(min(int(arg*100), 100))
|
||||
elif cmd == 'ambient_data':
|
||||
self.update_ambient_calibration_results(arg)
|
||||
else:
|
||||
ui_print("Error: Unexpected mpi command '{}' in CalibrationTool".format(cmd))
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
self.controller.after(500, self.update_view)
|
||||
|
||||
def reactivate_buttons(self):
|
||||
self.start_ambient_calibration_button.configure(text="Calibrate Ambient Field", state=NORMAL)
|
||||
self.start_k_calibration_button.configure(text="Calibrate Coil Constants", state=NORMAL)
|
||||
self.calibration_procedure_progress_var.set(0)
|
||||
|
||||
def deactivate_buttons(self):
|
||||
self.start_ambient_calibration_button.configure(state=DISABLED)
|
||||
self.start_k_calibration_button.configure(state=DISABLED)
|
||||
|
||||
def update_ambient_calibration_results(self, results):
|
||||
for i in range(3):
|
||||
self.ambient_field_result_vars[i].set("{:.3f}".format(results['ambient'][i]))
|
||||
self.ambient_field_ut_result_vars[i].set("{:.3f}".format(results['ambient_ut'][i]))
|
||||
self.ambient_field_residual_vars[i].set("{:.3f}".format(results['residual'][i] * 1e6))
|
||||
|
||||
def calibration_procedure_ambient(self):
|
||||
try:
|
||||
self.calibration_ambient_thread = AmbientFieldCalibration(self.view_mpi_queue)
|
||||
self.calibration_ambient_thread.start()
|
||||
self.start_ambient_calibration_button.configure(text="Running")
|
||||
self.deactivate_buttons()
|
||||
except DeviceAccessError as e:
|
||||
print("Error starting calibration procedure: {}".format(e))
|
||||
|
||||
def calibration_procedure_coil_constants(self):
|
||||
try:
|
||||
self.calibration_coil_constants_thread = AmbientFieldCalibration(self.view_mpi_queue)
|
||||
self.calibration_coil_constants_thread.start()
|
||||
self.start_k_calibration_button.configure(text="Running")
|
||||
self.deactivate_buttons()
|
||||
except DeviceAccessError as e:
|
||||
print("Error starting calibration procedure: {}".format(e))
|
||||
|
||||
|
||||
class HardwareConfiguration(Frame):
|
||||
"""Settings window to set program constants"""
|
||||
|
||||
@@ -1150,6 +1363,7 @@ class StatusDisplay(Frame):
|
||||
pass
|
||||
self.controller.after(200, self.update_label_poll_method)
|
||||
|
||||
|
||||
class OutputConsole(Frame):
|
||||
# console to print information to user in, similar to standard python output
|
||||
|
||||
|
||||
Reference in New Issue
Block a user