import time import pandas from threading import * from tkinter import messagebox import matplotlib.pyplot as plt import User_Interface as ui import cage_func as func import globals as g class ExecCSVThread(Thread): # ToDo: handling for disconnected devices def __init__(self, threadID, array, parent, controller): Thread.__init__(self) self.threadID = threadID self.array = array # numpy array containing data from csv to be executed self.parent = parent # object from which this is called self.controller = controller # object on which mainloop() is running, usually main window def run(self): ui.ui_print("Starting Sequence Execution...") # g.threadLock.acquire() # Get lock to synchronize threads # ToDo: add locking/synchronization? Works without so far but might be more robust execute_sequence(self.array, 0.1, self.parent, self.controller) # run sequence self.parent.running = False # sequence finished --> no longer running # reset buttons on UI: self.parent.select_file_button["state"] = "normal" self.parent.execute_button["state"] = "normal" self.parent.stop_button["state"] = "disabled" self.parent.reinit_button["state"] = "normal" def execute_sequence(array, delay, parent, controller): # runs through array containing times and desired field vectors # array format: [time (s), xField (T), yField (T), zField (T)] # decimal commas # all times in seconds func.power_down_all() # sets outputs to 0 before starting t_zero = time.time() # set reference time for start of run # Check if everything is properly connected: all_connected = func.devices_ok(parent.xy_override, parent.z_override, parent.arduino_override) # True or False depending on devices status, checks for some devices may be overridden by user i = 0 while i < len(array) and g.running and all_connected: # while array is not finished, user has not cancelled and devices are connected t = time.time() - t_zero # get relative time if t >= array[i, 0]: # time for this row has come field_vec = array[i, 1:4] # extract desired field vector ui.ui_print("%f s: t = %0.2f s, target field vector = " % (time.time() - t_zero, array[i, 0]), field_vec * 1e6, "\u03BCT") func.set_field(field_vec) # send field vector to test stand ui.ui_print(time.time() - t_zero) controller.StatusDisplay.update_labels() # update status display after change i = i + 1 # next row elif t >= array[i, 0] - delay - 0.02: # next change time is close, not enough time to sleep pass else: # sleep to give other threads time to run time.sleep(delay) # check again if everything is connected before starting next loop run: all_connected = func.devices_ok(parent.xy_override, parent.z_override, parent.arduino_override) if g.running and all_connected: # sequence ended without interruption ui.ui_print("Sequence executed, powering down channels.") elif all_connected: # interrupted by user ui.ui_print("Sequence cancelled, powering down channels.") elif g.running: # interrupted by device error ui.ui_print("Error with at least one device, sequence aborted.") messagebox.showinfo("Device Error!", "Error with at least one device, sequence aborted.") func.power_down_all() # set currents and voltages to 0, set arduino pins to low def read_csv_to_array(filepath): # csv format: time (s); xField (T); yField (T); zField (T) (german excel) # decimal commas ui.ui_print("Reading File:", filepath) file = pandas.read_csv(filepath, sep=';', decimal=',', header=0) # read csv file array = file.to_numpy() # convert csv to array return array def check_array(array): # ToDo: message formatting, pop up warning # ToDo: comments concerns = [] for row in array: i = 1 for axis in g.AXES: value = row[i] if value > axis.max_comp_field[1]: concerns.append(row) elif value < axis.max_comp_field[0]: concerns.append(row) i += 1 ui.ui_print("Checked csv, found %i concerns." % len(concerns)) if len(concerns) > 0: ui.ui_print(concerns) def plot_field_sequence(array): # ToDo: comments # ToDo: make pretty figure = plt.Figure(figsize=(8, 10), dpi=100) # noinspection PyTypeChecker,SpellCheckingInspection axes = figure.subplots(3, sharex=True, sharey=True, gridspec_kw={'hspace': 0.4}) figure.suptitle("Magnetic Field Sequence") t = array[:, 0] for i in [0, 1, 2]: data = array[:, i + 1] * 1e6 plot = axes[i] plot.plot(t, data) plot.set_title(g.AXIS_NAMES[i]) return figure