"""Control layer for a three-axis coil setup.

Each axis is driven by one channel of a PS2000B power supply (X and Y share one
device, Z has its own) and has one Arduino output pin that switches the polarity
of the coil current. Settings are read from the config file named by
``globals.CONFIG_FILE``; text output goes through :func:`ui_print`.
"""
from pyps2000b import PS2000B
from Arduino import Arduino
import globals as g
import pandas
import time
import numpy as np
import serial
import traceback  # ToDo: remove
from tkinter import *
from configparser import ConfigParser


class Axis:
    """State and control of a single coil axis (one PSU channel + one Arduino relay pin)."""

    def __init__(self, index, device, PSU_channel, arduino_pin):
        """Read the static axis parameters from the config file and initialise the dynamic state.

        Args:
            index: position of this axis in g.AXIS_NAMES / g.PORTS.
            device: power supply object (PS2000B class), or None if the PSU is not connected.
            PSU_channel: channel of the power supply used by this axis (setup_axes passes 0 or 1).
            arduino_pin: Arduino output pin that switches the polarity on this axis.

        Raises:
            KeyError: if a required entry is missing from the config file (see read_config).
            ValueError: if a config entry cannot be converted to float.
        """
        # static information
        self.index = index
        self.device = device
        self.channel = PSU_channel
        self.ardPin = arduino_pin

        self.name = g.AXIS_NAMES[index]
        self.port = g.PORTS[index]

        self.resistance = float(read_config(self.name, "resistance"))
        self.max_watts = float(read_config(self.name, "max_watts"))
        self.max_amps = np.sqrt(self.max_watts / self.resistance)  # from P = I^2 * R
        self.max_volts = float(read_config(self.name, "max_volts"))

        self.coil_constant = float(read_config(self.name, "coil_const"))  # field per unit current
        self.ambient_field = float(read_config(self.name, "ambient_field"))

        max_field = self.max_amps * self.coil_constant  # max field reachable in this axis
        self.max_field = np.array([-max_field, max_field])
        self.max_comp_field = np.array([self.ambient_field - max_field, self.ambient_field + max_field])

        # dynamic information
        self.connected = "Not Connected"
        self.output_active = "Unknown"  # power output on the PSU enabled?
        self.remote_ctrl_active = "Unknown"  # remote control on the PSU enabled?
        self.voltage_setpoint = 0  # target voltage on PSU [V]
        self.voltage = 0  # actual voltage on PSU [V]
        self.current_setpoint = 0  # target current on PSU [A]
        self.current = 0  # actual current on PSU [A]
        self.polarity_switched = "Unknown"  # polarity switched on the Arduino?

        self.target_field_comp = 0  # field to be created by coil pair (this is sent to the coils) [T]
        self.target_field = 0  # field that should occur in measurement area (ambient still needs to be compensated) [T]
        self.target_current = 0  # signed current that should pass through coil pair [A]

        if self.device is not None:
            self.update_status_info()

    def update_status_info(self):
        """Query the PSU and refresh the status fields stored on this object.

        On a serial error (or IndexError while reading the device) the axis is marked
        "Connection Error" and the output/remote-control states are reset to "Unknown";
        the last voltage/current readings are left untouched.
        """
        try:
            self.device.update_device_information(self.channel)
            device_status = self.device.get_device_status_information(self.channel)

            self.output_active = "Active" if device_status.output_active else "Inactive"
            self.remote_ctrl_active = "Active" if device_status.remote_control_active else "Inactive"

            self.voltage = self.device.get_voltage(self.channel)
            self.voltage_setpoint = self.device.get_voltage_setpoint(self.channel)
            self.current = self.device.get_current(self.channel)
            self.current_setpoint = self.device.get_current_setpoint(self.channel)
        except (serial.serialutil.SerialException, IndexError):
            # ui_print("Connection Error with %s PSU on %s" % (self.name, self.port))
            self.connected = "Connection Error"
            self.output_active = "Unknown"
            self.remote_ctrl_active = "Unknown"
        else:
            self.connected = "Connected"

    def print_status(self):
        """Print the device status, voltage and current of this axis' PSU channel."""
        if self.device is None:
            # setup_axes creates axes with device=None when the PSU could not be opened
            ui_print("PSU not connected.")
            return
        ui_print("%s, %0.2f V, %0.2f A" % (self.device.get_device_status_information(self.channel),
                                           self.device.get_voltage(self.channel),
                                           self.device.get_current(self.channel)))

    def power_down(self):
        """Temporary power-down: set outputs to 0 but keep connections enabled.

        Resets the stored targets, zeroes and disables the PSU channel (if a device is
        present) and sets the polarity pin to "LOW". Errors are printed, not raised.
        """
        try:
            self.target_current = 0
            self.target_field = 0
            self.target_field_comp = 0
            if self.device is not None:
                self.device.set_voltage(0, self.channel)
                self.device.set_current(0, self.channel)
                self.device.disable_output(self.channel)
            g.ARDUINO.digitalWrite(self.ardPin, "LOW")
        except Exception as e:
            ui_print(e)
            # ToDo: more error handling here

    def set_signed_current(self, value):
        """Set the coil current with the correct polarity on this axis.

        Args:
            value: signed target current [A]; the sign selects the relay state.

        Raises:
            ValueError: if abs(value) exceeds max_amps (the axis is powered down first).
            Exception: if value compares neither >= 0 nor < 0 (e.g. NaN).
        """
        self.target_current = value

        # ToDo!: the polarity relay is switched even when the PSU is not connected
        #        (only for arduino testing); tie this to self.connected afterwards.
        if abs(value) > self.max_amps:  # prevent excessive currents
            self.power_down()  # set output to 0 and deactivate
            raise ValueError("Invalid current value. Tried %0.2fA, max. %0.2fA allowed" % (value, self.max_amps))
        elif value >= 0:  # switch polarity as needed
            g.ARDUINO.digitalWrite(self.ardPin, "LOW")
        elif value < 0:
            g.ARDUINO.digitalWrite(self.ardPin, "HIGH")
        else:
            # reached when every comparison above is False, which happens for NaN
            raise Exception("This should be impossible.")

        # voltage limit: 10 % headroom over max_amps * resistance, at least 8 V, capped at max_volts
        max_voltage = min(max(1.1 * self.max_amps * self.resistance, 8), self.max_volts)

        if self.connected == "Connected":
            self.device.set_current(abs(value), self.channel)
            self.device.set_voltage(max_voltage, self.channel)
            self.device.enable_output(self.channel)
        else:
            # report instead of silently skipping the PSU
            ui_print(self.name, "not connected, can't set current.")

    def set_field_simple(self, value):
        """Form the magnetic field given by value [T], without cancelling the ambient field."""
        self.target_field = value
        self.target_field_comp = value
        self.set_signed_current(value / self.coil_constant)

    def set_field(self, value):
        """Form the magnetic field given by value [T], corrected for the ambient field."""
        self.target_field = value
        compensated = value - self.ambient_field  # field the coils themselves have to produce
        self.target_field_comp = compensated
        self.set_signed_current(compensated / self.coil_constant)


class ArduinoCtrl(Arduino):
    """Arduino connection that drives the polarity-switching output pins."""

    def __init__(self, pins):
        """Connect to the Arduino and configure every pin in pins as an output set to "LOW".

        Connection problems are printed and recorded in self.connected, not raised.
        """
        self.connected = "Unknown"
        self.pins = pins
        ui_print("\nConnecting to Arduino...")
        try:
            Arduino.__init__(self)  # search for connected arduino and connect
            for pin in self.pins:
                self.pinMode(pin, "Output")
                self.digitalWrite(pin, "LOW")
        except Exception as e:
            ui_print("Connection to Arduino failed.", e)
            self.connected = "Not Connected"
        else:
            self.connected = "Connected"
            ui_print("Arduino ready.")

    def update_status_info(self):
        """Read back the relay pin of every axis in g.AXES and store it in axis.polarity_switched.

        Does nothing unless the Arduino is marked "Connected". On any error all axes are
        set to "Unknown" and the Arduino is marked "Connection Error".
        """
        if self.connected == "Connected":
            try:
                for axis in g.AXES:
                    if g.ARDUINO.digitalRead(axis.ardPin):  # ToDo: Test if this actually works
                        axis.polarity_switched = "True"
                    else:
                        axis.polarity_switched = "False"
            except Exception as e:
                ui_print("Error with Arduino:", e)
                for axis in g.AXES:
                    axis.polarity_switched = "Unknown"
                self.connected = "Connection Error"
            else:
                g.ARDUINO.connected = "Connected"

    def safe(self):
        """Set all output pins to "LOW" (the serial connection is left open)."""
        for pin in self.pins:
            self.digitalWrite(pin, "LOW")


def read_config(section, key):
    """Return the raw string stored under [section] key in the config file g.CONFIG_FILE.

    Raises:
        KeyError: if the section or the key does not exist.
    """
    # ToDo (optional) better error handling
    config_object = ConfigParser()
    try:
        config_object.read(g.CONFIG_FILE)
        return config_object[section][key]
    except KeyError as e:
        ui_print("Error while reading config file:", e)
        raise KeyError("Could not find key", key, "in config file.") from e


def edit_config(section, key, value):
    """Store str(value) under [section] key and write the config file back to disk.

    Raises:
        KeyError: if the section does not exist.
    """
    config_object = ConfigParser()
    try:
        config_object.read(g.CONFIG_FILE)
        config_object[section][key] = str(value)
        with open(g.CONFIG_FILE, 'w') as conf:  # Write changes back to file
            config_object.write(conf)
    except KeyError as e:
        ui_print("Error while editing config file:", e)
        raise KeyError("Could not find key", key, "in config file.") from e


def create_default_config(file):
    """Create a config file at the path file from the default values stored in globals.py.

    One section per axis name is filled from g.default_arrays (one entry per axis),
    followed by a "PORTS" section filled from g.defaults.
    """
    config = ConfigParser()
    for i, axis_name in enumerate(g.AXIS_NAMES):
        config.add_section(axis_name)
        for key, per_axis_values in g.default_arrays.items():
            config.set(axis_name, key, str(per_axis_values[i]))
    config.add_section("PORTS")
    for key, default in g.defaults.items():
        config.set("PORTS", key, str(default))
    with open(file, 'w') as conf:
        config.write(conf)


def ui_print(*content):
    """Print the given items to the built-in console, or to stdout if no window is open.

    Every item is converted with str() and prefixed with a single space.
    """
    output = "".join(" " + str(text) for text in content)
    if g.app is not None:
        console = g.app.OutputConsole.console
        console.insert(END, "\n" + output)  # begin new line each time
        console.see(END)  # scroll to bottom
    else:  # if window is not open, do normal print
        print(output)


def setup_axes():
    """(Re)connect to the Arduino and both PSUs and create the three Axis objects in g.AXES.

    Existing serial links are closed before reconnecting. If a PSU cannot be opened,
    its axes are still created, with device=None.
    """
    # Connect to Arduino:
    # NOTE(review): read_config returns the raw config string; ArduinoCtrl iterates over it and
    # pins[0..2] are indexed below - confirm the stored format yields one pin per element.
    arduino_pins = read_config("PORTS", "relay_pins")
    try:
        if g.ARDUINO is not None:
            try:
                g.ARDUINO.close()  # close serial link before attempting reconnection
            except serial.serialutil.SerialException:
                pass  # serial.flush() in Arduino.close() fails when reconnecting
                # this ignores it and allows serial.close() to execute (I think)
            except AttributeError:
                pass  # when no Arduino is connected but g.ARDUINO has been initialized
                # there is nothing to close; this throws an exception, which can be ignored
        g.ARDUINO = ArduinoCtrl(arduino_pins)
    except Exception as e:
        ui_print("Arduino setup failed:", e)
        ui_print(traceback.format_exc())  # print_exc() returns None, format_exc() returns the text

    g.AXES = []
    g.XY_PORT = read_config("PORTS", "xy_port")
    g.Z_PORT = read_config("PORTS", "z_port")
    g.PORTS = [g.XY_PORT, g.XY_PORT, g.Z_PORT]  # X and Y share one PSU

    ui_print("\nConnecting to XY Device on %s..." % g.XY_PORT)
    try:
        if g.XY_DEVICE is not None:
            ui_print("closing serial connection on XY device")
            g.XY_DEVICE.serial.close()
            g.XY_DEVICE = None
        g.XY_DEVICE = PS2000B.PS2000B(g.XY_PORT)  # setup PSU
        ui_print("Connection established.")
        g.X_AXIS = Axis(0, g.XY_DEVICE, 0, g.ARDUINO.pins[0])  # create axis objects
        g.Y_AXIS = Axis(1, g.XY_DEVICE, 1, g.ARDUINO.pins[1])
    except serial.serialutil.SerialException:
        g.X_AXIS = Axis(0, None, 0, g.ARDUINO.pins[0])  # create axis objects without a device
        g.Y_AXIS = Axis(1, None, 1, g.ARDUINO.pins[1])
        ui_print("XY Device not connected or incorrect port set.")

    ui_print("Connecting to Z Device on %s..." % g.Z_PORT)
    try:
        # same as for the XY device: release the old link so the port can be reopened and
        # no stale device object is kept if the reconnection fails
        if getattr(g, "Z_DEVICE", None) is not None:
            ui_print("closing serial connection on Z device")
            g.Z_DEVICE.serial.close()
            g.Z_DEVICE = None
        g.Z_DEVICE = PS2000B.PS2000B(g.Z_PORT)
        ui_print("Connection established.")
        g.Z_AXIS = Axis(2, g.Z_DEVICE, 0, g.ARDUINO.pins[2])
    except serial.serialutil.SerialException:
        g.Z_AXIS = Axis(2, None, 0, g.ARDUINO.pins[2])
        ui_print("Z Device not connected or incorrect port set.")

    g.AXES.append(g.X_AXIS)
    g.AXES.append(g.Y_AXIS)
    g.AXES.append(g.Z_AXIS)
    ui_print("")  # new line


def activate_all():
    """Enable remote control and output on all PSUs and channels."""
    g.XY_DEVICE.enable_all()
    g.Z_DEVICE.enable_all()


def deactivate_all():
    """Disable remote control and output on all PSUs and channels (best effort, never raises)."""
    # ToDo: add check if device is connected
    try:
        g.XY_DEVICE.disable_all()
    except BaseException:
        ui_print("XY PSU deactivation unsuccessful.")
    else:
        ui_print("XY PSU deactivated.")
    try:
        g.Z_DEVICE.disable_all()
    except BaseException:
        ui_print("Z PSU deactivation unsuccessful.")
    else:
        ui_print("Z PSU deactivated.")


def print_status_3():
    """Print the PSU status of the X, Y and Z axis."""
    ui_print("X-Axis:")
    g.X_AXIS.print_status()
    ui_print("Y-Axis:")
    g.Y_AXIS.print_status()
    ui_print("Z-Axis:")
    g.Z_AXIS.print_status()


def set_to_zero(device):
    """Set voltages and currents of both channels of device to 0.

    NOTE(review): this only assigns the attributes voltage1/current1/voltage2/current2;
    whether that transmits anything to the PSU depends on the PS2000B class - confirm.
    """
    device.voltage1 = 0
    device.current1 = 0
    device.voltage2 = 0
    device.current2 = 0


def power_down_all():
    """Temporary power-down: set all outputs to 0 but keep connections enabled."""
    for device in (g.XY_DEVICE, g.Z_DEVICE):
        if device is not None:  # an unconnected PSU has nothing to zero
            set_to_zero(device)
    g.ARDUINO.safe()


def shut_down_all():
    """Shutdown at program end or on error: set outputs to 0 and disable connections.

    Every step is best effort: a failing step is reported and the remaining steps still run.
    """
    # ToDo: better messages, check if things are connected first
    ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
    try:
        set_to_zero(g.XY_DEVICE)
    except BaseException:  # catch everything so the remaining devices are still shut down
        ui_print("XY PSU set to 0 unsuccessful.")
    else:
        ui_print("XY PSU currents and voltages set to 0.")
    try:
        set_to_zero(g.Z_DEVICE)
    except BaseException:
        ui_print("Z PSU set to 0 unsuccessful.")
    else:
        ui_print("Z PSU currents and voltages set to 0.")

    deactivate_all()

    try:
        g.ARDUINO.safe()
    except BaseException:
        ui_print("Arduino safing unsuccessful.")
    # no success message here: safe() throws no exception, even when the arduino is not connected
    # ToDo: figure out error handling for this
    try:
        g.ARDUINO.close()
    except BaseException:
        ui_print("Closing Arduino connection failed.")
    else:
        ui_print("Serial connection to Arduino closed.")


def set_field_simple(vector):
    """Form the magnetic field given by vector (x, y, z), without cancelling the ambient field."""
    for i in range(3):
        g.AXES[i].set_field_simple(vector[i])


def set_field(vector):
    """Form the magnetic field given by vector (x, y, z), corrected for the ambient field."""
    for i in range(3):
        g.AXES[i].set_field(vector[i])


def set_current_vec(vector):
    """Set the signed current vector[i] on the i-th axis of g.AXES."""
    for i, axis in enumerate(g.AXES):
        axis.set_signed_current(vector[i])


def execute_csv(filepath, printing=0):
    """Run through a csv file containing times and desired field vectors.

    csv format: time (s); xField (T); yField (T); zField (T), ';'-separated, decimal commas,
    one header row. Each row is applied with set_field once its time has elapsed; afterwards
    all channels are powered down.

    Args:
        filepath: path of the csv file.
        printing: if 1, the PSU status of all axes is printed about once per second.
    """
    ui_print("Reading File:", filepath)
    table = pandas.read_csv(filepath, sep=';', decimal=',', header=0)
    array = table.to_numpy()

    t_zero = time.time()
    t_ref = 0  # elapsed time of the last status print; must be on the same time base as t
    i = 0
    ui_print("Starting Execution...")
    while i < len(array):  # polls the clock until the next row is due
        t = time.time() - t_zero  # seconds since start of execution
        if t >= array[i, 0]:
            field_vec = array[i, 1:4]
            ui_print("t = %0.2f s, target field vector = " % (array[i, 0]), field_vec)
            set_field(field_vec)
            i = i + 1
        if t - t_ref >= 1 and printing == 1:  # print status every second
            print_status_3()
            t_ref = t
    ui_print("File executed, powering down channels.")
    power_down_all()  # set currents and voltages to 0, set arduino pins to low