forked from zietzm/Helmholtz_Test_Bench
125 lines
4.9 KiB
Python
125 lines
4.9 KiB
Python
import time
|
|
import pandas
|
|
from threading import *
|
|
from tkinter import messagebox
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
import User_Interface as ui
|
|
import cage_func as func
|
|
import globals as g
|
|
|
|
|
|
class ExecCSVThread(Thread):
    """Background thread that executes a field sequence read from a csv file.

    The thread calls execute_sequence() with a polling delay of 0.1 s and,
    once that call has ended (normally or with an exception), clears the
    parent's ``running`` flag and puts the parent's buttons back into their
    idle state.

    Parameters
    ----------
    threadID : identifier stored on the thread object (not used in this class)
    array : numpy array containing data from csv to be executed
    parent : object from which this is called; must provide ``running``,
        ``select_file_button``, ``execute_button``, ``stop_button`` and
        ``reinit_button``
    controller : object on which mainloop() is running, usually main window
    """
    # ToDo: handling for disconnected devices

    def __init__(self, threadID, array, parent, controller):
        super().__init__()

        self.threadID = threadID
        self.array = array  # numpy array containing data from csv to be executed
        self.parent = parent  # object from which this is called
        self.controller = controller  # object on which mainloop() is running, usually main window

    def run(self):
        """Run the sequence, then always hand control back to the UI."""
        ui.ui_print("Starting Sequence Execution...")
        # g.threadLock.acquire()  # Get lock to synchronize threads
        # ToDo: add locking/synchronization? Works without so far but might be more robust
        try:
            execute_sequence(self.array, 0.1, self.parent, self.controller)  # run sequence
        finally:
            # Executed even if the sequence raised, so the UI is never left
            # stuck in the "running" state with its buttons disabled.
            self.parent.running = False  # sequence finished --> no longer running
            # reset buttons on UI:
            self.parent.select_file_button["state"] = "normal"
            self.parent.execute_button["state"] = "normal"
            self.parent.stop_button["state"] = "disabled"
            self.parent.reinit_button["state"] = "normal"
|
|
|
|
|
|
def execute_sequence(array, delay, parent, controller):
    """Run through an array of times and desired field vectors and apply them in order.

    array format: [time (s), xField (T), yField (T), zField (T)], one row per
    set point; all times are in seconds relative to the start of the run.

    The outputs are powered down before the run starts and again when it
    ends. The final power-down is guaranteed by a ``finally`` clause, so it
    also happens when an exception interrupts the sequence.

    Parameters
    ----------
    array : 2-D array indexed as array[row, column] in the format above
    delay : time in seconds to sleep between polls while waiting for the next row
    parent : object providing the xy_override, z_override and arduino_override
        flags that are passed to func.devices_ok()
    controller : object whose StatusDisplay.update_labels() is called after
        every field change
    """
    func.power_down_all()  # sets outputs to 0 before starting
    t_zero = time.time()  # set reference time for start of run

    try:
        # Check if everything is properly connected:
        # True or False depending on devices status, checks for some devices may be overridden by user
        all_connected = func.devices_ok(parent.xy_override, parent.z_override, parent.arduino_override)

        i = 0
        # while array is not finished, user has not cancelled and devices are connected
        while i < len(array) and g.running and all_connected:
            t = time.time() - t_zero  # get relative time
            if t >= array[i, 0]:  # time for this row has come
                field_vec = array[i, 1:4]  # extract desired field vector
                ui.ui_print("%f s: t = %0.2f s, target field vector = "
                            % (time.time() - t_zero, array[i, 0]), field_vec * 1e6, "\u03BCT")
                func.set_field(field_vec)  # send field vector to test stand
                ui.ui_print(time.time() - t_zero)  # elapsed time after the field was set
                controller.StatusDisplay.update_labels()  # update status display after change
                i = i + 1  # next row
            elif t >= array[i, 0] - delay - 0.02:
                # next change time is close, not enough time to sleep:
                # poll again immediately instead
                pass
            else:  # sleep to give other threads time to run
                time.sleep(delay)

            # check again if everything is connected before starting next loop run:
            all_connected = func.devices_ok(parent.xy_override, parent.z_override, parent.arduino_override)

        # NOTE: if the user cancels and a device fails at the same time, none
        # of the branches below matches and no message is shown.
        if g.running and all_connected:  # sequence ended without interruption
            ui.ui_print("Sequence executed, powering down channels.")
        elif all_connected:  # interrupted by user
            ui.ui_print("Sequence cancelled, powering down channels.")
        elif g.running:  # interrupted by device error
            ui.ui_print("Error with at least one device, sequence aborted.")
            messagebox.showinfo("Device Error!", "Error with at least one device, sequence aborted.")
    finally:
        # Always executed, including on exceptions, so the outputs are not
        # left at the last commanded values.
        func.power_down_all()  # set currents and voltages to 0, set arduino pins to low
|
|
|
|
|
|
def read_csv_to_array(filepath):
    """Read a sequence csv file and return its contents as a numpy array.

    csv format: time (s); xField (T); yField (T); zField (T) (german excel),
    i.e. semicolon-separated columns with decimal commas. The first line of
    the file is treated as the header row.
    """
    ui.ui_print("Reading File:", filepath)
    csv_options = {'sep': ';', 'decimal': ',', 'header': 0}
    # parse the file and hand back the plain array instead of the DataFrame
    return pandas.read_csv(filepath, **csv_options).to_numpy()
|
|
|
|
|
|
def check_array(array):
    """Check every field value of a sequence against the limits of its axis.

    Column 1, 2, ... of each row is compared with the entry of g.AXES at the
    same position: max_comp_field[0] is used as lower and max_comp_field[1]
    as upper limit. A row is recorded once for every axis whose value lies
    outside these limits, so the reported number counts violations rather
    than distinct rows. The result is only printed to the UI; nothing is
    returned.
    """
    # ToDo: message formatting, pop up warning
    flagged_rows = []
    for entry in array:
        for column, axis in enumerate(g.AXES, start=1):
            field_value = entry[column]
            # above the upper limit or below the lower limit
            if field_value > axis.max_comp_field[1] or field_value < axis.max_comp_field[0]:
                flagged_rows.append(entry)

    ui.ui_print("Checked csv, found %i concerns." % len(flagged_rows))
    if flagged_rows:
        ui.ui_print(flagged_rows)
|
|
|
|
|
|
def plot_field_sequence(array, width, height):
    """Build a figure with three stacked plots of a field sequence over time.

    Column 0 of ``array`` is used as the common x data; columns 1 to 3 are
    multiplied by 1e6 and drawn in one subplot each, titled with the matching
    entry of g.AXIS_NAMES. ``width`` and ``height`` are divided by the figure
    dpi (100) to obtain the figure size, i.e. they are given in pixels.

    Returns the matplotlib Figure; nothing is shown or saved here.
    """
    # ToDo: make pretty
    dpi = 100
    inch_per_pixel = 1/dpi
    size_in_inches = (width*inch_per_pixel, height*inch_per_pixel)
    figure = plt.Figure(figsize=size_in_inches, dpi=dpi)

    # one row per axis, sharing both x and y scale
    # noinspection PyTypeChecker,SpellCheckingInspection
    subplots = figure.subplots(3, sharex=True, sharey=True, gridspec_kw={'hspace': 0.4})

    figure.suptitle("Magnetic Field Sequence")
    time_values = array[:, 0]

    for index in range(3):
        scaled_field = array[:, index + 1] * 1e6
        subplot = subplots[index]
        subplot.plot(time_values, scaled_field)
        subplot.set_title(g.AXIS_NAMES[index])

    return figure
|