from pyps2000b import PS2000B
from Arduino import Arduino
import globals as g
import pandas
import time
import numpy as np
import serial
import traceback
# noinspection PyPep8Naming
import User_Interface as ui
from tkinter import *
from tkinter import messagebox
from configparser import ConfigParser


class Axis:
    """State and control of one coil axis.

    An axis combines one channel of a power supply (PS2000B object) with the
    Arduino output pin that switches the polarity on this axis. Limits and
    coil parameters are read from the section of g.CONFIG_OBJECT that is named
    after the axis.
    """

    def __init__(self, index, device, PSU_channel, arduino_pin):
        """Read the axis parameters from the config and poll the PSU once.

        Args:
            index: position of this axis in g.AXIS_NAMES and g.PORTS.
            device: power supply object (PS2000B class), or None if the PSU
                could not be connected.
            PSU_channel: channel identifier passed on to every PS2000B call
                (setup_all passes 0 or 1).
            arduino_pin: output pin on the Arduino for switching polarity on
                this axis.
        """
        # static information
        self.index = index
        self.device = device
        self.channel = PSU_channel
        self.ardPin = arduino_pin
        self.name = g.AXIS_NAMES[index]
        self.port = g.PORTS[index]

        self.resistance = float(read_from_config(self.name, "resistance", g.CONFIG_OBJECT))
        self.max_watts = float(read_from_config(self.name, "max_watts", g.CONFIG_OBJECT))
        self.max_amps = np.sqrt(self.max_watts / self.resistance)  # from P = I^2 * R
        self.max_volts = float(read_from_config(self.name, "max_volts", g.CONFIG_OBJECT))

        self.coil_constant = float(read_from_config(self.name, "coil_const", g.CONFIG_OBJECT))
        self.ambient_field = float(read_from_config(self.name, "ambient_field", g.CONFIG_OBJECT))

        max_field = self.max_amps * self.coil_constant  # max field reachable in this axis
        self.max_field = np.array([-max_field, max_field])
        # same interval, shifted by the ambient field
        self.max_comp_field = np.array([self.ambient_field - max_field, self.ambient_field + max_field])

        # dynamic information
        self.connected = "Not Connected"
        self.output_active = "Unknown"  # power output on the PSU enabled?
        self.remote_ctrl_active = "Unknown"  # remote control on the PSU enabled?
        self.voltage_setpoint = 0  # target voltage on PSU [V]
        self.voltage = 0  # actual voltage on PSU [V]
        self.current_setpoint = 0  # target current on PSU [A]
        self.current = 0  # actual current on PSU [A]
        self.polarity_switched = "Unknown"  # polarity switched on the Arduino?

        self.target_field_comp = 0  # field to be created by coil pair (this is sent to the coils) [T]
        self.target_field = 0  # field that should occur in measurement area (ambient still needs to be compensated) [T]
        self.target_current = 0  # signed current that should pass through coil pair [A]

        if self.device is not None:
            self.update_status_info()

    def update_status_info(self):
        """Refresh the dynamic PSU information stored on this object.

        Sets self.connected to "Connected" on success, to "Connection Error"
        when the serial link fails, and to "Not Connected" when no PSU object
        is attached to this axis.
        """
        if self.device is None:
            # nothing to poll; avoids an AttributeError on the None device
            self.connected = "Not Connected"
            self.output_active = "Unknown"
            self.remote_ctrl_active = "Unknown"
            return

        try:
            self.device.update_device_information(self.channel)
            device_status = self.device.get_device_status_information(self.channel)

            self.output_active = "Active" if device_status.output_active else "Inactive"
            self.remote_ctrl_active = "Active" if device_status.remote_control_active else "Inactive"

            self.voltage = self.device.get_voltage(self.channel)
            self.voltage_setpoint = self.device.get_voltage_setpoint(self.channel)
            self.current = self.device.get_current(self.channel)
            self.current_setpoint = self.device.get_current_setpoint(self.channel)
        except (serial.serialutil.SerialException, IndexError):
            # ui_print("Connection Error with %s PSU on %s" % (self.name, self.port))
            self.connected = "Connection Error"
            self.output_active = "Unknown"
            self.remote_ctrl_active = "Unknown"
        else:
            self.connected = "Connected"

    def print_status(self):
        """Print device status, voltage and current of this axis to the console."""
        if self.device is None:
            ui_print("%s PSU not connected, no status available." % self.name)
            return
        ui_print("%s, %0.2f V, %0.2f A" % (self.device.get_device_status_information(self.channel),
                                           self.device.get_voltage(self.channel),
                                           self.device.get_current(self.channel)))

    def power_down(self):
        """Temporary power-down: set outputs to 0 but keep connections enabled.

        Errors are reported on the console and never raised, so this is safe
        to call from error handling code.
        """
        try:
            self.target_current = 0
            self.target_field = 0
            self.target_field_comp = 0
            if self.device is not None:
                self.device.set_voltage(0, self.channel)
                self.device.set_current(0, self.channel)
                self.device.disable_output(self.channel)
            g.ARDUINO.digitalWrite(self.ardPin, "LOW")
        except Exception as e:
            ui_print("Error while powering down %s: %s" % (self.name, e))

    def set_signed_current(self, value):
        """Set the current on this axis with the correct polarity.

        Args:
            value: signed current [A]; negative values set the polarity pin
                HIGH, all others set it LOW.

        Raises:
            ValueError: if value is NaN or its magnitude exceeds
                self.max_amps. The axis is powered down before raising.
        """
        device = self.device
        channel = self.channel
        ardPin = self.ardPin
        # ui_print("Attempting to set current", value, "A")
        self.target_current = value

        if self.connected == "Connected" or True:  # ToDo!: remove True, only for arduino testing!
            # NaN fails every comparison, so it has to be rejected explicitly;
            # otherwise it would slip past the magnitude check below.
            if np.isnan(value) or abs(value) > self.max_amps:  # prevent excessive currents
                self.power_down()  # set output to 0 and deactivate
                raise ValueError("Invalid current value. Tried %0.2fA, max. %0.2fA allowed" % (value, self.max_amps))

            # switch polarity as needed
            if value >= 0:
                g.ARDUINO.digitalWrite(ardPin, "LOW")  # ToDo: tie to arduino?
            else:
                g.ARDUINO.digitalWrite(ardPin, "HIGH")  # ToDo: tie to arduino?

            # limit voltage: 10% headroom over the max-current voltage drop,
            # at least 8, but never more than max_volts
            max_voltage = min(max(1.1 * self.max_amps * self.resistance, 8), self.max_volts)
            # ui_print("sending values to device: U =", max_voltage, "I =", abs(value))
            if self.connected == "Connected":  # ToDo!: remove if clause, only for arduino testing!
                device.set_current(abs(value), channel)
                device.set_voltage(max_voltage, channel)
                device.enable_output(channel)
        else:
            # unreachable while the "or True" testing override above is in place
            ui_print(self.name, "not connected, can't set current.")

    def set_field_simple(self, value):
        """Form the magnetic field given by value, w/o cancelling ambient field."""
        self.target_field = value
        self.target_field_comp = value
        self.set_signed_current(value / self.coil_constant)

    def set_field(self, value):
        """Form the magnetic field given by value, corrected for ambient field."""
        self.target_field = value
        field = value - self.ambient_field  # what the coils have to contribute
        self.target_field_comp = field
        self.set_signed_current(field / self.coil_constant)


class ArduinoCtrl(Arduino):
    """Arduino link that drives the three polarity pins (one per axis)."""

    def __init__(self):
        """Read the pin numbers from the config and try to connect.

        A failed connection is reported on the console and recorded in
        self.connected; it does not raise.
        """
        self.connected = "Unknown"
        # get correct pins from config file
        self.pins = [int(read_from_config(g.AXIS_NAMES[i], "relay_pin", g.CONFIG_OBJECT)) for i in range(3)]

        ui_print("\nConnecting to Arduino...")
        try:
            Arduino.__init__(self)  # search for connected arduino and connect
            for pin in self.pins:
                self.pinMode(pin, "Output")
                self.digitalWrite(pin, "LOW")
        except Exception as e:
            ui_print("Connection to Arduino failed.", e)
            self.connected = "Not Connected"
        else:
            self.connected = "Connected"
            ui_print("Arduino ready.")

    def update_status_info(self):
        """Read the polarity pin of every axis in g.AXES and store the result.

        Does nothing unless self.connected is "Connected". On any error all
        axes are marked "Unknown" and self.connected becomes
        "Connection Error".
        """
        if self.connected != "Connected":
            return
        try:
            for axis in g.AXES:
                axis.polarity_switched = "True" if self.digitalRead(axis.ardPin) else "False"
        except Exception as e:
            ui_print("Error with Arduino:", e)
            for axis in g.AXES:
                axis.polarity_switched = "Unknown"
            self.connected = "Connection Error"
        else:
            self.connected = "Connected"

    def safe(self):
        """Set all polarity pins to LOW. The serial connection is left open."""
        for pin in self.pins:
            self.digitalWrite(pin, "LOW")


def get_config_from_file(file=g.CONFIG_FILE):
    """Parse the given config file and return the resulting ConfigParser.

    Note that the default path is taken from g.CONFIG_FILE once, when this
    module is imported.
    """
    config_object = ConfigParser()  # initialize config parser
    config_object.read(file)  # open config file
    return config_object  # config object that contains all info from the file


def write_config_to_file(config_object):
    """Write the contents of config_object to the file at g.CONFIG_FILE."""
    with open(g.CONFIG_FILE, 'w') as conf:
        config_object.write(conf)


def read_from_config(section, key, config_object):
    """Return the (string) value stored under key in section of config_object.

    Raises:
        KeyError: if the section or the key does not exist. The problem is
            also reported on the console.
    """
    try:
        return config_object[section][key]
    except KeyError as e:
        ui_print("Error while reading config file:", e)
        raise KeyError("Could not find key %s in section %s of config file." % (key, section)) from e


def edit_config(section, key, value, override=False):
    """Set one value in g.CONFIG_OBJECT, creating the section if needed.

    Values for axis sections are checked against the limits in
    g.default_arrays first, unless override is set.

    Args:
        section: config section name (an axis name or e.g. "PORTS").
        key: option name inside the section.
        value: new value; stored as str(value).
        override: skip the limit check (user has confirmed the value).

    Raises:
        ValueError: if the value is outside the allowed limits.
        KeyError: if no limits are defined for key in an axis section.
    """
    config_object = g.CONFIG_OBJECT
    # ToDo: add check for data types, e.g. int for arduino ports
    try:
        value_ok = 'OK'
        if section in g.AXIS_NAMES and not override:
            # only check numerical values and not if overridden by user
            value_ok = value_in_limits(section, key, value)
            axis_index = g.AXIS_NAMES.index(section)
            max_value = g.default_arrays[key][1][axis_index]
            min_value = g.default_arrays[key][2][axis_index]
            if value_ok == 'HIGH':
                message = "Prevented writing too high value for {s} {k} to config file:\n" \
                          "{v}, max. {mv} allowed. Erroneous values may damage equipment!" \
                    .format(s=section, k=key, v=value, mv=max_value)
                raise ValueError(message)
            elif value_ok == 'LOW':
                message = "Prevented writing too low value for {s} {k} to config file:\n" \
                          "{v}, min. {mv} allowed. Erroneous values may damage equipment!" \
                    .format(s=section, k=key, v=value, mv=min_value)
                raise ValueError(message)

        if value_ok == 'OK' or override:  # value is within limits or user has overridden
            try:
                section_obj = config_object[section]
            except KeyError:
                ui_print("Could not find section", section, "in config file, creating new.")
                config_object.add_section(section)
                section_obj = config_object[section]
            # assigning through the section proxy also creates missing keys
            section_obj[key] = str(value)
    except KeyError as e:
        ui_print("Error while editing config file:", e)
        raise KeyError("Could not find key %s in config file." % key) from e


def check_config(config_object):
    """Check all numeric axis values in the config against their safe limits.

    Prints one result line per axis. If any value is outside its limits, a
    warning dialog is shown and the configuration window is opened.
    """
    ui_print("Checking config file for values exceeding limits:")
    concerns = {}  # found problems, per axis
    problem_counter = 0
    for i, axis in enumerate(g.AXIS_NAMES):
        concerns[axis] = []
        for key in g.default_arrays:
            value = float(read_from_config(axis, key, config_object))
            max_value = g.default_arrays[key][1][i]
            min_value = g.default_arrays[key][2][i]
            if not min_value <= value <= max_value:  # also true for NaN
                concerns[axis].append(key)
                problem_counter += 1
        if len(concerns[axis]) == 0:
            concerns[axis].append("No problems detected.")
        ui_print(axis, ":", *concerns[axis])  # print out results for this axis

    if problem_counter > 0:
        # show pop-up warning message:
        messagebox.showwarning("Warning!", "Found values exceeding limits in config file. Check values "
                                           "to ensure correct operation and avoid equipment damage!")
        g.app.show_frame(ui.Configuration)  # open configuration window so user can check values


def reset_config_to_default(file):
    """Replace g.CONFIG_OBJECT with a fresh config holding the default values.

    Defaults come from g.default_arrays and g.default_ports. Nothing is
    written to disk here; the file argument is currently unused.
    """
    config = ConfigParser()
    g.CONFIG_OBJECT = config
    for i, axis_name in enumerate(g.AXIS_NAMES):
        config.add_section(axis_name)
        for key in g.default_arrays:
            config.set(axis_name, key, str(g.default_arrays[key][0][i]))
    config.add_section("PORTS")  # section for PSU serial ports
    for key in g.default_ports:
        config.set("PORTS", key, str(g.default_ports[key]))


def ui_print(*content):
    """Print the given items to the built-in console, or stdout without a UI.

    Every item is converted with str() and prefixed with a single space.
    """
    output = "".join(" " + str(text) for text in content)
    if g.app is not None:
        g.app.OutputConsole.console.insert(END, "\n" + output)  # begin new line each time
        g.app.OutputConsole.console.see(END)  # scroll to bottom
    else:  # if window is not open, do normal print
        print(output)


def value_in_limits(axis, key, value):
    """Classify value against the safe limits set in g.default_arrays.

    Returns:
        'HIGH', 'LOW' or 'OK'.

    Raises:
        ValueError: if value is not a number (including NaN, which would
            otherwise pass both comparisons and be reported as 'OK').
    """
    axis_index = g.AXIS_NAMES.index(axis)
    max_value = g.default_arrays[key][1][axis_index]
    min_value = g.default_arrays[key][2][axis_index]
    number = float(value)
    if number != number:  # NaN is the only float not equal to itself
        raise ValueError("Value for {a} {k} is not a number: {v}".format(a=axis, k=key, v=value))
    if number > float(max_value):
        return 'HIGH'
    elif number < float(min_value):
        return 'LOW'
    else:
        return 'OK'


def setup_all():
    """Main initialization: (re)connect the Arduino and both PSUs.

    Creates the device objects, the three Axis objects and fills g.AXES.
    PSUs that cannot be reached get Axis objects with device None.
    """
    # Connect to Arduino:
    try:
        if g.ARDUINO is not None:
            # ui_print("\nClosing arduino link")
            try:
                g.ARDUINO.close()  # close serial link before attempting reconnection
            except serial.serialutil.SerialException:
                # serial.flush() in Arduino.close() fails when reconnecting
                # this ignores it and allows serial.close() to execute (I think)
                pass
            except AttributeError:
                # when no Arduino is connected but g.ARDUINO has been initialized
                # there is nothing to close; the resulting exception can be ignored
                pass
        g.ARDUINO = ArduinoCtrl()
    except Exception as e:
        ui_print("Arduino setup failed:", e)
        ui_print(traceback.format_exc())  # print_exc() returns None, format_exc() the text
        # NOTE(review): if g.ARDUINO is still None here, the pin lookups below fail

    g.AXES = []
    g.XY_PORT = read_from_config("PORTS", "xy_port", g.CONFIG_OBJECT)
    g.Z_PORT = read_from_config("PORTS", "z_port", g.CONFIG_OBJECT)
    g.PORTS = [g.XY_PORT, g.XY_PORT, g.Z_PORT]

    ui_print("\nConnecting to XY Device on %s..." % g.XY_PORT)
    try:
        if g.XY_DEVICE is not None:
            ui_print("Closing serial connection on XY device")
            g.XY_DEVICE.serial.close()
            g.XY_DEVICE = None
        g.XY_DEVICE = PS2000B.PS2000B(g.XY_PORT)  # setup PSU
        ui_print("Connection established.")
        g.X_AXIS = Axis(0, g.XY_DEVICE, 0, g.ARDUINO.pins[0])  # create axis objects
        g.Y_AXIS = Axis(1, g.XY_DEVICE, 1, g.ARDUINO.pins[1])
    except serial.serialutil.SerialException:
        g.X_AXIS = Axis(0, None, 0, g.ARDUINO.pins[0])  # create axis objects
        g.Y_AXIS = Axis(1, None, 1, g.ARDUINO.pins[1])
        ui_print("XY Device not connected or incorrect port set.")

    ui_print("Connecting to Z Device on %s..." % g.Z_PORT)
    try:
        if g.Z_DEVICE is not None:
            ui_print("Closing serial connection on Z device")
            g.Z_DEVICE.serial.close()
            g.Z_DEVICE = None
        g.Z_DEVICE = PS2000B.PS2000B(g.Z_PORT)
        ui_print("Connection established.")
        g.Z_AXIS = Axis(2, g.Z_DEVICE, 0, g.ARDUINO.pins[2])
    except serial.serialutil.SerialException:
        g.Z_AXIS = Axis(2, None, 0, g.ARDUINO.pins[2])
        ui_print("Z Device not connected or incorrect port set.")

    g.AXES.append(g.X_AXIS)
    g.AXES.append(g.Y_AXIS)
    g.AXES.append(g.Z_AXIS)
    ui_print("")  # new line


def activate_all():
    """Call enable_all() on every connected PSU; report the ones missing."""
    for label, device in (("XY", g.XY_DEVICE), ("Z", g.Z_DEVICE)):
        if device is None:
            ui_print("%s PSU not connected, can't activate." % label)
        else:
            device.enable_all()


def print_status_3():
    """Print the status of all three axes to the console."""
    ui_print("X-Axis:")
    g.X_AXIS.print_status()
    ui_print("Y-Axis:")
    g.Y_AXIS.print_status()
    ui_print("Z-Axis:")
    g.Z_AXIS.print_status()


def set_to_zero(device):
    """Set voltages and currents of both channels on device to 0.

    NOTE(review): this relies on the PSU object reacting to assignments to
    voltage1/current1/voltage2/current2 - confirm against the PS2000B class.
    """
    device.voltage1 = 0
    device.current1 = 0
    device.voltage2 = 0
    device.current2 = 0


def power_down_all():
    """Temporary power-down: set all outputs to 0 but keep connections enabled."""
    for axis in g.AXES:
        axis.power_down()  # set outputs to 0 and pin to low on this axis


def _deactivate_psu(device, label):
    """Zero and disable one PSU, reporting the outcome on the console."""
    if device is None:
        ui_print("%s PSU not connected, can't deactivate." % label)
        return
    try:
        set_to_zero(device)
        device.disable_all()
    except BaseException as e:  # deliberately broad: shutdown must continue
        ui_print("Error while deactivating %s PSU:" % label, e)
    else:
        ui_print("%s PSU deactivated." % label)


def shut_down_all():
    """Shutdown at program end or on error: outputs to 0, connections closed.

    Every step is best effort; failures are printed and the remaining
    devices are still handled.
    """
    # ToDo: remove checks if connected or make them only for printing
    ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")

    _deactivate_psu(g.XY_DEVICE, "XY")
    _deactivate_psu(g.Z_DEVICE, "Z")

    arduino = g.ARDUINO
    if arduino is None:
        # no Arduino object was ever created, nothing to make safe or close
        ui_print("Arduino not connected, can't close connection.")
        return

    try:
        arduino.safe()
    except BaseException as e:
        ui_print("Arduino safing unsuccessful:", e)
    # this throws no exception, even when arduino is not connected
    # ToDo (optional): figure out error handling for this

    if arduino.connected == "Connected":
        try:
            arduino.close()
        except BaseException as e:
            ui_print("Closing Arduino connection failed:", e)
        else:
            ui_print("Serial connection to Arduino closed.")
    else:
        ui_print("Arduino not connected, can't close connection.")


def set_field_simple(vector):
    """Form the magnetic field given by vector, w/o cancelling ambient field.

    Rejected values (ValueError) are printed and the remaining axes are
    still set.
    """
    for i in range(3):
        try:
            g.AXES[i].set_field_simple(vector[i])
        except ValueError as e:
            ui_print(e)


def set_field(vector):
    """Form the magnetic field given by vector, corrected for ambient field.

    Rejected values (ValueError) are printed and the remaining axes are
    still set.
    """
    for i in range(3):
        try:
            g.AXES[i].set_field(vector[i])
        except ValueError as e:
            ui_print(e)


def set_current_vec(vector):
    """Set the signed current vector[i] on each axis in g.AXES.

    The field targets of every axis are reset to 0, since no field was
    requested. Rejected values (ValueError) are printed and the remaining
    axes are still set.
    """
    for i, axis in enumerate(g.AXES):
        try:
            axis.target_field = 0
            axis.target_field_comp = 0
            axis.set_signed_current(vector[i])
        except ValueError as e:
            ui_print(e)