forked from zietzm/Helmholtz_Test_Bench
198 lines
10 KiB
Python
198 lines
10 KiB
Python
# This file contains code for executing a sequence of magnetic fields read from a CSV file.
# To do this without crashing the UI, the sequence has to run in a separate thread (threading module).
|
|
|
|
import time
|
|
from io import StringIO
|
|
|
|
import pandas
|
|
import numpy as np
|
|
from threading import *
|
|
from tkinter import messagebox
|
|
import matplotlib.pyplot as plt
|
|
|
|
from src.exceptions import DeviceBusy, DeviceAccessError
|
|
from src.utility import ui_print
|
|
import src.user_interface as ui
|
|
import src.globals as g
|
|
|
|
|
|
class ExecCSVThread(Thread):
    """Thread that executes a CSV field sequence on the test bench.

    Inherits from threading.Thread so that the sequence runs in a separate
    thread and does not block the UI. A proxy for the cage device is requested
    in the constructor and released at the end of run().
    """

    def __init__(self, array, parent, controller):
        """Store the sequence and acquire the cage device.

        Args:
            array: numpy array containing the data from the csv to be executed,
                rows of [time (s), xField (T), yField (T), zField (T)].
            parent: object from which this class is called, here the
                ExecuteCSVMode object of the UI.
            controller: object on which mainloop() is running, usually the
                main UI window.

        Raises:
            DeviceAccessError: if the cage device is busy and no proxy could
                be acquired.
        """
        super().__init__()

        self.array = array
        self.parent = parent
        self.controller = controller

        # Acquire cage device. The proxy is only released in run(), so a thread that is
        # constructed but never started keeps holding it.
        try:
            self.cage_dev = g.CAGE_DEVICE.request_proxy()
        except DeviceBusy as err:
            raise DeviceAccessError(
                "Failed to acquire coil control. Required for CSV sequence execution."
            ) from err

        self._stop_event = Event()  # event which can be set to stop the thread execution if needed

    def run(self):
        """Execute the sequence, then idle and release the device and reset the UI buttons.

        Called by Thread.start(). The cleanup runs even if the sequence raises;
        the exception is still propagated afterwards.
        """
        ui_print("\nStarting Sequence Execution...")
        try:
            self.execute_sequence(self.array, 0.1, self.parent, self.controller)  # run sequence
        finally:
            try:
                self._release_device()
            finally:
                self._reset_buttons()

    def _release_device(self):
        """Idle the cage device and release the proxy, even if idling fails."""
        try:
            self.cage_dev.idle()  # set currents and voltages to 0, set arduino pins to low
        finally:
            # Release the proxy so other components can use it
            self.cage_dev.close()

    def _reset_buttons(self):
        """Re-enable the UI buttons of the parent page once the sequence has ended."""
        if g.exit_flag:  # application is shutting down, leave the widgets alone
            return
        self.parent.select_file_button["state"] = "normal"
        self.parent.execute_button["state"] = "normal"
        self.parent.stop_button["state"] = "disabled"
        self.parent.reinit_button["state"] = "normal"

    # setup ability to interrupt thread (https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread)
    def stop(self):
        """Request the sequence to stop; can be called from another thread."""
        self._stop_event.set()

    @property
    def stopped(self):
        """True once stop() has been called; checked to decide if a run should continue."""
        return self._stop_event.is_set()

    def execute_sequence(self, array, delay, parent, controller):
        """Run through the array and command the test bench at the listed times.

        Args:
            array: rows of [time (s), xField (T), yField (T), zField (T)].
            delay: sleep duration in seconds used while waiting for the next row.
            parent: UI page providing the xy_override, z_override and
                arduino_override variables.
            controller: UI object whose ``pages`` mapping holds the
                ConfigureLogging page.

        Returns early (without commanding any field) if required devices are
        missing, and stops between rows when the thread was stopped or the
        application is exiting. Powering down is done by run().
        """
        self.cage_dev.idle()  # sets outputs on PSUs to 0 and Arduino pins to LOW before starting
        # Monotonic clock: only elapsed time matters, and it must not jump if the wall clock is adjusted.
        t_zero = time.monotonic()

        # Check if everything is properly connected; checks for some devices may be overridden by user.
        all_connected = (
            (parent.xy_override.get() or g.CAGE_DEVICE.psu1 is not None)
            and (parent.z_override.get() or g.CAGE_DEVICE.psu2 is not None)
            and (parent.arduino_override.get() or g.CAGE_DEVICE.arduino is not None)
        )
        if not all_connected:
            ui_print("Required devices are not present, sequence aborted.")
            # NOTE(review): this pop-up is opened from the worker thread; confirm tkinter tolerates that here.
            messagebox.showwarning("Device Error!", "Required devices are not present, sequence aborted.")
            return

        i = 0  # index of the current array row
        while i < len(array):
            if self.stopped or g.exit_flag:
                # Interrupt sequence; channels are powered down in run()
                ui_print("Sequence interrupted, powering down channels.")
                return

            t = time.monotonic() - t_zero  # time relative to start of run
            target_t = array[i, 0]  # target execution time of data point
            if t >= target_t:  # time for this row has come
                field_vec = array[i, 1:4]  # extract desired field vector
                ui_print("[{:5.3f}s] B=[{:.1f}, {:.1f}, {:.1f}]\u03BCT for t={:.2f}s".format(t,
                                                                                          field_vec[0] * 1e6,
                                                                                          field_vec[1] * 1e6,
                                                                                          field_vec[2] * 1e6,
                                                                                          target_t))
                self.cage_dev.set_field_compensated(field_vec)  # send field vector to test bench

                # log change to the log file if user has selected event logging in the Configure Logging window
                logger = controller.pages[ui.ConfigureLogging]  # get object of logging configurator
                if logger.event_logging:  # data should be logged when test bench is commanded
                    logger.log_datapoint()

                i = i + 1  # next row
            elif t <= target_t - delay - 0.02:  # is there enough time to sleep before the next row?
                time.sleep(delay)  # sleep to give other threads time to run
            # Otherwise the target time is closer than delay + 0.02 s: loop again without sleeping.

        ui_print("Sequence executed, powering down channels.")
|
|
|
|
|
|
def read_csv_to_array(filepath):
    """Load a semicolon-separated csv file and return its contents as a numpy array.

    Expected csv format: time (s); xField (T); yField (T); zField (T) (german excel).
    Decimal commas and decimal points are both accepted, because every comma in
    the file is turned into a period before parsing. Do not use these characters
    as a thousands separator!

    The first line of the file is consumed as the column header row
    (``header=0``) and is therefore not part of the returned array.
    """
    with open(filepath, 'r') as handle:
        raw_text = handle.read()

    # Normalize separators: decimal commas become decimal points
    normalized_text = raw_text.replace(',', '.')

    frame = pandas.read_csv(StringIO(normalized_text), sep=';', decimal='.', header=0)
    return frame.to_numpy()
|
|
|
|
|
|
def check_array_ok(array):
    """Warn the user if any field value in the array exceeds the test bench limits.

    Columns 1 to 3 of ``array`` are compared against the (min, max) pair stored
    in ``max_comp_field`` of the matching axis of g.CAGE_DEVICE. All violations
    are collected and reported together in a single warning pop-up that lists
    the first three and the number of remaining ones. Nothing is returned.
    """
    violations = []  # (1-based row number, axis name) for every out-of-bounds value
    for axis in range(3):
        # limits the test bench can do on this axis
        lower, upper = g.CAGE_DEVICE.axes[axis].max_comp_field
        column = axis + 1  # column 0 is skipped, fields start at column 1
        for row_idx in range(array.shape[0]):
            value = array[row_idx, column]
            exceeds = value > upper or value < lower
            if exceeds:
                violations.append((row_idx + 1, g.AXIS_NAMES[axis]))

    total = len(violations)
    if total == 0:
        return

    # Build one collective message; only the first three warnings are spelled out
    parts = ["Found field values exceeding limits of test bench.\n"]
    for row_nr, axis_name in violations[:3]:
        parts.append("[Line {}] {} exceeds limits.\n".format(row_nr, axis_name))
    if total > 3:
        parts.append("And {} more...".format(total - 3))

    messagebox.showwarning("Value Limits Warning!", "".join(parts))
|
|
|
|
|
|
def plot_field_sequence(array, width, height):
    """Create a plot of fixed size (pixels) showing the field sequence per axis.

    Args:
        array: rows of [time (s), xField (T), yField (T), zField (T)]; only the
            first four columns are used.
        width: figure width in pixels.
        height: figure height in pixels.

    Returns:
        The created matplotlib figure with three stacked subplots (one per
        axis), fields shown in microtesla. It is meant to be inserted
        somewhere by the caller.
    """
    # ToDo (optional): polar plots, plots of angle...
    fig_dpi = 100  # figure resolution (dots per inch)
    px = 1 / fig_dpi  # pixel to inch size conversion
    figure = plt.Figure(figsize=(width * px, height * px), dpi=fig_dpi)  # create figure with correct size

    # noinspection PyTypeChecker,SpellCheckingInspection
    axes = figure.subplots(3, sharex=True, sharey=True, gridspec_kw={'hspace': 0.4})  # subplots with shared axes

    figure.suptitle("Magnetic Field Sequence")

    # Modify data to show instantaneous jumps in field to reflect test bench operation.
    # Rows are collected in a list and converted once, instead of re-copying an array per row.
    step_rows = [[0, 0, 0, 0]]  # zeros to show start from no fields
    last_vals = [0, 0, 0]  # [x,y,z] field values from last data point, used to create step in data
    for row in array[:, 0:4]:
        # extra datapoint at current timestamp with the previous field values creates a "step" in the plot
        step_rows.append([row[0], *last_vals])
        step_rows.append(row)  # actual datapoint for current timestamp
        last_vals = row[1:4]
    step_rows.append([step_rows[-1][0], 0, 0, 0])  # final datapoint with 0 fields at the last timestamp
    new_array = np.array(step_rows, dtype=float)

    # extract data and plot:
    t = new_array[:, 0]  # time column
    for i in [0, 1, 2]:  # go through all three axes
        data = new_array[:, i + 1] * 1e6  # field column of this axis, converted to microtesla
        # limits of achievable field in microtesla; asarray so that scaling works for any (min, max) pair type
        min_val, max_val = np.asarray(g.CAGE_DEVICE.axes[i].max_comp_field) * 1e6
        plot = axes[i]  # get appropriate subplot

        plot.plot(t, data, linestyle='solid', marker='.')

        if np.any(data > max_val):  # mark the maximum if any value is higher than it
            plot.axhline(y=max_val, linestyle='dashed', color='r')
            plot.text(t[-1], max_val, "max", horizontalalignment='center', verticalalignment='top', color='r')
        if np.any(data < min_val):  # same as above for the minimum
            plot.axhline(y=min_val, linestyle='dashed', color='r')
            plot.text(t[-1], min_val, "min", horizontalalignment='center', color='r')

        plot.set_title(g.AXIS_NAMES[i], size=10)  # set subplot title (e.g. "X-Axis")

    # set shared axis labels:
    axes[2].set_xlabel("Time (s)")
    axes[1].set_ylabel("Magnetic Field (\u03BCT)")

    return figure
|