# This file contains code for executing a sequence of magnetic fields from a csv file.
# To do this without crashing the UI it has to run in a separate thread using the threading module.

import time
from threading import *  # wildcard kept on purpose: provides Thread and Event, other code may rely on it
from tkinter import messagebox

import matplotlib.pyplot as plt
import pandas

import User_Interface as ui
import cage_func as func
import globals as g


class ExecCSVThread(Thread):
    """Worker thread that executes a CSV field sequence on the test stand.

    Inherits from threading.Thread so that the sequence runs in a separate thread
    and does not block the UI main loop.
    """

    def __init__(self, array, parent, controller):
        """Store the sequence and the UI objects needed during execution.

        array: numpy array containing the data from the csv to be executed
        parent: object from which this class is called, here the ExecuteCSVMode object of the UI
        controller: object on which mainloop() is running, usually the main UI window
        """
        Thread.__init__(self)
        self.array = array
        self.parent = parent
        self.controller = controller
        self.__stop_event = Event()  # event which can be set to stop the thread execution if needed

    def run(self):
        """Thread entry point: execute the sequence, then reset the buttons on the UI."""
        ui.ui_print("Starting Sequence Execution...")
        try:
            self.execute_sequence(self.array, 0.1, self.parent, self.controller)  # run sequence
        finally:
            # reset the buttons even if the sequence raised, otherwise the UI would stay locked
            if not g.exitFlag:  # main window is open
                self.parent.select_file_button["state"] = "normal"
                self.parent.execute_button["state"] = "normal"
                self.parent.stop_button["state"] = "disabled"
                self.parent.reinit_button["state"] = "normal"

    def stop(self):
        """Request the thread to stop; can be called from another thread."""
        self.__stop_event.set()

    def stopped(self):
        """Return True if stop() has been called, used to check if a run should continue."""
        return self.__stop_event.is_set()

    @staticmethod
    def _devices_ok(parent):
        """Return the device status, honouring the override switches selected by the user."""
        return func.devices_ok(parent.xy_override.get(),
                               parent.z_override.get(),
                               parent.arduino_override.get())

    def execute_sequence(self, array, delay, parent, controller):
        """Run through the array and command the test stand accordingly.

        array: rows of [time (s), xField (T), yField (T), zField (T)]
        delay: sleep interval in seconds used while waiting for the next row; the loop only
               sleeps if more than delay + 0.02 s remain, otherwise it polls without sleeping
        parent: UI object providing the xy/z/arduino override switches
        controller: main UI object providing StatusDisplay and the pages dictionary

        The test stand is powered down before the run and always after it, also if an
        exception occurs while the sequence is being executed.
        """
        func.power_down_all()  # sets outputs on PSUs to 0 and Arduino pins to LOW before starting
        t_zero = time.time()  # set reference time for start of run

        # Check if everything is properly connected (checks for some devices may be overridden by user):
        all_connected = self._devices_ok(parent)

        try:
            i = 0  # index of the current array row
            # while array is not finished, devices are connected, user has not cancelled and application is running
            while i < len(array) and all_connected and not self.stopped() and not g.exitFlag:
                t = time.time() - t_zero  # get time relative to start of run
                if t >= array[i, 0]:  # time for this row has come
                    g.threadLock.acquire()  # execute all lines until release before going back to main thread
                    try:
                        # check if everything is still connected before sending commands:
                        all_connected = self._devices_ok(parent)
                        if all_connected:
                            field_vec = array[i, 1:4]  # extract desired field vector
                            ui.ui_print("%0.5f s: t = %0.2f s, target field vector ="
                                        % (time.time() - t_zero, array[i, 0]), field_vec * 1e6, "\u03BCT")
                            func.set_field(field_vec)  # send field vector to test stand
                            controller.StatusDisplay.update_labels()  # update status display after change

                            # log change to the log file if user has selected event logging
                            # in the Configure Logging window
                            logger = controller.pages[ui.ConfigureLogging]  # get object of logging configurator
                            if logger.event_logging:  # data should be logged when test stand is commanded
                                logger.log_datapoint()
                            i = i + 1  # next row
                    finally:
                        # always give the lock back, a failure above must not deadlock the other threads
                        g.threadLock.release()
                elif t <= array[i, 0] - delay - 0.02:  # is there enough time to sleep before the next row?
                    time.sleep(delay)  # sleep to give other threads time to run

            # tell the user why the loop has ended
            if not all_connected:  # interrupted by device error
                ui.ui_print("Error with at least one device, sequence aborted.")
            elif self.stopped() or g.exitFlag:  # interrupted by user
                ui.ui_print("Sequence cancelled, powering down channels.")
            else:  # sequence ended without interruption
                ui.ui_print("Sequence executed, powering down channels.")
        finally:
            func.power_down_all()  # set currents and voltages to 0, set arduino pins to low

        if not all_connected:
            # the pop-up is shown only after powering down, so the outputs are not left on
            # while waiting for the user to close the dialog
            messagebox.showwarning("Device Error!", "Error with at least one device, sequence aborted.")


def read_csv_to_array(filepath):
    """Convert a given csv file to a numpy array.

    csv format: time (s); xField (T); yField (T); zField (T), separated by semicolons
    with decimal commas (german excel). The first row of the file is treated as the
    column header row and is not part of the returned array.
    """
    frame = pandas.read_csv(filepath, sep=';', decimal=',', header=0)
    return frame.to_numpy()


def check_array_ok(array):
    """Show a warning pop-up if any magnetic field in the array exceeds the test stand limits.

    array: rows of [time (s), xField (T), yField (T), zField (T)]
    """
    values_ok = True
    for i in range(3):  # go through axes/columns
        max_val = g.AXES[i].max_comp_field[1]  # get limits the test stand can do
        min_val = g.AXES[i].max_comp_field[0]
        data = array[:, i + 1]  # extract data for this axis from array
        if (data > max_val).any() or (data < min_val).any():  # any datapoint is out of bounds
            values_ok = False
            break  # one violation is enough to warn

    if not values_ok:  # show warning pop-up if values are exceeding limits
        messagebox.showwarning("Value Limits Warning!", "Found field values exceeding limits of test stand."
                                                        "\nSee plot and check values in csv.")


def plot_field_sequence(array, width, height):
    """Create a plot of fixed size from a sequence array and return the figure.

    array: rows of [time (s), xField (T), yField (T), zField (T)]
    width, height: size of the figure in pixels
    """
    # ToDo (optional): polar plots, plots of angle...
    # ToDo (optional): show graphs as steps (as performed by test stand)
    fig_dpi = 100  # set figure resolution (dots per inch)
    px = 1/fig_dpi  # get pixel to inch size conversion
    figure = plt.Figure(figsize=(width*px, height*px), dpi=fig_dpi)  # create figure with correct size
    # noinspection PyTypeChecker,SpellCheckingInspection
    axes = figure.subplots(3, sharex=True, sharey=True, gridspec_kw={'hspace': 0.4})  # subplots with shared axes
    figure.suptitle("Magnetic Field Sequence")  # set figure title

    t = array[:, 0]  # extract time column
    for i in range(3):  # go through all three axes
        data = array[:, i + 1] * 1e6  # extract field column of this axis and convert to microtesla
        max_val = g.AXES[i].max_comp_field[1] * 1e6  # get limits of achievable field
        min_val = g.AXES[i].max_comp_field[0] * 1e6
        plot = axes[i]  # get appropriate subplot
        plot.plot(t, data, linestyle='solid', marker='.')  # plot data

        if (data > max_val).any():  # if any value is higher than the maximum
            plot.axhline(y=max_val, linestyle='dashed', color='r')  # horizontal line to show maximum
            # add label to line:
            plot.text(t[-1], max_val, "max", horizontalalignment='center', verticalalignment='top', color='r')
        if (data < min_val).any():  # same as above for the minimum
            plot.axhline(y=min_val, linestyle='dashed', color='r')
            plot.text(t[-1], min_val, "min", horizontalalignment='center', color='r')

        plot.set_title(g.AXIS_NAMES[i], size=10)  # set subplot title (e.g. "X-Axis")

    # set shared axis labels:
    axes[2].set_xlabel("Time (s)")
    axes[1].set_ylabel("Magnetic Field (\u03BCT)")

    return figure  # return the created figure to be inserted somewhere