Files
Helmholtz_Test_Bench/csv_threading.py
T

144 lines
6.9 KiB
Python

import time
import pandas
from threading import *
from tkinter import messagebox
import matplotlib.pyplot as plt
import User_Interface as ui
import cage_func as func
import globals as g
class ExecCSVThread(Thread):
    """Worker thread that plays back a timed field sequence on the test stand.

    The sequence is an array with one row per step in the format
    [time (s), xField (T), yField (T), zField (T)]. Each row is sent to the
    test stand via func.set_field() once its time stamp (relative to the start
    of the run) has been reached.

    NOTE(review): run() and execute_sequence() touch tkinter widgets and open a
    message box from this worker thread -- confirm this is safe for the Tk
    build in use.
    """

    def __init__(self, threadID, array, parent, controller):
        Thread.__init__(self)
        self.threadID = threadID
        self.array = array  # numpy array containing data from csv to be executed
        self.parent = parent  # object from which this is called
        self.controller = controller  # object on which mainloop() is running, usually main window
        self.__stop_event = Event()  # set by stop() to request cancellation of the sequence

    def run(self):
        """Execute the stored sequence, then re-enable the UI buttons on the parent."""
        ui.ui_print("Starting Sequence Execution...")
        # g.threadLock.acquire()  # Get lock to synchronize threads
        # ToDo: add locking/synchronization? Works without so far but might be more robust
        try:
            self.execute_sequence(self.array, 0.1, self.parent, self.controller)  # run sequence
        finally:
            # Reset buttons even if the sequence raised, so the UI is never left
            # locked in the "running" state.
            if not g.exitFlag:  # program window is open
                self.parent.select_file_button["state"] = "normal"
                self.parent.execute_button["state"] = "normal"
                self.parent.stop_button["state"] = "disabled"
                self.parent.reinit_button["state"] = "normal"

    def stop(self):
        """Request the running sequence to stop; the loop checks this flag on every pass."""
        self.__stop_event.set()

    def stopped(self):
        """Return True once stop() has been called."""
        return self.__stop_event.is_set()

    @staticmethod
    def _devices_connected(parent):
        """Return func.devices_ok() for the override settings currently selected on parent."""
        return func.devices_ok(parent.xy_override.get(),
                               parent.z_override.get(),
                               parent.arduino_override.get())

    def execute_sequence(self, array, delay, parent, controller):
        """Run through an array containing times and desired field vectors.

        array:      rows of [time (s), xField (T), yField (T), zField (T)],
                    all times in seconds relative to the start of the run
        delay:      sleep interval (s) used while waiting for the next row
        parent:     object providing xy_override, z_override and arduino_override
                    (each with a .get() method) used for the device check
        controller: object whose StatusDisplay.update_labels() is called after
                    every field change

        The loop ends when all rows were executed, a device check fails, stop()
        was called or g.exitFlag is set. The outputs are powered down afterwards
        in every case, including when an exception is raised during the run.
        """
        func.power_down_all()  # sets outputs to 0 before starting
        t_zero = time.time()  # set reference time for start of run
        try:
            # Check if everything is properly connected:
            # True or False depending on devices status, checks for some devices may be overridden by user
            all_connected = self._devices_connected(parent)
            i = 0
            while i < len(array) and all_connected and not self.stopped() and not g.exitFlag:
                # while array is not finished, devices are connected, user has not cancelled
                # and application is running
                t = time.time() - t_zero  # get relative time
                if t >= array[i, 0]:  # time for this row has come
                    # g.threadLock.acquire(0)
                    field_vec = array[i, 1:4]  # extract desired field vector
                    ui.ui_print("%f s: t = %0.2f s, target field vector = "
                                % (time.time() - t_zero, array[i, 0]), field_vec * 1e6, "\u03BCT")
                    func.set_field(field_vec)  # send field vector to test stand
                    ui.ui_print(time.time() - t_zero)
                    controller.StatusDisplay.update_labels()  # update status display after change
                    i = i + 1  # next row
                    # g.threadLock.release()
                elif t >= array[i, 0] - delay - 0.02:
                    # next change time is close, not enough time to sleep:
                    # fall through and poll again right after the device check
                    pass
                else:  # sleep to give other threads time to run
                    time.sleep(delay)
                # check again if everything is connected before starting next loop run:
                all_connected = self._devices_connected(parent)

            if not self.stopped() and not g.exitFlag and all_connected:  # sequence ended without interruption
                ui.ui_print("Sequence executed, powering down channels.")
            elif all_connected:  # interrupted by user
                ui.ui_print("Sequence cancelled, powering down channels.")
            elif not self.stopped() and not g.exitFlag:  # interrupted by device error
                ui.ui_print("Error with at least one device, sequence aborted.")
                messagebox.showinfo("Device Error!", "Error with at least one device, sequence aborted.")
        finally:
            # Always set currents and voltages to 0 and arduino pins to low, so an
            # unexpected exception mid-sequence cannot leave the outputs energised.
            func.power_down_all()
def read_csv_to_array(filepath):
    """Load a field sequence csv and return its contents as a numpy array.

    Expected csv layout (german excel export):
    time (s); xField (T); yField (T); zField (T)
    Columns are separated by semicolons, decimals use a comma and the first
    row is treated as the column header.
    """
    csv_options = {'sep': ';', 'decimal': ',', 'header': 0}
    return pandas.read_csv(filepath, **csv_options).to_numpy()
def check_array_ok(array):
    """Check if any magnetic fields in an array exceed the limits of the test stand.

    Columns 1 to 3 of array are compared against the limits stored in
    g.AXES[i].max_comp_field (index 0: minimum, index 1: maximum) of the
    respective axis. A warning pop-up is shown if any value is out of bounds.

    Returns True if all values are within the limits, False otherwise.
    """
    values_ok = True
    for i in [0, 1, 2]:  # go through axes
        max_val = g.AXES[i].max_comp_field[1]  # get limits
        min_val = g.AXES[i].max_comp_field[0]
        data = array[:, i + 1]  # extract data for this axis from array
        # vectorised check: is any datapoint of this axis out of bounds?
        if (data > max_val).any() or (data < min_val).any():
            values_ok = False
    if not values_ok:  # show warning pop-up if values are exceeding limits
        messagebox.showwarning("Value Limits Warning!", "Found field values exceeding limits of test stand."
                                                        "\nSee plot and check values in csv.")
    return values_ok  # let callers react to the result instead of discarding it
def plot_field_sequence(array, width, height):
    """Create a plot of fixed size showing the field sequence stored in array.

    array:  rows of [time, xField, yField, zField]; the field columns are
            multiplied by 1e6 before plotting (axis label: microtesla)
    width:  figure width in pixels
    height: figure height in pixels

    One subplot is drawn per axis. If the data of an axis exceeds the limits in
    g.AXES[i].max_comp_field, the violated limit is drawn as a dashed red line
    labelled "max" or "min". Returns the created figure so it can be inserted
    somewhere else.
    """
    # ToDo (optional): polar plots, plots of angle...
    # ToDo (optional): show graphs as steps (as performed by test stand)
    dpi = 100  # figure resolution
    inch_per_px = 1 / dpi  # pixel to inch size conversion
    fig = plt.Figure(figsize=(width * inch_per_px, height * inch_per_px), dpi=dpi)
    # noinspection PyTypeChecker,SpellCheckingInspection
    subplots = fig.subplots(3, sharex=True, sharey=True, gridspec_kw={'hspace': 0.4})  # shared axes
    fig.suptitle("Magnetic Field Sequence")

    times = array[:, 0]  # time column
    for axis_index in range(3):  # go through all three axes
        field = array[:, axis_index + 1] * 1e6  # field column of this axis
        limits = g.AXES[axis_index].max_comp_field  # limits of achievable field
        upper = limits[1] * 1e6
        lower = limits[0] * 1e6

        subplot = subplots[axis_index]
        subplot.plot(times, field, linestyle='solid', marker='.')

        if any(field > upper):  # at least one value above the maximum
            subplot.axhline(y=upper, linestyle='dashed', color='r')  # horizontal line marks the maximum
            # label the line at the last time stamp:
            subplot.text(times[-1], upper, "max", horizontalalignment='center', verticalalignment='top', color='r')
        if any(field < lower):  # at least one value below the minimum
            subplot.axhline(y=lower, linestyle='dashed', color='r')
            subplot.text(times[-1], lower, "min", horizontalalignment='center', color='r')

        subplot.set_title(g.AXIS_NAMES[axis_index], size=10)  # subplot title (e.g. "X-Axis")

    # shared axis labels:
    subplots[2].set_xlabel("Time (s)")
    subplots[1].set_ylabel("Magnetic Field (\u03BCT)")
    return fig