forked from zietzm/Helmholtz_Test_Bench
487 lines
20 KiB
Python
487 lines
20 KiB
Python
from pyps2000b import PS2000B
|
|
from Arduino import Arduino
|
|
import globals as g
|
|
import pandas
|
|
import time
|
|
import numpy as np
|
|
import serial
|
|
import traceback
|
|
# noinspection PyPep8Naming
|
|
import User_Interface as ui
|
|
from tkinter import *
|
|
from tkinter import messagebox
|
|
from configparser import ConfigParser
|
|
|
|
|
|
class Axis:
    """State and control of one coil axis of the test bench.

    An axis couples one power supply channel with one Arduino output pin that switches the
    polarity on this axis. Electrical limits and coil data are read from the section of
    g.CONFIG_OBJECT that is named after the axis (g.AXIS_NAMES[index]).
    """

    def __init__(self, index, device, PSU_channel, arduino_pin):
        """Read the configuration of this axis and initialise its state.

        :param index: position of this axis in g.AXIS_NAMES and g.PORTS
        :param device: power supply object (PS2000B class) or None if no PSU is connected
        :param PSU_channel: channel of the power supply used for this axis
        :param arduino_pin: output pin on the Arduino for switching polarity on this axis
        :raises KeyError: if a required entry is missing in the config (raised by read_from_config)
        :raises ValueError: if a config entry can not be converted to float
        """
        # static information
        self.index = index
        self.device = device  # power supply object (PS2000B class)
        self.channel = PSU_channel  # power supply unit channel (setup_all passes 0 or 1)
        self.ardPin = arduino_pin  # output pin on the arduino for switching polarity on this axis

        self.name = g.AXIS_NAMES[index]
        self.port = g.PORTS[index]

        self.resistance = float(read_from_config(self.name, "resistance", g.CONFIG_OBJECT))
        self.max_watts = float(read_from_config(self.name, "max_watts", g.CONFIG_OBJECT))
        self.max_amps = np.sqrt(self.max_watts / self.resistance)  # from P = I^2 * R
        self.max_volts = float(read_from_config(self.name, "max_volts", g.CONFIG_OBJECT))

        self.coil_constant = float(read_from_config(self.name, "coil_const", g.CONFIG_OBJECT))
        self.ambient_field = float(read_from_config(self.name, "ambient_field", g.CONFIG_OBJECT))

        max_field = self.max_amps * self.coil_constant  # max field reachable in this axis
        self.max_field = np.array([-max_field, max_field])
        self.max_comp_field = np.array([self.ambient_field - max_field, self.ambient_field + max_field])

        # dynamic information
        self.connected = "Not Connected"
        self.output_active = "Unknown"  # power output on the PSU enabled?
        self.remote_ctrl_active = "Unknown"  # remote control on the PSU enabled?
        self.voltage_setpoint = 0  # target voltage on PSU [V]
        self.voltage = 0  # actual voltage on PSU [V]
        self.current_setpoint = 0  # target current on PSU [A]
        self.current = 0  # actual current on PSU [A]

        self.polarity_switched = "Unknown"  # polarity switched on the Arduino?

        self.target_field_comp = 0  # field to be created by coil pair (this is sent to the coils) [T]
        self.target_field = 0  # field that should occur in measurement area (ambient still needs to be compensated) [T]
        self.target_current = 0  # signed current that should pass through coil pair [A]

        if self.device is not None:
            self.update_status_info()

    def update_status_info(self):
        """Read the current PSU values for this axis and store them in this object.

        Sets self.connected to "Connected" on success and to "Connection Error" if the
        serial communication fails. Without a PSU object the status is reset to
        "Not Connected" / "Unknown" instead of failing with an AttributeError.
        """
        if self.device is None:  # axis was created without a PSU, nothing to read
            self.connected = "Not Connected"
            self.output_active = "Unknown"
            self.remote_ctrl_active = "Unknown"
            return

        try:
            self.device.update_device_information(self.channel)
            device_status = self.device.get_device_status_information(self.channel)
            self.output_active = "Active" if device_status.output_active else "Inactive"
            self.remote_ctrl_active = "Active" if device_status.remote_control_active else "Inactive"

            self.voltage = self.device.get_voltage(self.channel)
            self.voltage_setpoint = self.device.get_voltage_setpoint(self.channel)
            self.current = self.device.get_current(self.channel)
            self.current_setpoint = self.device.get_current_setpoint(self.channel)
        except (serial.serialutil.SerialException, IndexError):
            # ui_print("Connection Error with %s PSU on %s" % (self.name, self.port))
            self.connected = "Connection Error"
            self.output_active = "Unknown"
            self.remote_ctrl_active = "Unknown"
        else:
            self.connected = "Connected"

    def print_status(self):
        """Print device status, voltage and current of this axis to the console."""
        if self.device is None:  # no PSU object to query, avoid AttributeError on None
            ui_print(self.name, "not connected, no status available.")
            return
        ui_print("%s, %0.2f V, %0.2f A"
                 % (self.device.get_device_status_information(self.channel),
                    self.device.get_voltage(self.channel), self.device.get_current(self.channel)))

    def power_down(self):
        """Temporary powerdown: set outputs to 0 but keep connections enabled.

        Errors are reported on the console and not raised, so that powering down the
        remaining axes is not prevented by a failure on this one.
        """
        try:
            self.target_current = 0
            self.target_field = 0
            self.target_field_comp = 0
            if self.device is not None:
                self.device.set_voltage(0, self.channel)
                self.device.set_current(0, self.channel)
                self.device.disable_output(self.channel)
            g.ARDUINO.digitalWrite(self.ardPin, "LOW")
        except Exception as e:
            ui_print("Error while powering down %s: %s" % (self.name, e))

    def set_signed_current(self, value):
        """Set the current on this axis with the correct polarity.

        :param value: signed current that should pass through the coil pair [A]
        :raises ValueError: if abs(value) is not within max_amps; the axis is powered down first
        """
        # ui_print("Attempting to set current", value, "A")
        self.target_current = value
        if self.connected == "Connected" or True:  # ToDo!: remove True, only for arduino testing!

            # prevent excessive currents
            # written as "not <=" so that a NaN value or NaN limit is rejected as well
            if not abs(value) <= self.max_amps:
                self.power_down()  # set output to 0 and deactivate
                raise ValueError("Invalid current value. Tried %0.2fA, max. %0.2fA allowed" % (value, self.max_amps))

            # switch polarity as needed
            if value >= 0:
                g.ARDUINO.digitalWrite(self.ardPin, "LOW")  # ToDo: tie to arduino?
            else:
                g.ARDUINO.digitalWrite(self.ardPin, "HIGH")  # ToDo: tie to arduino?

            # limit voltage: 10 % above the voltage needed for max_amps, at least 8, capped at max_volts
            voltage_limit = min(max(1.1 * self.max_amps * self.resistance, 8), self.max_volts)
            # ui_print("sending values to device: U =", voltage_limit, "I =", abs(value))
            if self.connected == "Connected":  # ToDo!: remove if clause, only for arduino testing!
                self.device.set_current(abs(value), self.channel)
                self.device.set_voltage(voltage_limit, self.channel)
                self.device.enable_output(self.channel)
        else:
            ui_print(self.name, "not connected, can't set current.")

    def set_field_simple(self, value):
        """Form the magnetic field specified by value, w/o cancelling the ambient field.

        :param value: field to be created by the coil pair [T]
        :raises ValueError: if the required current exceeds the limit of this axis
        """
        self.target_field = value
        self.target_field_comp = value
        self.set_signed_current(value / self.coil_constant)

    def set_field(self, value):
        """Form the magnetic field specified by value, corrected for the ambient field.

        :param value: field that should occur in the measurement area [T]
        :raises ValueError: if the required current exceeds the limit of this axis
        """
        self.target_field = value
        field = value - self.ambient_field  # field the coils have to add to the ambient field
        self.target_field_comp = field
        self.set_signed_current(field / self.coil_constant)
|
|
|
|
|
|
class ArduinoCtrl(Arduino):
    """Arduino connection that drives the polarity relay pins of the three axes."""

    def __init__(self):
        """Read the relay pins from the config, connect to the Arduino and set all pins low.

        A failed connection is reported on the console and stored in self.connected,
        it does not raise.
        """
        self.connected = "Unknown"
        # get correct pins from config file, one per axis
        self.pins = [
            int(read_from_config(g.AXIS_NAMES[axis_no], "relay_pin", g.CONFIG_OBJECT))
            for axis_no in range(3)
        ]
        ui_print("\nConnecting to Arduino...")
        try:
            Arduino.__init__(self)  # search for connected arduino and connect
            for relay_pin in self.pins:
                self.pinMode(relay_pin, "Output")
                self.digitalWrite(relay_pin, "LOW")
        except Exception as err:
            ui_print("Connection to Arduino failed.", err)
            self.connected = "Not Connected"
        else:
            self.connected = "Connected"
            ui_print("Arduino ready.")

    def update_status_info(self):
        """Read the relay pin of every axis in g.AXES and store the result on the axis.

        Does nothing unless self.connected is "Connected". On any error all axes are
        marked "Unknown" and self.connected becomes "Connection Error".
        """
        if self.connected != "Connected":
            return
        try:
            for axis in g.AXES:
                pin_is_high = g.ARDUINO.digitalRead(axis.ardPin)
                axis.polarity_switched = "True" if pin_is_high else "False"
        except Exception as err:
            ui_print("Error with Arduino:", err)
            for axis in g.AXES:
                axis.polarity_switched = "Unknown"
            self.connected = "Connection Error"
        else:
            self.connected = "Connected"

    def safe(self):
        """Set all relay output pins to low (the serial connection is left open)."""
        for relay_pin in self.pins:
            self.digitalWrite(relay_pin, "LOW")
|
|
|
|
|
|
def get_config_from_file(file=g.CONFIG_FILE):
    """Parse the given config file and return the resulting ConfigParser object.

    :param file: path of the config file, defaults to g.CONFIG_FILE
    :return: ConfigParser holding everything that could be read from the file
    """
    parser = ConfigParser()
    parser.read(file)
    return parser
|
|
|
|
|
|
def write_config_to_file(config_object):
    """Write the contents of config_object to the config file g.CONFIG_FILE.

    :param config_object: object providing write(file), e.g. a ConfigParser
    """
    with open(g.CONFIG_FILE, 'w') as config_file:
        config_object.write(config_file)
|
|
|
|
|
|
def read_from_config(section, key, config_object):
    """Read a specific value from a config object.

    :param section: name of the section to read from
    :param key: name of the entry within that section
    :param config_object: mapping of sections to mappings of values, e.g. a ConfigParser
    :return: the stored value, unconverted
    :raises KeyError: if the section or the key does not exist; the original lookup
        error is attached as the cause
    """
    try:
        return config_object[section][key]
    except KeyError as e:
        ui_print("Error while reading config file:", e)
        # one readable message instead of a tuple of fragments, naming the section too
        raise KeyError("Could not find key %r in section %r of config file." % (key, section)) from e
|
|
|
|
|
|
def edit_config(section, key, value, override=False):
    """Edit a specific value in the global config object g.CONFIG_OBJECT.

    Values for axis sections are checked against the limits set in globals.py first.
    A missing section is created.

    :param section: name of the config section
    :param key: name of the entry within the section
    :param value: new value, stored as str(value)
    :param override: if True the limit check is skipped
    :raises ValueError: if the value is above or below the allowed limits
    :raises KeyError: if a lookup fails, e.g. there are no limits stored for key
    """
    config_object = g.CONFIG_OBJECT
    # ToDo: add check for data types, e.g. int for arduino ports

    try:
        # Check if value to write is within acceptable limits (set in globals.py)
        if section in g.AXIS_NAMES and not override:  # only check numerical values and not if overridden by user
            value_ok = value_in_limits(section, key, value)  # check if value is ok, too high or too low
            axis_index = g.AXIS_NAMES.index(section)
            max_value = g.default_arrays[key][1][axis_index]  # get max value
            min_value = g.default_arrays[key][2][axis_index]  # get min value
            if value_ok == 'HIGH':
                message = "Prevented writing too high value for {s} {k} to config file:\n" \
                          "{v}, max. {mv} allowed. Erroneous values may damage equipment!" \
                    .format(s=section, k=key, v=value, mv=max_value)
                raise ValueError(message)
            elif value_ok == 'LOW':
                # the reported limit is the minimum, so it is labelled "min."
                message = "Prevented writing too low value for {s} {k} to config file:\n" \
                          "{v}, min. {mv} allowed. Erroneous values may damage equipment!" \
                    .format(s=section, k=key, v=value, mv=min_value)
                raise ValueError(message)

        # value is within limits or user has overridden
        try:
            section_obj = config_object[section]  # get relevant section
        except KeyError:
            ui_print("Could not find section", section, "in config file, creating new.")
            config_object.add_section(section)
            section_obj = config_object[section]
        section_obj[key] = str(value)  # set relevant value in the section, creates the key if needed

    except KeyError as e:
        ui_print("Error while editing config file:", e)
        raise KeyError("Could not find key %r for section %r." % (key, section)) from e
|
|
|
|
|
|
def check_config(config_object):
    """Check all numeric axis values in the config against the limits from globals.py.

    The result for each axis is printed to the console. If any value is outside its
    limits a warning pop-up is shown and the configuration window is opened.

    :param config_object: config object to check
    """
    ui_print("Checking config file for values exceeding limits:")
    concerns = {}  # found problems, per axis
    problem_counter = 0
    for axis_no, axis in enumerate(g.AXIS_NAMES):
        flagged = concerns[axis] = []
        for key, limits in g.default_arrays.items():
            value = float(read_from_config(axis, key, config_object))
            max_value = limits[1][axis_no]
            min_value = limits[2][axis_no]
            if min_value <= value <= max_value:
                continue  # value is within safe limits
            flagged.append(key)
            problem_counter += 1

        if not flagged:
            flagged.append("No problems detected.")

        ui_print(axis, ":", *flagged)  # print out results for this axis

    if problem_counter > 0:  # some values are not ok
        # show pop-up warning message:
        messagebox.showwarning("Warning!", "Found values exceeding limits in config file. Check values "
                                           "to ensure correct operation and avoid equipment damage!")
        g.app.show_frame(ui.Configuration)  # open configuration window so user can check values
|
|
|
|
|
|
def reset_config_to_default(file):
    """Replace g.CONFIG_OBJECT by a fresh config object filled with the defaults from globals.py.

    :param file: not used by this function
    """
    defaults = ConfigParser()
    g.CONFIG_OBJECT = defaults

    # one section per axis, filled from the dictionary with default values
    for axis_no, axis_name in enumerate(g.AXIS_NAMES):
        defaults.add_section(axis_name)
        for key, arrays in g.default_arrays.items():
            defaults.set(axis_name, key, str(arrays[0][axis_no]))

    # section for PSU serial ports
    defaults.add_section("PORTS")
    for key, port in g.default_ports.items():
        defaults.set("PORTS", key, str(port))
|
|
|
|
|
|
def ui_print(*content):
    """Print text to the built in console, or to stdout if the window is not open.

    Every item of content is converted with str() and preceded by a single space.
    """
    output = "".join(" " + str(text) for text in content)
    if g.app is None:  # window is not open, do normal print
        print(output)
        return
    g.app.OutputConsole.console.insert(END, "\n" + output)  # begin new line each time
    g.app.OutputConsole.console.see(END)  # scroll to bottom
|
|
|
|
|
|
def value_in_limits(axis, key, value):
    """Check if value is within the safe limits set in globals.py.

    :param axis: axis name, has to be an entry of g.AXIS_NAMES
    :param key: name of the config entry, has to be a key of g.default_arrays
    :param value: value to check, anything accepted by float()
    :return: 'HIGH' if value is above the maximum, 'LOW' if below the minimum, else 'OK'
    """
    limits = g.default_arrays[key]
    max_value = limits[1][g.AXIS_NAMES.index(axis)]
    min_value = limits[2][g.AXIS_NAMES.index(axis)]

    number = float(value)
    if number > float(max_value):
        return 'HIGH'
    if number < float(min_value):
        return 'LOW'
    return 'OK'
|
|
|
|
|
|
def setup_all():
    """Main initialization: (re)connect to the Arduino and both PSUs and create the axis objects.

    Fills g.ARDUINO, g.XY_DEVICE, g.Z_DEVICE, g.X_AXIS, g.Y_AXIS, g.Z_AXIS, g.PORTS and
    g.AXES. A PSU that can not be reached gets axis objects without a device (None).
    """
    # Connect to Arduino:
    try:
        if g.ARDUINO is not None:
            # ui_print("\nClosing arduino link")
            try:
                g.ARDUINO.close()  # close serial link before attempting reconnection
            except serial.serialutil.SerialException:
                # serial.flush() in Arduino.close() fails when reconnecting
                # this ignores it and allows serial.close() to execute (I think)
                pass
            except AttributeError:
                # when no Arduino is connected but g.ARDUINO has been initialized then there is nothing to close
                # this throws exception, which can be ignored
                pass
        g.ARDUINO = ArduinoCtrl()
    except Exception as e:
        ui_print("Arduino setup failed:", e)
        # format_exc() returns the traceback text; print_exc() returns None and would print "None" here
        ui_print(traceback.format_exc())

    g.AXES = []

    g.XY_PORT = read_from_config("PORTS", "xy_port", g.CONFIG_OBJECT)
    g.Z_PORT = read_from_config("PORTS", "z_port", g.CONFIG_OBJECT)
    g.PORTS = [g.XY_PORT, g.XY_PORT, g.Z_PORT]

    ui_print("\nConnecting to XY Device on %s..." % g.XY_PORT)
    try:
        if g.XY_DEVICE is not None:
            ui_print("Closing serial connection on XY device")
            g.XY_DEVICE.serial.close()
            g.XY_DEVICE = None
        g.XY_DEVICE = PS2000B.PS2000B(g.XY_PORT)  # setup PSU
        ui_print("Connection established.")
        g.X_AXIS = Axis(0, g.XY_DEVICE, 0, g.ARDUINO.pins[0])  # create axis objects
        g.Y_AXIS = Axis(1, g.XY_DEVICE, 1, g.ARDUINO.pins[1])
    except serial.serialutil.SerialException:
        g.X_AXIS = Axis(0, None, 0, g.ARDUINO.pins[0])  # create axis objects without PSU
        g.Y_AXIS = Axis(1, None, 1, g.ARDUINO.pins[1])
        ui_print("XY Device not connected or incorrect port set.")

    ui_print("Connecting to Z Device on %s..." % g.Z_PORT)
    try:
        if g.Z_DEVICE is not None:
            ui_print("Closing serial connection on Z device")
            g.Z_DEVICE.serial.close()
            g.Z_DEVICE = None
        g.Z_DEVICE = PS2000B.PS2000B(g.Z_PORT)
        ui_print("Connection established.")
        g.Z_AXIS = Axis(2, g.Z_DEVICE, 0, g.ARDUINO.pins[2])
    except serial.serialutil.SerialException:
        g.Z_AXIS = Axis(2, None, 0, g.ARDUINO.pins[2])
        ui_print("Z Device not connected or incorrect port set.")

    g.AXES.append(g.X_AXIS)
    g.AXES.append(g.Y_AXIS)
    g.AXES.append(g.Z_AXIS)

    ui_print("")  # new line
|
|
|
|
|
|
def activate_all():
    """Call enable_all() on both PSUs (enables remote control and output on all channels).

    A PSU that is not connected (device is None) is skipped with a console message
    instead of raising an AttributeError.
    """
    for label, device in (("XY", g.XY_DEVICE), ("Z", g.Z_DEVICE)):
        if device is None:
            ui_print("%s PSU not connected, can't activate." % label)
        else:
            device.enable_all()
|
|
|
|
|
|
def print_status_3():
    """Print the status of the X, Y and Z axis to the console, each under its own heading."""
    # the axis objects are looked up one at a time, right before they are used
    for heading, axis_attribute in (("X-Axis:", "X_AXIS"), ("Y-Axis:", "Y_AXIS"), ("Z-Axis:", "Z_AXIS")):
        ui_print(heading)
        getattr(g, axis_attribute).print_status()
|
|
|
|
|
|
def set_to_zero(device):
    """Set the voltage and current attributes of both channels of device to 0.

    :param device: object whose attributes voltage1, current1, voltage2 and current2 are assigned
    """
    for attribute in ("voltage1", "current1", "voltage2", "current2"):
        setattr(device, attribute, 0)
|
|
|
|
|
|
def power_down_all():
    """Temporary powerdown of every axis in g.AXES: outputs to 0, connections stay enabled."""
    for coil_axis in g.AXES:
        coil_axis.power_down()  # set outputs to 0 and pin to low on this axis
|
|
|
|
|
|
def _deactivate_psu(device, label):
    """Set one PSU to zero and disable it, reporting the outcome on the console."""
    if device is None:
        ui_print("%s PSU not connected, can't deactivate." % label)
        return
    try:
        set_to_zero(device)
        device.disable_all()
    except BaseException as e:  # keep going so that the remaining devices are still shut down
        ui_print("Error while deactivating %s PSU:" % label, e)
    else:
        ui_print("%s PSU deactivated." % label)


def shut_down_all():
    """Shutdown at program end or on error: set outputs to 0 and disable connections.

    Every step is attempted even if an earlier one failed. Failures are reported on the
    console and not raised. A missing Arduino object (g.ARDUINO is None) is reported
    as not connected instead of raising an AttributeError.
    """
    # ToDo: remove checks if connected or make them only for printing
    ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
    _deactivate_psu(g.XY_DEVICE, "XY")
    _deactivate_psu(g.Z_DEVICE, "Z")

    arduino = g.ARDUINO
    if arduino is None:  # never set up, nothing to safe or close
        ui_print("Arduino not connected, can't close connection.")
        return

    try:
        arduino.safe()
    except BaseException as e:
        ui_print("Arduino safing unsuccessful:", e)
    # this throws no exception, even when arduino is not connected
    # ToDo (optional): figure out error handling for this
    if arduino.connected == "Connected":
        try:
            arduino.close()
        except BaseException as e:
            ui_print("Closing Arduino connection failed:", e)
        else:
            ui_print("Serial connection to Arduino closed.")
    else:
        ui_print("Arduino not connected, can't close connection.")
|
|
|
|
|
|
def set_field_simple(vector):
    """Form the magnetic field specified by vector, w/o cancelling the ambient field.

    :param vector: indexable with the field for axis 0, 1 and 2 of g.AXES

    A ValueError of one axis is printed to the console and the remaining axes are still set.
    """
    for axis_no in range(3):
        try:
            apply_field = g.AXES[axis_no].set_field_simple
            apply_field(vector[axis_no])
        except ValueError as err:
            ui_print(err)
|
|
|
|
|
|
def set_field(vector):
    """Form the magnetic field specified by vector, corrected for the ambient field.

    :param vector: indexable with the field for axis 0, 1 and 2 of g.AXES

    A ValueError of one axis is printed to the console and the remaining axes are still set.
    """
    for axis_no in range(3):
        try:
            apply_field = g.AXES[axis_no].set_field
            apply_field(vector[axis_no])
        except ValueError as err:
            ui_print(err)
|
|
|
|
|
|
def set_current_vec(vector):
    """Set the signed current given in vector on each axis of g.AXES.

    The field targets of each axis are reset to 0, as the current is set directly.
    A ValueError of one axis is printed to the console and the remaining axes are still set.

    :param vector: indexable with one current per axis, in the order of g.AXES
    """
    for axis_no, axis in enumerate(g.AXES):
        try:
            axis.target_field = 0
            axis.target_field_comp = 0
            axis.set_signed_current(vector[axis_no])
        except ValueError as err:
            ui_print(err)
|
|
|
|
|
|
def execute_csv(filepath):
    """Run through a csv file containing times and desired field vectors.

    csv format: time (s); xField (T); yField (T); zField (T)
    with ';' as separator, decimal commas and one header row.

    Each row is sent to the test stand with set_field() once its time (relative to the
    start of the run) has come. The function blocks until the file is finished. All
    channels are powered down afterwards, also if the run is aborted by an exception.

    :param filepath: path of the csv file
    """
    # ToDo: set to zero before start
    ui_print("Reading File:", filepath)
    file = pandas.read_csv(filepath, sep=';', decimal=',', header=0)  # read csv file
    array = file.to_numpy()  # convert csv to array
    t_zero = time.monotonic()  # reference time for start of run, unaffected by system clock changes
    ui_print("Starting File Execution...")
    try:
        for row in array:
            # sleep until the time for this row has come instead of spinning at full CPU load
            delay = row[0] - (time.monotonic() - t_zero)
            if delay > 0:  # False for rows that are already due and for NaN times
                time.sleep(delay)
            # ui_print("t = %0.2f s, target field vector = " % (row[0]), row[1:4])
            set_field(row[1:4])  # send desired field vector to test stand
        ui_print("File executed, powering down channels.")
    finally:
        power_down_all()  # set currents and voltages to 0, set arduino pins to low
|