forked from zietzm/Helmholtz_Test_Bench
8d1870956f
file is now only written when explicitly wanted, e.g. on button press. global config stored instead as config object
487 lines
20 KiB
Python
487 lines
20 KiB
Python
from pyps2000b import PS2000B
|
|
from Arduino import Arduino
|
|
import globals as g
|
|
import pandas
|
|
import time
|
|
import numpy as np
|
|
import serial
|
|
import traceback
|
|
# noinspection PyPep8Naming
|
|
import User_Interface as ui
|
|
from tkinter import *
|
|
from tkinter import messagebox
|
|
from configparser import ConfigParser
|
|
|
|
|
|
class Axis:
|
|
def __init__(self, index, device, PSU_channel, arduino_pin):
|
|
# static information
|
|
self.index = index
|
|
self.device = device # power supply object (PS2000B class)
|
|
self.channel = PSU_channel # power supply unit channel (1 or 2)
|
|
self.ardPin = arduino_pin # output pin on the arduino for switching polarity on this axis
|
|
|
|
self.name = g.AXIS_NAMES[index]
|
|
self.port = g.PORTS[index]
|
|
|
|
self.resistance = float(read_from_config(self.name, "resistance", g.CONFIG_OBJECT))
|
|
self.max_watts = float(read_from_config(self.name, "max_watts", g.CONFIG_OBJECT))
|
|
self.max_amps = np.sqrt(self.max_watts / self.resistance)
|
|
self.max_volts = float(read_from_config(self.name, "max_volts", g.CONFIG_OBJECT))
|
|
|
|
self.coil_constant = float(read_from_config(self.name, "coil_const", g.CONFIG_OBJECT))
|
|
self.ambient_field = float(read_from_config(self.name, "ambient_field", g.CONFIG_OBJECT))
|
|
|
|
max_field = self.max_amps * self.coil_constant # max field reachable in this axis
|
|
self.max_field = np.array([-max_field, max_field])
|
|
self.max_comp_field = np.array([self.ambient_field - max_field, self.ambient_field + max_field])
|
|
|
|
# dynamic information
|
|
self.connected = "Not Connected"
|
|
self.output_active = "Unknown" # power output on the PSU enabled?
|
|
self.remote_ctrl_active = "Unknown" # remote control on the PSU enabled?
|
|
self.voltage_setpoint = 0 # target voltage on PSU [V]
|
|
self.voltage = 0 # actual voltage on PSU [V]
|
|
self.current_setpoint = 0 # target current on PSU [A]
|
|
self.current = 0 # actual current on PSU [A]
|
|
|
|
self.polarity_switched = "Unknown" # polarity switched on the Arduino?
|
|
|
|
self.target_field_comp = 0 # field to be created by coil pair (this is sent to the coils) [T]
|
|
self.target_field = 0 # field that should occur in measurement area (ambient still needs to be compensated) [T]
|
|
self.target_current = 0 # signed current that should pass through coil pair [A]
|
|
|
|
if self.device is not None:
|
|
self.update_status_info()
|
|
|
|
def update_status_info(self): # Read out the values of the parameters stored in this class and update them
|
|
try:
|
|
self.device.update_device_information(self.channel)
|
|
device_status = self.device.get_device_status_information(self.channel)
|
|
if device_status.output_active:
|
|
self.output_active = "Active"
|
|
else:
|
|
self.output_active = "Inactive"
|
|
if device_status.remote_control_active:
|
|
self.remote_ctrl_active = "Active"
|
|
else:
|
|
self.remote_ctrl_active = "Inactive"
|
|
|
|
self.voltage = self.device.get_voltage(self.channel)
|
|
self.voltage_setpoint = self.device.get_voltage_setpoint(self.channel)
|
|
self.current = self.device.get_current(self.channel)
|
|
self.current_setpoint = self.device.get_current_setpoint(self.channel)
|
|
except (serial.serialutil.SerialException, IndexError):
|
|
# ui_print("Connection Error with %s PSU on %s" % (self.name, self.port))
|
|
self.connected = "Connection Error"
|
|
self.output_active = "Unknown"
|
|
self.remote_ctrl_active = "Unknown"
|
|
else:
|
|
self.connected = "Connected"
|
|
|
|
def print_status(self): # axis = axis control variable, stored in globals.py
|
|
ui_print("%s, %0.2f V, %0.2f A"
|
|
% (self.device.get_device_status_information(self.channel),
|
|
self.device.get_voltage(self.channel), self.device.get_current(self.channel)))
|
|
|
|
def power_down(self): # temporary powerdown, set outputs to 0 but keep connections enabled
|
|
try:
|
|
self.target_current = 0
|
|
self.target_field = 0
|
|
self.target_field_comp = 0
|
|
if self.device is not None:
|
|
self.device.set_voltage(0, self.channel)
|
|
self.device.set_current(0, self.channel)
|
|
self.device.disable_output(self.channel)
|
|
g.ARDUINO.digitalWrite(self.ardPin, "LOW")
|
|
except Exception as e:
|
|
ui_print("Error while powering down %s: %s" % (self.name, e))
|
|
|
|
def set_signed_current(self, value): # sets current with correct polarity on this axis
|
|
device = self.device
|
|
channel = self.channel
|
|
ardPin = self.ardPin
|
|
# ui_print("Attempting to set current", value, "A")
|
|
self.target_current = value
|
|
if self.connected == "Connected" or True: # ToDo!: remove True, only for arduino testing!
|
|
|
|
if abs(value) > self.max_amps: # prevent excessive currents
|
|
self.power_down() # set output to 0 and deactivate
|
|
raise ValueError("Invalid current value. Tried %0.2fA, max. %0.2fA allowed" % (value, self.max_amps))
|
|
|
|
elif value >= 0: # switch polarity as needed
|
|
g.ARDUINO.digitalWrite(ardPin, "LOW") # ToDo: tie to arduino?
|
|
elif value < 0:
|
|
g.ARDUINO.digitalWrite(ardPin, "HIGH") # ToDo: tie to arduino?
|
|
else:
|
|
raise Exception("This should be impossible.")
|
|
|
|
maxVoltage = min(max(1.1 * self.max_amps * self.resistance, 8), self.max_volts) # limit voltage
|
|
# ui_print("sending values to device: U =", maxVoltage, "I =", abs(value))
|
|
if self.connected == "Connected": # ToDo!: remove if clause, only for arduino testing!
|
|
device.set_current(abs(value), channel)
|
|
device.set_voltage(maxVoltage, channel)
|
|
device.enable_output(channel)
|
|
else:
|
|
ui_print(self.name, "not connected, can't set current.")
|
|
|
|
def set_field_simple(self, value): # forms magnetic field as specified by value, w/o cancelling ambient field
|
|
self.target_field = value
|
|
self.target_field_comp = value
|
|
current = value / self.coil_constant
|
|
self.set_signed_current(current)
|
|
|
|
def set_field(self, value): # forms magnetic field as specified by value, corrected for ambient field
|
|
self.target_field = value
|
|
field = value - self.ambient_field
|
|
self.target_field_comp = field
|
|
current = field / self.coil_constant
|
|
self.set_signed_current(current)
|
|
|
|
|
|
class ArduinoCtrl(Arduino):
|
|
|
|
def __init__(self):
|
|
self.connected = "Unknown"
|
|
self.pins = [0, 0, 0]
|
|
for i in range(3): # get correct pins from config file
|
|
self.pins[i] = int(read_from_config(g.AXIS_NAMES[i], "relay_pin", g.CONFIG_OBJECT))
|
|
ui_print("\nConnecting to Arduino...")
|
|
try:
|
|
Arduino.__init__(self) # search for connected arduino and connect
|
|
for pin in self.pins:
|
|
self.pinMode(pin, "Output")
|
|
self.digitalWrite(pin, "LOW")
|
|
except Exception as e:
|
|
ui_print("Connection to Arduino failed.", e)
|
|
self.connected = "Not Connected"
|
|
else:
|
|
self.connected = "Connected"
|
|
ui_print("Arduino ready.")
|
|
|
|
def update_status_info(self):
|
|
if self.connected == "Connected":
|
|
try:
|
|
for axis in g.AXES:
|
|
if g.ARDUINO.digitalRead(axis.ardPin):
|
|
axis.polarity_switched = "True"
|
|
else:
|
|
axis.polarity_switched = "False"
|
|
except Exception as e:
|
|
ui_print("Error with Arduino:", e)
|
|
for axis in g.AXES:
|
|
axis.polarity_switched = "Unknown"
|
|
self.connected = "Connection Error"
|
|
else:
|
|
self.connected = "Connected"
|
|
|
|
def safe(self): # sets output pins to low and closes serial connection
|
|
for pin in self.pins:
|
|
self.digitalWrite(pin, "LOW")
|
|
|
|
|
|
def get_config_from_file(file=g.CONFIG_FILE):
|
|
config_object = ConfigParser() # initialize config parser
|
|
config_object.read(file) # open config file
|
|
return config_object # return config object, that contains all info from the file
|
|
|
|
|
|
def write_config_to_file(config_object): # ToDo: comments
|
|
with open(g.CONFIG_FILE, 'w') as conf: # Write changes to config file
|
|
config_object.write(conf)
|
|
|
|
|
|
def read_from_config(section, key, config_object): # read specific value from config object
|
|
try:
|
|
section_obj = config_object[section] # get relevant section
|
|
value = section_obj[key] # get relevant value in the section
|
|
return value
|
|
except KeyError as e:
|
|
ui_print("Error while reading config file:", e)
|
|
raise KeyError("Could not find key", key, "in config file.")
|
|
|
|
|
|
def edit_config(section, key, value, override=False): # edit specific value in config file
|
|
config_object = g.CONFIG_OBJECT
|
|
# ToDo: add check for data types, e.g. int for arduino ports
|
|
|
|
# Check if value to write is within acceptable limits (set in globals.py)
|
|
try:
|
|
value_ok = 'OK'
|
|
if section in g.AXIS_NAMES and not override: # only check numerical values and not if overridden by user
|
|
value_ok = value_in_limits(section, key, value) # check if value is ok, too high or too low
|
|
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(section)] # get max value
|
|
min_value = g.default_arrays[key][2][g.AXIS_NAMES.index(section)] # get min value
|
|
if value_ok == 'HIGH':
|
|
message = "Prevented writing too high value for {s} {k} to config file:\n" \
|
|
"{v}, max. {mv} allowed. Erroneous values may damage equipment!" \
|
|
.format(s=section, k=key, v=value, mv=max_value)
|
|
raise ValueError(message)
|
|
elif value_ok == 'LOW':
|
|
message = "Prevented writing too low value for {s} {k} to config file:\n" \
|
|
"{v}, max. {mv} allowed. Erroneous values may damage equipment!" \
|
|
.format(s=section, k=key, v=value, mv=min_value)
|
|
raise ValueError(message)
|
|
|
|
if value_ok == 'OK' or override: # value is within limits or user has overridden
|
|
try:
|
|
section_obj = config_object[section] # get relevant section
|
|
except KeyError:
|
|
ui_print("Could not find section", section, "in config file, creating new.")
|
|
config_object.add_section(section)
|
|
section_obj = config_object[section]
|
|
try:
|
|
section_obj[key] = str(value) # set relevant value in the section
|
|
except KeyError:
|
|
ui_print("Could not find key", key, "in config file, creating new.")
|
|
config_object.set(section, key, str(value))
|
|
|
|
except KeyError as e:
|
|
ui_print("Error while editing config file:", e)
|
|
raise KeyError("Could not find key", key, "in config file.")
|
|
|
|
|
|
def check_config(config_object): # check all numeric values in the config and see if they are within safe limits
|
|
ui_print("Checking config file for values exceeding limits:")
|
|
i = 0
|
|
concerns = {} # initialize dictionary for found problems
|
|
problem_counter = 0
|
|
for axis in g.AXIS_NAMES:
|
|
concerns[axis] = [] # create dictionary entry for this axis
|
|
for key in g.default_arrays.keys(): # go over entries in this axis
|
|
value = float(read_from_config(axis, key, config_object)) # read value to check from config file
|
|
max_value = g.default_arrays[key][1][i] # get max value
|
|
min_value = g.default_arrays[key][2][i] # get min value
|
|
|
|
if not min_value <= value <= max_value: # value is not in safe limits
|
|
concerns[axis].append(key) # add this entry to the problem dictionary
|
|
problem_counter += 1
|
|
|
|
if len(concerns[axis]) == 0:
|
|
concerns[axis].append("No problems detected.")
|
|
|
|
ui_print(axis, ":", *concerns[axis]) # print out results for this axis
|
|
i += 1
|
|
if problem_counter > 0: # some values are not ok
|
|
# shop pup-up warning message:
|
|
messagebox.showwarning("Warning!", "Found values exceeding limits in config file. Check values "
|
|
"to ensure correct operation and avoid equipment damage!")
|
|
g.app.show_frame(ui.Configuration) # open configuration window so user can check values
|
|
|
|
|
|
def reset_config_to_default(file): # reset values in config object to defaults (stored in globals.py)
|
|
config = ConfigParser() # initialize global config object
|
|
g.CONFIG_OBJECT = config
|
|
|
|
i = 0
|
|
for axis_name in g.AXIS_NAMES: # go through axes
|
|
config.add_section(axis_name) # add section for this axis
|
|
for key in g.default_arrays.keys(): # go through dictionary with default values
|
|
config.set(axis_name, key, str(g.default_arrays[key][0][i])) # set value
|
|
i += 1
|
|
|
|
config.add_section("PORTS") # add section for PSU serial ports
|
|
for key in g.default_ports.keys():
|
|
config.set("PORTS", key, str(g.default_ports[key]))
|
|
|
|
|
|
def ui_print(*content): # prints text to built in console
|
|
output = ""
|
|
for text in content:
|
|
output = " ".join((output, str(text))) # append content
|
|
if g.app is not None:
|
|
output = "".join(("\n", output)) # begin new line each time
|
|
g.app.OutputConsole.console.insert(END, output) # print to console
|
|
g.app.OutputConsole.console.see(END) # scroll to bottom
|
|
else: # if window is not open, do normal print
|
|
print(output)
|
|
|
|
|
|
def value_in_limits(axis, key, value): # Check if value is within safe limits (set in globals.py)
|
|
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(axis)] # get max value
|
|
min_value = g.default_arrays[key][2][g.AXIS_NAMES.index(axis)] # get min value
|
|
|
|
if float(value) > float(max_value):
|
|
return 'HIGH'
|
|
elif float(value) < float(min_value):
|
|
return 'LOW'
|
|
else:
|
|
return 'OK'
|
|
|
|
|
|
def setup_all(): # main initialization function, creates device objects for all PSUs and Arduino and sets their values
|
|
# Connect to Arduino:
|
|
try:
|
|
if g.ARDUINO is not None:
|
|
# ui_print("\nClosing arduino link")
|
|
try:
|
|
g.ARDUINO.close() # close serial link before attempting reconnection
|
|
except serial.serialutil.SerialException:
|
|
pass
|
|
# serial.flush() in Arduino.close() fails when reconnecting
|
|
# this ignores it and allows serial.close() to execute (I think)
|
|
except AttributeError:
|
|
pass
|
|
# when no Arduino is connected but g.ARDUINO has been initialized then there is nothing to close
|
|
# this throws exception, which can be ignored
|
|
g.ARDUINO = ArduinoCtrl()
|
|
except Exception as e:
|
|
ui_print("Arduino setup failed:", e)
|
|
ui_print(traceback.print_exc())
|
|
|
|
g.AXES = []
|
|
|
|
g.XY_PORT = read_from_config("PORTS", "xy_port", g.CONFIG_OBJECT)
|
|
g.Z_PORT = read_from_config("PORTS", "z_port", g.CONFIG_OBJECT)
|
|
g.PORTS = [g.XY_PORT, g.XY_PORT, g.Z_PORT]
|
|
|
|
ui_print("\nConnecting to XY Device on %s..." % g.XY_PORT)
|
|
try:
|
|
if g.XY_DEVICE is not None:
|
|
ui_print("Closing serial connection on XY device")
|
|
g.XY_DEVICE.serial.close()
|
|
g.XY_DEVICE = None
|
|
g.XY_DEVICE = PS2000B.PS2000B(g.XY_PORT) # setup PSU
|
|
ui_print("Connection established.")
|
|
g.X_AXIS = Axis(0, g.XY_DEVICE, 0, g.ARDUINO.pins[0]) # create axis objects
|
|
g.Y_AXIS = Axis(1, g.XY_DEVICE, 1, g.ARDUINO.pins[1])
|
|
except serial.serialutil.SerialException:
|
|
g.X_AXIS = Axis(0, None, 0, g.ARDUINO.pins[0]) # create axis objects
|
|
g.Y_AXIS = Axis(1, None, 1, g.ARDUINO.pins[1])
|
|
ui_print("XY Device not connected or incorrect port set.")
|
|
|
|
ui_print("Connecting to Z Device on %s..." % g.Z_PORT)
|
|
try:
|
|
if g.Z_DEVICE is not None:
|
|
ui_print("Closing serial connection on Z device")
|
|
g.Z_DEVICE.serial.close()
|
|
g.Z_DEVICE = None
|
|
g.Z_DEVICE = PS2000B.PS2000B(g.Z_PORT)
|
|
ui_print("Connection established.")
|
|
g.Z_AXIS = Axis(2, g.Z_DEVICE, 0, g.ARDUINO.pins[2])
|
|
except serial.serialutil.SerialException:
|
|
g.Z_AXIS = Axis(2, None, 0, g.ARDUINO.pins[2])
|
|
ui_print("Z Device not connected or incorrect port set.")
|
|
|
|
g.AXES.append(g.X_AXIS)
|
|
g.AXES.append(g.Y_AXIS)
|
|
g.AXES.append(g.Z_AXIS)
|
|
|
|
ui_print("") # new line
|
|
|
|
|
|
def activate_all(): # enables remote control and output on all PSUs and channels
|
|
g.XY_DEVICE.enable_all()
|
|
g.Z_DEVICE.enable_all()
|
|
|
|
|
|
def print_status_3():
|
|
ui_print("X-Axis:")
|
|
g.X_AXIS.print_status()
|
|
ui_print("Y-Axis:")
|
|
g.Y_AXIS.print_status()
|
|
ui_print("Z-Axis:")
|
|
g.Z_AXIS.print_status()
|
|
|
|
|
|
def set_to_zero(device): # sets voltages and currents to 0
|
|
device.voltage1 = 0
|
|
device.current1 = 0
|
|
device.voltage2 = 0
|
|
device.current2 = 0
|
|
|
|
|
|
def power_down_all(): # temporary, set all outputs to 0 but keep connections enabled
|
|
for axis in g.AXES:
|
|
axis.power_down() # set outputs to 0 and pin to low on this axis
|
|
|
|
|
|
def shut_down_all(): # shutdown at program end or on error, set outputs to 0 and disable connections
|
|
# ToDo: remove checks if connected or make them only for printing
|
|
ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
|
|
if g.XY_DEVICE is not None:
|
|
try:
|
|
set_to_zero(g.XY_DEVICE)
|
|
g.XY_DEVICE.disable_all()
|
|
except BaseException as e:
|
|
ui_print("Error while deactivating XY PSU:", e)
|
|
else:
|
|
ui_print("XY PSU deactivated.")
|
|
else:
|
|
ui_print("XY PSU not connected, can't deactivate.")
|
|
if g.Z_DEVICE is not None:
|
|
try:
|
|
set_to_zero(g.Z_DEVICE)
|
|
g.Z_DEVICE.disable_all()
|
|
except BaseException as e:
|
|
ui_print("Error while deactivating Z PSU:", e)
|
|
else:
|
|
ui_print("Z PSU deactivated.")
|
|
else:
|
|
ui_print("Z PSU not connected, can't deactivate.")
|
|
|
|
try:
|
|
g.ARDUINO.safe()
|
|
except BaseException as e:
|
|
ui_print("Arduino safing unsuccessful:", e)
|
|
# this throws no exception, even when arduino is not connected
|
|
# ToDo (optional): figure out error handling for this
|
|
if g.ARDUINO.connected == "Connected":
|
|
try:
|
|
g.ARDUINO.close()
|
|
except BaseException as e:
|
|
ui_print("Closing Arduino connection failed:", e)
|
|
else:
|
|
ui_print("Serial connection to Arduino closed.")
|
|
else:
|
|
ui_print("Arduino not connected, can't close connection.")
|
|
|
|
|
|
def set_field_simple(vector): # forms magnetic field as specified by vector, w/o cancelling ambient field
|
|
for i in [0, 1, 2]:
|
|
try:
|
|
g.AXES[i].set_field_simple(vector[i])
|
|
except ValueError as e:
|
|
ui_print(e)
|
|
|
|
|
|
def set_field(vector): # forms magnetic field as specified by vector, corrected for ambient field
|
|
for i in [0, 1, 2]:
|
|
try:
|
|
g.AXES[i].set_field(vector[i])
|
|
except ValueError as e:
|
|
ui_print(e)
|
|
|
|
|
|
def set_current_vec(vector): # sets needed currents on each axis for given vector
|
|
i = 0
|
|
for axis in g.AXES:
|
|
try:
|
|
axis.target_field = 0
|
|
axis.target_field_comp = 0
|
|
axis.set_signed_current(vector[i])
|
|
except ValueError as e:
|
|
ui_print(e)
|
|
i += 1
|
|
|
|
|
|
def execute_csv(filepath): # runs through csv file containing times and desired field vectors
|
|
# csv format: time (s); xField (T); yField (T); zField (T)
|
|
# decimal commas
|
|
# ToDo: set to zero before start
|
|
ui_print("Reading File:", filepath)
|
|
file = pandas.read_csv(filepath, sep=';', decimal=',', header=0) # read csv file
|
|
array = file.to_numpy() # convert csv to array
|
|
t_zero = time.time() # set reference time for start of run
|
|
i = 0
|
|
ui_print("Starting File Execution...")
|
|
while i < len(array):
|
|
t = time.time() - t_zero
|
|
if t >= array[i, 0]: # time for this row has come
|
|
field_vec = array[i, 1:4] # extract desired field vector
|
|
# ui_print("t = %0.2f s, target field vector = " % (array[i, 0]), field_vec)
|
|
set_field(field_vec) # send field vector to test stand
|
|
i = i + 1 # next row
|
|
ui_print("File executed, powering down channels.")
|
|
power_down_all() # set currents and voltages to 0, set arduino pins to low
|