Partially completed front-end reintegration.

This commit is contained in:
2021-08-03 22:29:09 +02:00
parent 11b5ea4e36
commit 8f70f85c84
11 changed files with 486 additions and 677 deletions
+3 -6
View File
@@ -6,7 +6,7 @@ import traceback
from tkinter import messagebox
# import other project files:
import src.cage_func as func
from src.helmholtz_cage_device import HelmholtzCageDevice
import src.globals as g
import src.config_handling as config
import src.csv_logging as log
@@ -23,7 +23,7 @@ def program_end(): # called on exception or when user closes application
if g.app.pages[ExecuteCSVMode].csv_thread is not None: # check if a thread for executing CSVs exists
g.app.pages[ExecuteCSVMode].csv_thread.stop() # stop the thread
func.shut_down_all() # shut down devices
g.CAGE_DEVICE.destroy() # shut down devices and end all threads.
if log.unsaved_data: # Check if there is logged data that has not been saved yet
# open pop-up to ask user if he wants to save the data:
@@ -49,7 +49,7 @@ try: # start normal operations
config.CONFIG_OBJECT = config.get_config_from_file(config.CONFIG_FILE) # read configuration data from config file
print("Starting setup...")
func.setup_all() # initiate communication with devices and initialize all major program objects
g.CAGE_DEVICE = HelmholtzCageDevice() # initiate communication with devices and initialize all major program objects
print("\nOpening User Interface...")
@@ -62,9 +62,6 @@ try: # start normal operations
ui_print("Program Initialized")
config.check_config(config.CONFIG_OBJECT) # check config for values exceeding limits
ui_print("\nStarting setup...") # do setup again, so it is printed in the UI console ToDo: do it only once
func.setup_all() # initiate communication with devices and initialize all major program objects
# Create TCP/Socket listener
socket_controller = SocketInterfaceThread()
socket_controller.start()
+5 -7
View File
@@ -1,6 +1,6 @@
from src.arduino import Arduino
import src.config_handling as config
from src.utility import ui_print
import src.config_handling as config_handling
import src.globals as g
@@ -11,7 +11,7 @@ class ArduinoDevice(Arduino):
def __init__(self):
self.pins = [0, 0, 0] # initialize list with pins to switch relay of each axis
for i in range(3): # get correct pins from the config
self.pins[i] = int(config.read_from_config(g.AXIS_NAMES[i], "relay_pin", config.CONFIG_OBJECT))
self.pins[i] = int(config_handling.read_from_config(g.AXIS_NAMES[i], "relay_pin", config_handling.CONFIG_OBJECT))
ui_print("\nConnecting to Arduino...")
# try to set up the arduino. Exceptions are handled by caller
@@ -29,11 +29,9 @@ class ArduinoDevice(Arduino):
else:
self.digitalWrite(self.pins[axis_index], "LOW")
def get_axis_polarity(self):
"""Returns a list with one bool per axis indicating whether the polarity is reversed (True)."""
status = []
for pin in self.pins: # go through all three pins/axes
status.append(self.digitalRead(pin)) # pin is HIGH --> relay is switched
def get_axis_polarity(self, idx):
"""Returns a bool indicating whether the axis polarity is reversed (True)."""
return self.digitalRead(self.pins[idx]) # pin is HIGH --> relay is switched
def shutdown(self):
"""Sets relay switching pins to low to de-power most of the electronics box"""
-431
View File
@@ -1,431 +0,0 @@
# This file contains all classes and functions directly related to the operation of the helmholtz test bench.
# The two main classes are Axis and ArduinoCtrl, see their definitions for details.
# import packages:
import numpy as np
import serial
import traceback
from tkinter import messagebox
# import other project files
from src.utility import ui_print
from src.ps2000b import PS2000B
from src.arduino import Arduino
import src.config_handling as config
import src.globals as g
class Axis:
# Main class representing an axis (x,y,z) of the test bench
# contains static and dynamic status information about this axis and methods to control it
def __init__(self, index, device, PSU_channel, arduino_pin):
# static information
self.index = index # index of this axis, 0->X, 1->Y, 2->Z
self.device = device # power supply object for this axis (PS2000B class object)
self.channel = PSU_channel # power supply unit channel (0 or 1)
self.ardPin = arduino_pin # output pin on the arduino for switching polarity on this axis
self.name = g.AXIS_NAMES[index] # get name of this axis from list in globals.py (e.g. "X-Axis"
self.port = g.PORTS[index] # get serial port of this axis PSU
# read static information from the configuration object (which has read it from the config file or settings):
self.resistance = float(config.read_from_config(self.name, "resistance", config.CONFIG_OBJECT))
self.max_amps = float(config.read_from_config(self.name, "max_amps", config.CONFIG_OBJECT))
self.max_volts = float(config.read_from_config(self.name, "max_volts", config.CONFIG_OBJECT))
self.coil_constant = float(config.read_from_config(self.name, "coil_const", config.CONFIG_OBJECT))
self.ambient_field = float(config.read_from_config(self.name, "ambient_field", config.CONFIG_OBJECT))
max_field = self.max_amps * self.coil_constant # calculate max field reachable in this axis
self.max_field = np.array([-max_field, max_field]) # make array with min/max reachable field (w/o compensation)
# calculate max and min field that can be reached after compensating for the ambient field
self.max_comp_field = np.array([self.ambient_field - max_field, self.ambient_field + max_field]) # [min, max]
# initialize dynamic information, this is updated by self.update_status_info() later
self.connected = "Not Connected"
self.output_active = "Unknown" # power output on the PSU enabled?
self.remote_ctrl_active = "Unknown" # remote control on the PSU enabled?
self.voltage_setpoint = 0 # target voltage on PSU [V]
self.voltage = 0 # actual voltage on PSU [V]
self.current_setpoint = 0 # target current on PSU [A]
self.current = 0 # actual current on PSU [A]
self.polarity_switched = "Unknown" # polarity switched on the Arduino?
self.target_field_comp = 0 # field to be created by coil pair (this is sent to the coils) [T]
self.target_field = 0 # field that should occur in measurement area (ambient still needs to be compensated) [T]
self.target_current = 0 # signed current that should pass through coil pair [A]
if self.device is not None:
self.update_status_info()
def update_status_info(self): # Read out the values of the dynamic parameters stored in this object and update them
try: # try to read out the data, this will fail on connection error to PSU
# ToDo: this takes a long time, try to improve performance
self.device.update_device_information(self.channel) # update the information in the device object
device_status = self.device.get_device_status_information(self.channel) # get object with new status info
if device_status.output_active: # is the power output active?
self.output_active = "Active"
else:
self.output_active = "Inactive"
# is remote control active, allowing the device to be controlled by this program?
if device_status.remote_control_active:
self.remote_ctrl_active = "Active"
else:
self.remote_ctrl_active = "Inactive"
# get currents and voltages:
self.voltage = self.device.get_voltage(self.channel)
self.voltage_setpoint = self.device.get_voltage_setpoint(self.channel)
self.current = self.device.get_current(self.channel)
self.current_setpoint = self.device.get_current_setpoint(self.channel)
except (serial.serialutil.SerialException, IndexError): # Connection error, usually the PSU is unplugged
if self.connected == "Connected": # only show error messages if the device was connected before this error
# Show error as print-out in console and as pop-up:
ui_print("Connection Error with %s PSU on %s" % (self.name, self.port))
messagebox.showerror("PSU Error", "Connection Error with %s PSU on %s" % (self.name, self.port))
# set status attributes to connection error status:
self.connected = "Connection Error"
self.output_active = "Unknown"
self.remote_ctrl_active = "Unknown"
else: # no communications error
self.connected = "Connected" # PSU is connected
def print_status(self): # print out the current status of the PSU channel (not used at the moment)
ui_print("%s, %0.2f V, %0.2f A"
% (self.device.get_device_status_information(self.channel),
self.device.get_voltage(self.channel), self.device.get_current(self.channel)))
def power_down(self): # temporary powerdown, set outputs to 0 but keep connections enabled
try:
# set class object attributes to 0 to reflect shutdown in status displays, log files etc.
self.target_current = 0
self.target_field = 0
self.target_field_comp = 0
if self.device is not None: # there is a PSU connected for this axis
self.device.set_voltage(0, self.channel) # set voltage on PSU channel to 0
self.device.set_current(0, self.channel) # set current on PSU channel to 0
self.device.disable_output(self.channel) # disable power output on PSU channel
g.ARDUINO.digitalWrite(self.ardPin, "LOW") # set arduino pin for polarity switch relay to unpowered state
except Exception as e: # some error was encountered
# show error message:
ui_print("Error while powering down %s: %s" % (self.name, e))
messagebox.showerror("PSU Error!", "Error while powering down %s: \n%s" % (self.name, e))
def set_signed_current(self, value):
# sets current with correct polarity on this axis, this is the primary way to control the test bench
# ui_print("Attempting to set current", value, "A")
self.target_current = value # show target value in object attribute for status display, logging etc.
if abs(value) > self.max_amps: # prevent excessive currents
self.power_down() # set output to 0 and deactivate
raise ValueError("Invalid current value on %s. Tried %0.2fA, max. %0.2fA allowed"
% (self.name, value, self.max_amps))
elif value >= 0: # switch the e-box relay to change polarity as needed
g.ARDUINO.digitalWrite(self.ardPin, "LOW") # command the output pin on the arduino in the electronics box
elif value < 0:
g.ARDUINO.digitalWrite(self.ardPin, "HIGH") # command the output pin on the arduino in the electronics box
# determine voltage limit to be set on PSU, must be high enough to not limit the current:
# min. 8V, max. max_volts, in-between as needed with current value (+margin to not limit current)
maxVoltage = min(max(1.3 * abs(value) * self.resistance, 8), self.max_volts) # limit voltage
if self.connected == "Connected": # only try to command the PSU if its actually connected
self.device.set_current(abs(value), self.channel) # set desired current
self.device.set_voltage(maxVoltage, self.channel) # set voltage limit
if self.output_active == "Inactive": # don't send unnecessary commands to PSU
# ToDo: without calling update_status_info() output_active may be wrong
self.device.enable_output(self.channel) # activate the power output
else: # the PSU is not connected
ui_print(self.name, "not connected, can't set current.")
def set_field_simple(self, value): # forms magnetic field as specified by value, w/o cancelling ambient field
self.target_field = value # update object attribute for display
self.target_field_comp = value # same as above, bc no compensation
current = value / self.coil_constant # calculate needed current
self.set_signed_current(current) # command the test bench
def set_field(self, value): # forms magnetic field as specified by value, corrected for ambient field
self.target_field = value # update object attribute for display
field = value - self.ambient_field # calculate needed field after compensation
self.target_field_comp = field # update object attribute for display
current = field / self.coil_constant # calculate needed current
self.set_signed_current(current) # command the test bench
class ArduinoCtrl(Arduino):
# main class to control the electronics box (which means commanding the arduino inside)
# inherits from the Arduino library
def __init__(self):
self.connected = "Unknown" # connection status attribute, nominal "Connected"
self.pins = [0, 0, 0] # initialize list with pins to switch relay of each axis
for i in range(3): # get correct pins from the config
self.pins[i] = int(config.read_from_config(g.AXIS_NAMES[i], "relay_pin", config.CONFIG_OBJECT))
ui_print("\nConnecting to Arduino...")
try: # try to set up the arduino
Arduino.__init__(self, timeout=0.2) # search for connected arduino and connect by initializing arduino library class
for pin in self.pins:
self.pinMode(pin, "Output")
self.digitalWrite(pin, "LOW")
except Exception as e: # some error occurred, usually the arduino is not connected
ui_print("Connection to Arduino failed.", e)
self.connected = "Not Connected"
else: # connection was successfully established
self.connected = "Connected"
ui_print("Arduino ready.")
def update_status_info(self): # update the attributes stored in this class object
if self.connected == "Connected": # only do this if arduino is connected (initialize new instance to reconnect)
try: # try to read the status of the pins from the arduino
for axis in g.AXES: # go through all three axes
if g.ARDUINO.digitalRead(axis.ardPin): # pin is HIGH --> relay is switched
axis.polarity_switched = "True" # set attribute in axis object accordingly
else: # pin is LOW --> relay is not switched
axis.polarity_switched = "False" # set attribute in axis object accordingly
except Exception as e: # some error occurred while trying to read status, usually arduino is disconnected
# show warning messages to alert user
ui_print("Error with arduino:", e)
messagebox.showerror("Error with arduino!", "Connection Error with arduino: \n%s" % e)
for axis in g.AXES: # set polarity switch attributes in axis objects to "Unknown"
axis.polarity_switched = "Unknown"
self.connected = "Connection Error" # update own connection status
else: # no error occurred --> data was read successfully
self.connected = "Connected" # update own connection status
def safe(self): # sets relay switching pins to low to depower most of the electronics box
for pin in self.pins:
self.digitalWrite(pin, "LOW")
def value_in_limits(axis, key, value): # Check if value is within safe limits (set in globals.py)
# axis is string with axis name, e.g. "X-Axis"
# key specifies which value to check, e.g. current
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(axis)] # get max value from dictionary in globals.py
min_value = g.default_arrays[key][2][g.AXIS_NAMES.index(axis)] # get min value from dictionary in globals.py
if float(value) > float(max_value): # value is too high
return 'HIGH'
elif float(value) < float(min_value): # value is too low
return 'LOW'
else: # value is within limits
return 'OK'
def setup_all(): # main test bench initialization function
# creates device objects for all PSUs and Arduino and sets their values
# initializes an object of class Axis for all three axes (x,y,z)
# Setup Arduino:
try: # broad error handling for unforeseen errors, handling in ArduinoCtrl should catch most errors
if g.ARDUINO is not None: # the arduino has been initialized before, so we need to first close its connection
try:
g.ARDUINO.close() # close serial link
except serial.serialutil.SerialException:
pass
# serial.flush() in Arduino.close() fails when reconnecting
# this ignores it and allows serial.close() to execute (I think)
except AttributeError:
pass
# when no Arduino is connected but g.ARDUINO has been initialized then there is nothing to close
# this throws an exception, which can be ignored
g.ARDUINO = ArduinoCtrl() # initialize the arduino object from the control class, connects and sets up
except Exception as e: # some unforeseen error occurred (not connected issue handled in ArduinoCtrl class)
# show error messages to alert user
ui_print("Arduino setup failed:", e)
ui_print(traceback.print_exc())
messagebox.showerror("Error!", "Arduino setup failed:\n%s \nCheck traceback in console." % e)
# Setup PSUs and axis objects:
g.AXES = [] # initialize global list containing the three axis objects
# get serial ports for the PSUs from config
g.XY_PORT = config.read_from_config("PORTS", "xy_port", config.CONFIG_OBJECT)
g.Z_PORT = config.read_from_config("PORTS", "z_port", config.CONFIG_OBJECT)
g.PORTS = [g.XY_PORT, g.XY_PORT, g.Z_PORT] # write list with PSU port for each axis (X/Y share PSU)
# setup PSU and axis objects for X and Y axes:
ui_print("Connecting to XY Device on %s..." % g.XY_PORT)
try: # try to connect to the PSU
if g.XY_DEVICE is not None: # if PSU has previously been connected we need to close the serial link first
ui_print("Closing serial connection on XY device")
g.XY_DEVICE.serial.close()
g.XY_DEVICE = None
g.XY_DEVICE = PS2000B.PS2000B(g.XY_PORT) # setup PSU
ui_print("Connection established.")
g.X_AXIS = Axis(0, g.XY_DEVICE, 0, g.ARDUINO.pins[0]) # create axis objects (index, PSU, channel, relay pin)
g.Y_AXIS = Axis(1, g.XY_DEVICE, 1, g.ARDUINO.pins[1])
except serial.serialutil.SerialException: # communications error, usually PSU is not connected or wrong port set
g.X_AXIS = Axis(0, None, 0, g.ARDUINO.pins[0]) # create axis objects without the PSU
g.Y_AXIS = Axis(1, None, 1, g.ARDUINO.pins[1])
ui_print("XY Device not connected or incorrect port set.")
# same for the Z axis
ui_print("Connecting to Z Device on %s..." % g.Z_PORT)
try:
if g.Z_DEVICE is not None:
ui_print("Closing serial connection on Z device")
g.Z_DEVICE.serial.close()
g.Z_DEVICE = None
g.Z_DEVICE = PS2000B.PS2000B(g.Z_PORT)
ui_print("Connection established.")
g.Z_AXIS = Axis(2, g.Z_DEVICE, 0, g.ARDUINO.pins[2])
except serial.serialutil.SerialException:
g.Z_AXIS = Axis(2, None, 0, g.ARDUINO.pins[2])
ui_print("Z Device not connected or incorrect port set.")
# put newly created axis objects into a list for access later
g.AXES.append(g.X_AXIS)
g.AXES.append(g.Y_AXIS)
g.AXES.append(g.Z_AXIS)
ui_print("") # print new line
def set_to_zero(device): # sets voltages and currents to 0 on all channels of a specific PSU
device.voltage1 = 0
device.current1 = 0
device.voltage2 = 0
device.current2 = 0
def power_down_all(): # on all PSUs set all outputs to 0 but keep connections enabled
for axis in g.AXES:
axis.power_down() # set outputs to 0 and pin to low on this axis
def shut_down_all(): # safe shutdown at program end or on error
# set outputs to 0 and disable connections on all devices
ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
# start writing string to later show how shutdown on all devices went in a single info pop-up:
message = "Tried to shut down all devices. Check equipment to confirm."
# Shut down PSUs:
if g.XY_DEVICE is not None: # the PSU has been setup before
try: # try to safe the PSU
set_to_zero(g.XY_DEVICE) # set currents and voltages to 0 for both channels
g.XY_DEVICE.disable_all() # disable power output on both channels
ui_print("Closing serial connection on XY PSU")
g.XY_DEVICE.serial.close()
g.XY_DEVICE = None
except BaseException as e: # some error occurred, usually device has been disconnected
ui_print("Error while deactivating XY PSU:", e) # print the problem in the console
message += "\nError while deactivating XY PSU: %s" % e # append status to the message to show later
else: # device was successfully deactivated
ui_print("XY PSU deactivated.")
message += "\nXY PSU deactivated." # append message to show later
else: # the device was not connected before
# tell user there was no need/no possibility to deactivate:
ui_print("XY PSU not connected, can't deactivate.")
message += "\nXY PSU not connected, can't deactivate."
# same as above
if g.Z_DEVICE is not None:
try:
set_to_zero(g.Z_DEVICE)
g.Z_DEVICE.disable_all()
ui_print("Closing serial connection on Z PSU")
g.Z_DEVICE.serial.close()
g.Z_DEVICE = None
except BaseException as e:
ui_print("Error while deactivating Z PSU:", e)
message += "\nError while deactivating Z PSU: %s" % e
else:
ui_print("Z PSU deactivated.")
message += "\nZ PSU deactivated."
else:
ui_print("Z PSU not connected, can't deactivate.")
message += "\nZ PSU not connected, can't deactivate."
# Shut down arduino:
try:
g.ARDUINO.safe() # call safe method in ArduinoCtrl class (all relay pins to LOW)
except BaseException as e: # some error occurred
ui_print("Arduino safing unsuccessful:", e)
message += "\nArduino safing unsuccessful: %s" % e # append to the message to show later
# this throws no exception, even when arduino is not connected
# ToDo (optional): figure out error handling for this
try:
g.ARDUINO.close() # close the serial link
except BaseException as e: # something went wrong there
if g.ARDUINO.connected == "Connected": # Arduino was connected, some error occurred
ui_print("Closing Arduino connection failed:", e)
message += "\nClosing Arduino connection failed: %s" % e
else: # Arduino was not connected, so error is expected
ui_print("Arduino not connected, can't close connection.")
message += "\nArduino not connected, can't close connection."
else: # no problems, connection was successfully closed
ui_print("Serial connection to Arduino closed.")
message += "\nSerial connection to Arduino closed."
messagebox.showinfo("Program ended", message) # Show a unified pop-up with how the shutdown on each device went
def set_field_simple(vector): # forms magnetic field as specified by vector, w/o cancelling ambient field
for i in [0, 1, 2]:
try:
g.AXES[i].set_field_simple(vector[i]) # try to set the field on each axis
except ValueError as e: # a limit was violated, usually the needed current was too high
ui_print(e) # let the user know
def set_field(vector): # forms magnetic field as specified by vector, corrected for ambient field
# same as set_field_simple(), but with compensation
for i in [0, 1, 2]:
try:
g.AXES[i].set_field(vector[i])
except ValueError as e:
ui_print(e)
def set_current_vec(vector): # sets currents on each axis according to given vector
i = 0
for axis in g.AXES:
try:
axis.target_field = 0 # set target field attribute to 0 to show that current, not field is controlled atm
axis.target_field_comp = 0 # as above
axis.set_signed_current(vector[i]) # command test bench to set the current
except ValueError as e: # current was too high
ui_print(e) # print out the error message
i += 1
def devices_ok(xy_off=False, z_off=False, arduino_off=False):
# check if all devices are connected, return True if yes
# checks for individual devices can be disabled by parameters above (default not disabled)
try: # handle errors while checking connections
if not xy_off: # if check for this device is not disabled
if g.XY_DEVICE is not None: # has the handle for this device been set?
g.X_AXIS.update_status_info() # update info --> this actually communicates with the device
if g.X_AXIS.connected != "Connected": # if not connected
return False # return and exit function
else: # if handle has not been set the device is inactive --> not ok
return False
if not z_off: # same as above
if g.Z_DEVICE is not None:
g.Z_AXIS.update_status_info()
if g.Z_AXIS.connected != "Connected":
return False
else:
return False
if not arduino_off: # check not disabled
g.ARDUINO.update_status_info() # update status info --> attempts communication
if g.ARDUINO.connected != "Connected":
return False
except Exception as e: # if an error is encountered while checking the devices
messagebox.showerror("Error!", "Error while checking devices: \n%s" % e) # show error pop-up
return False # clearly something is not ok
else: # if nothing has triggered so far all devices are ok --> return True
return True
+7 -7
View File
@@ -7,9 +7,9 @@ from tkinter import messagebox
# import other project files:
import src.globals as g
import src.cage_func as func
from src.utility import ui_print
from src.user_interface import HardwareConfiguration
import src.user_interface as ui
import src.helmholtz_cage_device as helmholtz_cage_device
global CONFIG_FILE # string storing the path of the used config file
global CONFIG_OBJECT # object of type ConfigParser(), storing all configuration information
@@ -51,7 +51,7 @@ def edit_config(section, key, value, override=False): # edit a specific value i
try:
value_ok = 'OK'
if section in g.AXIS_NAMES and not override: # only check values in axis sections and not if check overridden
value_ok = func.value_in_limits(section, key, value) # check if value is ok, too high or too low
value_ok = helmholtz_cage_device.value_in_limits(section, key, value) # check if value is ok, too high or too low
if value_ok == 'HIGH': # value is too high
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(section)] # get max value for message printing
@@ -111,7 +111,7 @@ def check_config(config_object): # check all numeric values in the config and s
# shop pup-up warning message:
messagebox.showwarning("Warning!", "Found %i value(s) exceeding limits in config file. Check values "
"to ensure correct operation and avoid equipment damage!" % problem_counter)
g.app.show_frame(HardwareConfiguration) # open configuration window so user can check values
g.app.show_frame(ui.HardwareConfiguration) # open configuration window so user can check values
def reset_config_to_default(): # reset values in config object to defaults (set in globals.py)
@@ -126,6 +126,6 @@ def reset_config_to_default(): # reset values in config object to defaults (set
config.set(axis_name, key, str(g.default_arrays[key][0][i])) # set values
i += 1
config.add_section("PORTS") # add section for PSU serial ports
for key in g.default_ports.keys(): # go through dictionary of default serial ports
config.set("PORTS", key, str(g.default_ports[key])) # set the value for each axis
config.add_section("Supplies") # add section for PSU serial ports
for key in g.default_psu_config.keys(): # go through dictionary of default serial ports
config.set("Supplies", key, str(g.default_psu_config[key])) # set the value for each axis
-1
View File
@@ -13,7 +13,6 @@ import matplotlib.pyplot as plt
# import other project files:
from src.utility import ui_print
import src.user_interface as ui
import src.cage_func as func
import src.globals as g
+11 -5
View File
@@ -36,12 +36,18 @@ default_arrays = {
"max_amps": np.array([[5, 5, 5], [6, 6, 6], [0, 0, 0]], dtype=float), # max. allowed current (A)
"relay_pin": [[15, 16, 17], [15, 16, 17], [15, 16, 17]] # pins on the arduino for reversing [x,y,z] polarity
}
# Dictionary for PSU serial ports:
default_ports = {
"xy_port": "COM8", # Default serial port where PSU for X- and Y-Axes is connected
"z_port": "COM5", # Default serial port where PSU for Z-Axis is connected
# Dictionary for PSU configuration:
default_psu_config = {
"supply_model": "ps2000b",
"xy_port": "COM", # Default serial port where PSU for X- and Y-Axes is connected
"z_port": "COM", # Default serial port where PSU for Z-Axis is connected
}
# Configuration for socket interface
SOCKET_PORT = 6677
SOCKET_MAX_CONNECTIONS = 5
SOCKET_MAX_CONNECTIONS = 5
# Exception used globally throughout the application
class DeviceNotConnected(Exception):
pass
+292 -104
View File
@@ -3,10 +3,12 @@ from threading import RLock, Thread, Event
from tkinter import messagebox
from copy import deepcopy
import numpy as np
from src.arduino_device import ArduinoDevice
from src.psu_device import PSUDevicePS2000B, PSUDeviceQL355TP
from src.user_interface import ui_print
import src.config_handling as config
from src.utility import ui_print
import src.config_handling as config_handling
import src.globals as g
@@ -14,16 +16,20 @@ class ProxyNotOwnedException(Exception):
pass
class DeviceBusy(Exception):
pass
class HelmholtzCageDevice:
"""This is the central object for controlling all the test bench related HW. This way, access can be
synchronized and exclusive to a single controller at once. This device always exists, irrespective of
which devices are actually connected. Only the request_proxy and shutdown method should be used !!!
which devices are actually connected. Only the request_proxy, shutdown and destroy methods should be used !!!
Provides subscriber interface for periodic status information.
Provides proxy model to control access."""
# This class is thread safe
POLLING_INTERVAL = 1.5 # Seconds between polling the device state
POLLING_INTERVAL = 1 # Seconds between polling the device state
def __init__(self):
# Indicates all the threads should be joined
@@ -47,51 +53,32 @@ class HelmholtzCageDevice:
self.proxy_lock = RLock()
# Contains the id of the current in control proxy
self.proxy_id = None
# Contains the current context manager proxy. All other proxy instances are managed externally
self.context_manager_proxy = None
# --- STATUS VARIABLES ---
# These are used to store information about our last command to provide status updates asynchronously
self.target_field = [0, 0, 0]
self.target_field_raw = [0, 0, 0]
self.target_current = [0, 0, 0]
# --- DEVICE SETUP ---
# Must be acquired to access any hardware
self.hardware_lock = RLock()
# All devices are usable if the object exists. None indicates the device is not connected/not working properly
# Arduino setup
try:
self.arduino = ArduinoDevice()
except Exception as e:
self.arduino = None
# show error messages to alert user
ui_print("Arduino setup failed:", e)
ui_print(traceback.print_exc())
messagebox.showerror("Error!", "Arduino setup failed:\n%s \nCheck traceback in console." % e)
# Com ports
self.com_port_psu1 = None
self.com_port_psu2 = None
# PSU setup
self.com_port_psu1 = config.read_from_config("Supplies", "xy_port", config.CONFIG_OBJECT)
self.com_port_psu2 = config.read_from_config("Supplies", "z_port", config.CONFIG_OBJECT)
self.psu_type = config.read_from_config("Supplies", "supply_model", config.CONFIG_OBJECT)
# psu1: controls xy axis
try:
if self.psu_type == "ps2000b":
self.psu1 = PSUDevicePS2000B(self.com_port_psu1)
elif self.psu_type == "ql355tp":
self.psu1 = PSUDeviceQL355TP(self.com_port_psu1)
else:
raise Exception("Invalid psu model: {}".format(self.psu_type))
except Exception as e:
self.psu1 = None
# PSU object used
self.psu_type = None
# psu2: controls z axis
try:
if self.psu_type == "ps2000b":
self.psu2 = PSUDevicePS2000B(self.com_port_psu2)
elif self.psu_type == "ql355tp":
self.psu2 = PSUDeviceQL355TP(self.com_port_psu2)
else:
raise Exception("Invalid psu model: {}".format(self.psu_type))
except Exception as e:
self.psu2 = None
# Hardware object variables
self.arduino = None
self.psu1 = None
self.psu2 = None
# Zero and activate channels. This is a sort of "armed" state so that we can send commands later
self.psu1.idle()
self.psu2.idle()
self.connect_hardware()
# --- AXIS CONFIGURATION ---
# This is also hardware related, but in a separate section to keep it clean.
@@ -99,10 +86,8 @@ class HelmholtzCageDevice:
self.axes = []
# Loop over axes
for i in range(3):
axis_dict = {}
for key in ["coil_const", "ambient_field", "resistance", "max_volts", "max_amps"]:
axis_dict[key] = float(config.read_from_config(g.AXIS_NAMES[i], key, config.CONFIG_OBJECT))
self.axes.append(axis_dict)
# The axes talks to the HW objects (Arduino, PSU) referenced in this object
self.axes.append(Axis(i, self))
# --- HW COMMUNICATION THREAD ---
self._cmd_exec_thread = Thread(target=self._cmd_exec_thread_method)
@@ -111,6 +96,57 @@ class HelmholtzCageDevice:
self._hw_poll_thread = Thread(target=self._hw_poll_thread_method)
self._hw_poll_thread.start()
# TODO: Move to proxy
def reconnect_hardware(self):
self.shutdown()
self.connect_hardware()
# TODO: Move to proxy
def connect_hardware(self):
"""Connects devices. Does not check if they are already connected: Remember to call shutdown first"""
with self.hardware_lock:
# All devices are usable if the object exists. None indicates
# the device is not connected/not working properly
# Arduino setup
try:
self.arduino = ArduinoDevice()
except Exception as e:
self.arduino = None
# show error messages to alert user
ui_print("Arduino setup failed:", e)
# PSU setup
self.com_port_psu1 = config_handling.read_from_config("Supplies", "xy_port", config_handling.CONFIG_OBJECT)
self.com_port_psu2 = config_handling.read_from_config("Supplies", "z_port", config_handling.CONFIG_OBJECT)
psu_type_string = config_handling.read_from_config("Supplies", "supply_model",
config_handling.CONFIG_OBJECT)
if psu_type_string == "ps2000b":
self.psu_type = PSUDevicePS2000B
elif psu_type_string == "ql355tp":
self.psu_type = PSUDeviceQL355TP
else:
raise Exception("Invalid psu model: {}".format(self.psu_type))
# psu1: controls xy axis
try:
self.psu1 = self.psu_type(self.com_port_psu1)
except Exception as e:
self.psu1 = None
ui_print("Error creating PSU device:\n{}".format(e))
# psu2: controls z axis
try:
self.psu2 = self.psu_type(self.com_port_psu2)
except Exception as e:
self.psu2 = None
ui_print("Error creating PSU device:\n{}".format(e))
# Zero and activate channels. This is a sort of "armed" state so that we can send commands later
if self.psu1 is not None:
self.psu1.idle()
if self.psu2 is not None:
self.psu2.idle()
def request_proxy(self):
"""Returns a new HelmholtzCageProxy or None, depending on if access is available"""
with self.proxy_lock:
@@ -120,8 +156,13 @@ class HelmholtzCageDevice:
self.proxy_id = id(new_proxy)
return new_proxy
else:
# The interface is occupied, the caller tolerate that the request failed.
return None
# The interface is occupied, the caller must tolerate that the request failed.
raise DeviceBusy
def __enter__(self):
"""Enables: with g.CAGE_DEVICE as dev:"""
self.context_manager_proxy = self.request_proxy()
return self.context_manager_proxy
def release_proxy(self, proxy_obj):
"""Releases the proxy to free access for other controllers. Should only be called when proxy is destroyed"""
@@ -131,6 +172,10 @@ class HelmholtzCageDevice:
self.proxy_id = None
# Otherwise do nothing, this case requires no behaviour
def __exit__(self, *args):
"""Enables: with g.CAGE_DEVICE as dev:"""
self.release_proxy(self.context_manager_proxy)
def subscribe_status_updates(self, callback):
# List containing all interested subscribers.
# We won't check if a callback is added twice. Not our responsibility
@@ -139,6 +184,7 @@ class HelmholtzCageDevice:
def queue_command(self, proxy_obj, command):
""" Queues a dict for immediate execution containing the command for the cage as a whole.
Since the newest command should always be run, it is not a real queue (just a variable)"""
with self.proxy_lock:
if id(proxy_obj) != self.proxy_id:
raise ProxyNotOwnedException()
@@ -152,9 +198,11 @@ class HelmholtzCageDevice:
"""This method forms the main thread for hardware command execution."""
while not self._stop_flag.is_set():
self.new_command_flag.wait()
self.new_command_flag.clear()
# Avoid blocking the buffer while we are executing.
with self.command_lock:
command_buffer = deepcopy(self.command) # Dicts are mutable so must be copied
# Dicts and lists are mutable so must be (deep)copied
command_buffer = HelmholtzCageDevice._copy_command(self.command)
self.command = None # Processed commands are removed from "buffer"
if command_buffer:
try:
@@ -183,84 +231,70 @@ class HelmholtzCageDevice:
if stop_flag_set:
return
status_data = {'axes': []}
ard_conn = self.arduino is not None
status_data = {'axes': [],
'arduino_connected': ard_conn}
with self.hardware_lock:
# This polls all three axes at once
for i in range(3):
# Helper function to find correct psu and channel to talk to
psu, channel = self._get_psu_for_axis(i)
# This is a slow operation, watch out!
status = psu.poll_channel_state(channel)
status_data['axes'].append(status)
for axis in self.axes:
status_data['axes'].append(axis.get_status_dict())
# Distribute status data to all interested subscribers
for subscriber in self._subscribers:
subscriber(status_data)
@staticmethod
def _copy_command(command):
# PyCharm has an issue with the deepcopy tool, so just handle the copying manually.
if command is None:
return command
try:
return {'command': command['command'],
'arg': command['arg'].copy()}
except AttributeError:
# AttributeError: '---' object has no attribute 'copy'
# Should be immutable. Otherwise we have a problem
return {'command': command['command'],
'arg': command['arg']}
def _set_field_raw(self, arg):
currents = []
for i in range(3):
currents.append(arg[i] / self.axes[i]['coil_const'])
for axis, field in zip(self.axes, arg):
with self.hardware_lock:
axis.set_field_raw(field)
def _set_field_compensated(self, arg):
currents = []
for i in range(3):
target_field = arg[i] - self.axes[i]['ambient_field']
currents.append(target_field / self.axes[i]['coil_const'])
for axis, field in zip(self.axes, arg):
with self.hardware_lock:
axis.set_field_compensated(field)
def _set_signed_currents(self, arg):
"""Sets the currents in the array arg in the respective coils x->y->z.
This function imposes safety limits by clamping the current when beyond the maximum."""
if self.psu1 is None or self.psu2 is None or self.arduino is None:
ui_print("Can't set current, PSUs and Arduino are not connected.")
# One pass for every axis
for i in range(3):
# Check current limits
if abs(arg[i]) <= self.axes[i]['max_amps']:
safe_current = arg[i]
else:
safe_current = self.axes[i]['max_amps']
ui_print("Attempted to exceed current limit on {}".format(g.AXIS_NAMES[i]))
for axis, current in zip(self.axes, arg):
# Talk to hardware
with self.hardware_lock:
# TODO: Check for exceptions
# Set polarity on Arduino
if safe_current < 0:
# Reverse polarity
self.arduino.set_axis_polarity(i, True)
else:
# Positive polarity (default case)
self.arduino.set_axis_polarity(i, False)
try:
axis.set_signed_current(current)
except Exception as e:
ui_print("Error {}: Unexpected error occured:\n{}".format(axis.name, e))
# determine voltage limit to be set on PSU, must be high enough to not limit the current:
# min. 8V, max. max_volts, in-between as needed with current value (+margin to not limit current)
voltage_limit = min(max(1.3 * abs(safe_current) * self.axes[i]['resistance'], 8),
self.axes[i]['max_volts']) # limit voltage
# Helper function to find correct psu and channel to talk to
psu, channel = self._get_psu_for_axis(i)
# Set voltages and currents. Outputs should already be active from initializer.
psu.set_current(channel, safe_current)
psu.set_voltage(channel, voltage_limit)
def _get_psu_for_axis(self, axis_index):
def get_psu_for_axis(self, axis_index):
"""Determine which channel of which psu is required"""
# TODO: This kind of stuff belongs in the config and should not be hardcoded
if axis_index == 0 or axis_index == 1:
psu = self.psu1
channel = psu.valid_channels[axis_index]
channel = self.psu_type.valid_channels()[axis_index]
port = self.com_port_psu1
else:
psu = self.psu2
channel = psu.valid_channels[0]
return psu, channel
channel = self.psu_type.valid_channels()[0]
port = self.com_port_psu2
return psu, channel, port
def shutdown(self):
""" Shuts down the hardware. This special command overrides the currently active proxy.
The object cannot be recovered from this state, but may be re-instantiated."""
def destroy(self):
"""The object cannot be recovered after calling destroy"""
# Send signals to kill threads:
# TODO: Handle timeout behaviour
@@ -270,10 +304,19 @@ class HelmholtzCageDevice:
self.command = None
self.new_command_flag.set() # Causes the thread to unblock
self._cmd_exec_thread.join(timeout=2)
#_hw_poll_thread:
# _hw_poll_thread:
# This thread is stopped just by setting the _stop_flag
self._hw_poll_thread.join(timeout=2)
# Shutdown the hardware
msg = self.shutdown()
messagebox.showinfo("Program ended",
msg) # Show a unified pop-up with how the shutdown on each device went
def shutdown(self):
""" Shuts down the hardware. This special command overrides the currently active proxy."""
# This waiting period is not easily removed without resulting in unexpected behaviour
with self.hardware_lock:
ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
@@ -331,8 +374,138 @@ class HelmholtzCageDevice:
ui_print("Arduino not connected, can't deactivate.")
message += "\nArduino not connected, can't deactivate."
messagebox.showinfo("Program ended",
message) # Show a unified pop-up with how the shutdown on each device went
return message
class Axis:
def __init__(self, axis_idx, cage_dev):
"""
This class is an adapter to axis-specific and non-specific (e.g. psu status) information. Most attributes
are supplied as properties, which query the parent component of that attribute/info. It also wraps device calls.
"""
self.idx = axis_idx
self.cage_dev = cage_dev
self.name = g.AXIS_NAMES[axis_idx] # Also used in some indexing operations, like in the config file
# Create other axis properties using config parameters
self.coil_const = float(config_handling.read_from_config(self.name, "coil_const", config_handling.CONFIG_OBJECT))
self.ambient_field = float(config_handling.read_from_config(self.name, "ambient_field", config_handling.CONFIG_OBJECT))
self.resistance = float(config_handling.read_from_config(self.name, "resistance", config_handling.CONFIG_OBJECT))
self.max_volts = float(config_handling.read_from_config(self.name, "max_volts", config_handling.CONFIG_OBJECT))
self.max_amps = float(config_handling.read_from_config(self.name, "max_amps", config_handling.CONFIG_OBJECT))
# State variables
self.target_current = 0
def set_field_raw(self, field):
self.set_signed_current(field / self.coil_const)
def set_field_compensated(self, field):
self.set_field_raw(field - self.ambient_field)
def set_signed_current(self, current):
"""Sets current on axis"""
# Check current limits
if abs(current) <= self.max_amps:
safe_current = current
else:
safe_current = self.max_amps
ui_print("Warning {}: Attempted to exceed current limit".format(self.name))
# Update state variables to be queried. This should be set even if it is only a "virtual" action with no
# connected devices.
self.target_current = safe_current
if not self.arduino:
ui_print("Warning {}: Cannot set field/current without Arduino".format(self.name))
return
if not self.psu:
ui_print("Warning {}: Cannot set field/current without PSU".format(self.name))
return
# TODO: Check for exceptions
# Set polarity on Arduino
if safe_current < 0:
# Reverse polarity
self.arduino.set_axis_polarity(self.idx, True)
else:
# Positive polarity (default case)
self.arduino.set_axis_polarity(self.idx, False)
# determine voltage limit to be set on PSU, must be high enough to not limit the current:
# min. 8V, max. max_volts, in-between as needed with current value (+margin to not limit current)
voltage_limit = min(max(1.3 * abs(safe_current) * self.resistance, 8), self.max_volts) # limit voltage
# Set voltages and currents. Outputs should already be active from initializer.
self.psu.set_current(self.channel, safe_current)
self.psu.set_voltage(self.channel, voltage_limit)
@property
def arduino(self):
return self.cage_dev.arduino
@property
def com_port(self):
_, _, port = self.cage_dev.get_psu_for_axis(self.idx)
return port
@property
def psu(self):
psu, _, _ = self.cage_dev.get_psu_for_axis(self.idx)
return psu
@property
def channel(self):
_, channel, _ = self.cage_dev.get_psu_for_axis(self.idx)
return channel
@property
def target_field(self):
return self.target_current * self.coil_const + self.ambient_field
@property
def target_field_raw(self):
return self.target_current * self.coil_const
@property
def connected(self):
return self.psu is not None and self.arduino is not None
@property
def polarity(self):
return self.arduino.get_axis_polarity(self.idx)
@property
def max_field(self):
max_field_magnitude = self.max_amps * self.coil_const
return np.array([-max_field_magnitude, max_field_magnitude])
@property
def max_comp_field(self):
max_field_magnitude = self.max_amps * self.coil_const
return np.array([self.ambient_field - max_field_magnitude, self.ambient_field + max_field_magnitude])
def get_status_dict(self):
"""Dict containing all data from this model to pass to the front-end. Some data is only available through
this interface, since it also polls the hardware for current set-points"""
# This is a slow operation, watch out!
if self.psu:
status = self.psu.poll_channel_state(self.channel)
else:
status = {}
if self.arduino:
status['polarity'] = self.polarity
status['connected'] = self.connected
status['port'] = self.com_port
status['channel'] = self.channel
status['target_field_raw'] = self.target_field_raw
status['target_field'] = self.target_field
status['target_current'] = self.target_current
return status
class HelmholtzCageProxy:
@@ -351,4 +524,19 @@ class HelmholtzCageProxy:
self.cage_device.queue_command(self, {'command': 'set_field_compensated', 'arg': vector})
def __del__(self):
self.cage_device.release_proxy(self)
self.cage_device.release_proxy(self)
def value_in_limits(axis, key, value):
"""Check if value is within safe limits (set in globals.py)"""
# axis is string with axis name, e.g. "X-Axis"
# key specifies which value to check, e.g. current
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(axis)] # get max value from dictionary in globals.py
min_value = g.default_arrays[key][2][g.AXIS_NAMES.index(axis)] # get min value from dictionary in globals.py
if float(value) > float(max_value): # value is too high
return 'HIGH'
elif float(value) < float(min_value): # value is too low
return 'LOW'
else: # value is within limits
return 'OK'
+30 -38
View File
@@ -3,6 +3,7 @@ from abc import ABC, abstractmethod
import serial
from src.ps2000b import PS2000B # Module containing all PS2000B HW functions and classes
from src.utility import ui_print
class PSUDevice(ABC):
@@ -10,9 +11,9 @@ class PSUDevice(ABC):
It can be subclassed to easily support new hardware."""
def __init__(self, com_port):
"""PSUDevice assumes a serial connection. Class must provide a connected state variable"""
"""PSUDevice assumes a serial connection"""
ui_print("\nConnecting to power supply...")
self.com_port = com_port
self.connected = False
@abstractmethod
def enable_channel(self, channel_nr):
@@ -57,7 +58,7 @@ class PSUDevice(ABC):
def idle(self):
"""Zero all outputs but activate channels so commands can be sent."""
for ch in self.valid_channels:
for ch in self.valid_channels():
self.set_current(ch, 0)
self.set_voltage(ch, 0)
self.enable_channel(ch)
@@ -72,9 +73,9 @@ class PSUDevice(ABC):
"""Disconnects the device"""
pass
@property
@staticmethod
@abstractmethod
def valid_channels(self):
def valid_channels():
"""Returns a list containing valid channel numbers"""
pass
@@ -85,17 +86,13 @@ class PSUDevicePS2000B(PSUDevice):
def __init__(self, com_port):
super().__init__(com_port)
try:
self.dev = PS2000B.PS2000B(com_port)
# dev_info is a class which contains hw specific constants, such as nominal voltage and current
self.dev_info = self.dev.get_device_information() # Cache this result
self.connected = True
except serial.SerialException:
self.dev = None
self.connected = False
"""Can fail; Check for serial.SerialException"""
self.dev = PS2000B.PS2000B(com_port)
# dev_info is a class which contains hw specific constants, such as nominal voltage and current
self.dev_info = self.dev.get_device_information() # Cache this result
@property
def valid_channels(self):
@staticmethod
def valid_channels():
# Dependent on PSU, the PS2000B has 2 channels
return [0, 1]
@@ -123,11 +120,11 @@ class PSUDevicePS2000B(PSUDevice):
# Format should match the provided template in abstract PSUDevice class.
return {'active': dev_status.output_active, 'remote_active': dev_status.remote_control_active,
'actual_voltage': voltage, 'limit_voltage': voltage_setp,
'actual_current': current, 'limit_current': current_setp}
'voltage': voltage, 'voltage_setpoint': voltage_setp,
'current': current, 'current_setpoint': current_setp}
def shutdown(self):
for ch in self.valid_channels:
for ch in self.valid_channels():
self.disable_channel(ch)
self.set_current(ch, 0)
self.set_voltage(ch, 0)
@@ -140,27 +137,22 @@ class PSUDeviceQL355TP(PSUDevice):
"""HW interface for QL355TP from AIM-TTi Instruments"""
def __init__(self, com_port):
"""Can fail; Check for serial.SerialException"""
super().__init__(com_port)
try:
self._serial_object = serial.Serial(
port=self.com_port,
baudrate=19200,
timeout=0.5,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
self.set_output_range(0, 0) # Put the PSU into the 15V/5A range
self.set_output_range(1, 0)
self.reset_breaker() # Reset the breaker in case we are coming from an unclean state
self._serial_object = serial.Serial(
port=self.com_port,
baudrate=19200,
timeout=0.5,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
self.set_output_range(0, 0) # Put the PSU into the 15V/5A range
self.set_output_range(1, 0)
self.reset_breaker() # Reset the breaker in case we are coming from an unclean state
self.connected = True
except serial.SerialException:
self.dev = None
self.connected = False
@property
def valid_channels(self):
@staticmethod
def valid_channels():
# Dependent on PSU, the QL355TP has 2 normal channels. The auxiliary channel is not usable for our purpose
return [1, 2]
@@ -228,7 +220,7 @@ class PSUDeviceQL355TP(PSUDevice):
self._serial_object.write("TRIPRST\n".encode())
def shutdown(self):
for ch in self.valid_channels:
for ch in self.valid_channels():
self.disable_channel(ch)
self.set_current(ch, 0)
self.set_voltage(ch, 0)
+27 -7
View File
@@ -4,7 +4,7 @@ import numpy as np
import src.globals as g
from src.utility import ui_print
import src.cage_func as cage_controls
import src.helmholtz_cage_device as helmholtz_cage_device
# --- Definition of TCP interface ---
#
@@ -56,12 +56,27 @@ class ClientConnectionThread(Thread):
self.client_socket = client_socket
self.client_address = address
# Throws DeviceBusy exception if it couldn't be acquired
try:
self.cage_dev = g.CAGE_DEVICE.request_proxy()
except helmholtz_cage_device.DeviceBusy as e:
self.client_socket.sendall("err".encode('utf-8'))
# Bubble up to connection manager
raise e
self.api_compat = False # Indicates whether the client has a compatible API version
def run(self):
msg = ''
while True:
raw_msg = self.client_socket.recv(2048).decode()
# Check for end of stream
if raw_msg == "":
self.client_socket.close()
# TODO: This should be done explicitly instead of relying on __del__...
self.cage_dev = None
return
# Process message
for char in raw_msg:
if char == '\n':
msg = msg.rstrip() # Some systems will try to send \r characters... looking at you windows O_O
@@ -98,7 +113,7 @@ class ClientConnectionThread(Thread):
z = float(tokens[3])
field_vec = np.array([x, y, z], dtype=np.float32)
# uncompensated
cage_controls.set_field_simple(field_vec)
self.cage_dev.set_field_raw(field_vec)
return "1"
elif tokens[0] == "set_compensated_field":
x = float(tokens[1])
@@ -106,14 +121,14 @@ class ClientConnectionThread(Thread):
z = float(tokens[3])
field_vec = np.array([x, y, z], dtype=np.float32)
# compensated
cage_controls.set_field(field_vec)
self.cage_dev.set_field_compensated(field_vec)
return "1"
elif tokens[0] == "set_coil_currents":
x = float(tokens[1])
y = float(tokens[2])
z = float(tokens[3])
current_vec = np.array([x, y, z], dtype=np.float32)
cage_controls.set_current_vec(current_vec)
self.cage_dev.set_signed_currents(current_vec)
return "1"
else:
# The message given is unknown. The programmer probably did not intend for this, so display an error
@@ -134,9 +149,14 @@ class SocketInterfaceThread(Thread):
def run(self):
while True:
(client_socket, address) = self.server_socket.accept()
new_thread = ClientConnectionThread(client_socket, address)
new_thread.start()
ui_print("Accepted connection from {}".format(address))
try:
new_thread = ClientConnectionThread(client_socket, address)
new_thread.start()
ui_print("Accepted connection from {}".format(address))
except helmholtz_cage_device.DeviceBusy:
ui_print("Denied connection from {}. Device is busy.".format(address))
client_socket.close()
def configure_tcp_port(self):
# Creates and configures the listening port
+110 -68
View File
@@ -3,6 +3,8 @@
# ToDo: optimize layout for smaller screen (like on IRS clean room PC)
# import packages for user interface:
from queue import Queue, Empty
from tkinter import *
from tkinter import ttk
from tkinter import messagebox
@@ -18,11 +20,11 @@ from datetime import datetime
# import other project files:
import src.globals as g
import src.cage_func as func
import src.csv_threading as csv
import src.config_handling as config
import src.csv_logging as log
from src.utility import ui_print
import src.helmholtz_cage_device as helmholtz_cage_device
# define font styles:
HEADER_FONT = ("Arial", 13, "bold")
@@ -245,30 +247,28 @@ class ManualMode(Frame):
# update the labels showing the min/max achievable values
compensate = self.compensate.get() # read out if compensate field checkbox is checked (True or False)
i = 0
for var in self.max_value_vars: # go through the max value labels for each axis
for i, var in enumerate(self.max_value_vars): # go through the max value labels for each axis
if not compensate: # ambient field should not be compensated
field = g.AXES[i].max_field * 1e6 # get max values from the axis object
field = g.CAGE_DEVICE.axes[i].max_field * 1e6 # get max values from the axis object
elif compensate: # ambient field should be compensated
field = g.AXES[i].max_comp_field * 1e6
field = g.CAGE_DEVICE.axes[i].max_comp_field * 1e6
else: # this really should never happen
field = [0, 0]
ui_print("Unexpected value encountered: compensate =", compensate)
messagebox.showerror("Unexpected Value!", ("Unexpected value encountered: compensate =", compensate))
var.set("(%0.1f to %0.1f \u03BCT)" % (field[0], field[1])) # update the label text with the new values
i += 1
def switch_to_current_mode(self): # called when switching to the input current mode
self.compensate_checkbox.config(state=DISABLED) # disable the compensate ambient field checkbox
# update the labels showing the min/max achievable values
i = 0
for var in self.max_value_vars: # go through the max value labels for each axis
var.set("(%0.2f to %0.2f A)" % (-g.AXES[i].max_amps, g.AXES[i].max_amps)) # update the label
i += 1
for i, var in enumerate(self.max_value_vars): # go through the max value labels for each axis
# update the label
var.set("(%0.2f to %0.2f A)" % (-g.CAGE_DEVICE.axes[i].max_amps, g.CAGE_DEVICE.axes[i].max_amps))
def reinitialize(self): # called on "Reinitialize!" button press
func.setup_all() # reinitialize all PSUs and the Arduino
# reinitialize all PSUs and the Arduino
g.CAGE_DEVICE.reconnect_hardware()
# log change to the log file if user has selected event logging in the Configure Logging window
logger = self.controller.pages[ConfigureLogging] # get object of logging configurator
@@ -276,7 +276,7 @@ class ManualMode(Frame):
logger.log_datapoint() # log data
def power_down(self): # called on "power down" button press
func.power_down_all() # power down outputs on all PSUs and the Arduino
g.CAGE_DEVICE.shutdown() # power down outputs on all PSUs and the Arduino
# log change to the log file if user has selected event logging in the Configure Logging window
logger = self.controller.pages[ConfigureLogging] # get object of logging configurator
@@ -297,7 +297,6 @@ class ManualMode(Frame):
else: # no issues while reading entries (user entered correct format)
function_to_call = self.modes[self.input_mode.get()][0] # get function of appropriate mode
function_to_call(vector) # call function (self.execute_field() or self.execute_current())
self.controller.StatusDisplay.update_labels() # update status display after change
# log change to the log file if user has selected event logging in the Configure Logging window
logger = self.controller.pages[ConfigureLogging] # get object of logging configurator
@@ -305,20 +304,34 @@ class ManualMode(Frame):
logger.log_datapoint() # log data
def execute_field(self, vector): # convert magnetic field vector and send to test bench
ui_print("Field executing:", vector, "\u03BCT")
compensate = self.compensate.get() # read out if compensate ambient field checkbox is ticked
if compensate: # ambient field should be compensated
func.set_field(vector * 1e-6) # convert to Tesla and send to test bench
elif not compensate: # ambient field should not be compensated
func.set_field_simple(vector * 1e-6) # convert to Tesla and send to test bench
else: # this really should never happen
ui_print("Unexpected value encountered: compensate =", compensate)
messagebox.showerror("Unexpected Value!", ("Unexpected value encountered: compensate =", compensate))
ui_print("\nField executing:", vector, "\u03BCT")
# Acquire a proxy to the helmholtz cage:
# This can fail if already in use
try:
with g.CAGE_DEVICE as cage_dev:
compensate = self.compensate.get() # read out if compensate ambient field checkbox is ticked
if compensate: # ambient field should be compensated
cage_dev.set_field_compensated(vector * 1e-6) # convert to Tesla and send to test bench
pass
elif not compensate: # ambient field should not be compensated
cage_dev.set_field_raw(vector * 1e-6) # convert to Tesla and send to test bench
pass
else: # this really should never happen
ui_print("Unexpected value encountered: compensate =", compensate)
messagebox.showerror("Unexpected Value!", ("Unexpected value encountered: compensate =", compensate))
except helmholtz_cage_device.DeviceBusy:
ui_print("Error: Could not acquire control. Is the HW already in use?")
@staticmethod
def execute_current(vector): # send current vector to the test bench
ui_print("Current executing:", vector, "A")
func.set_current_vec(vector) # command test bench
ui_print("\nCurrent executing:", vector, "A")
with g.CAGE_DEVICE as cage_dev:
# This can fail if already in use
if cage_dev is None:
ui_print("Error: Could not acquire control. Is the HW already in use?")
return
cage_dev.set_signed_currents(vector) # command test bench
class ExecuteCSVMode(Frame):
@@ -552,8 +565,8 @@ class HardwareConfiguration(Frame):
# text for the description labels:
entry_texts = ["XY PSU Serial Port:", "Z PSU Serial Port:"]
# create variables to store the port names and set to current names
self.XY_port = StringVar(value=g.XY_PORT)
self.Z_port = StringVar(value=g.Z_PORT)
self.XY_port = StringVar(value=g.CAGE_DEVICE.com_port_psu1)
self.Z_port = StringVar(value=g.CAGE_DEVICE.com_port_psu2)
port_vars = [self.XY_port, self.Z_port] # list to store both port variables
row = 0
for text in entry_texts: # do this for both ports
@@ -596,7 +609,7 @@ class HardwareConfiguration(Frame):
# Fill in header (axis names):
col = 1
for text in ["X-Axis", "Y-Axis", "Z-Axis"]:
for text in g.AXIS_NAMES:
label = Label(value_frame, text=text, font=SUB_HEADER_FONT)
label.grid(row=0, column=col, sticky="ew")
col += 1
@@ -649,16 +662,16 @@ class HardwareConfiguration(Frame):
def restore_defaults(self): # restore default settings
config.reset_config_to_default() # overwrite config file with default
ui_print("\nReinitializing devices...")
func.setup_all() # setup everything with the defaults
g.CAGE_DEVICE.reconnect_hardware() # setup everything with the defaults
self.update_fields() # update fields in config window
def update_fields(self): # set current values for all entry variables from config file
# set values for PSU serial ports:
self.XY_port.set(g.XY_PORT)
self.Z_port.set(g.Z_PORT)
self.XY_port.set(g.CAGE_DEVICE.com_port_psu1)
self.Z_port.set(g.CAGE_DEVICE.com_port_psu2)
for key in self.entries.keys(): # go through the main value table
for i in [0, 1, 2]: # go through all three axes
for i in range(3): # go through all three axes
# get value from config file:
value = config.read_from_config(g.AXIS_NAMES[i], self.entries[key][3], config.CONFIG_OBJECT)
self.entries[key][0][i].set(value) # set initial value on the entry field variable
@@ -667,7 +680,7 @@ class HardwareConfiguration(Frame):
self.entries[key][0][i].set(round(type_value * factor, 3)) # set value with correct unit conversion
# check if value is within safe limits:
value_check = func.value_in_limits(g.AXIS_NAMES[i], self.entries[key][3], value)
value_check = helmholtz_cage_device.value_in_limits(g.AXIS_NAMES[i], self.entries[key][3], value)
if value_check == 'OK': # value is acceptable
self.fields[key][i].config(background="White") # set colour of this entry to white
else: # value exceeds limits
@@ -676,8 +689,8 @@ class HardwareConfiguration(Frame):
def write_values(self): # update config object with user inputs into entry fields and reinitialize devices
# set serial ports for PSUs:
config.edit_config("PORTS", "xy_port", self.XY_port.get())
config.edit_config("PORTS", "z_port", self.Z_port.get())
config.edit_config("Supplies", "xy_port", self.XY_port.get())
config.edit_config("Supplies", "z_port", self.Z_port.get())
# set numeric values for all axes
for key in self.entries.keys(): # go through rows of entry table
@@ -695,7 +708,7 @@ class HardwareConfiguration(Frame):
# Check if value is within safe limits
config_key = self.entries[key][3] # get handle by which value is indexed in config file
value_ok = func.value_in_limits(g.AXIS_NAMES[i], config_key, value) # perform value check
value_ok = helmholtz_cage_device.value_in_limits(g.AXIS_NAMES[i], config_key, value) # perform value check
if value_ok == 'OK': # value is within safe limits
config.edit_config(g.AXIS_NAMES[i], config_key, value) # write new value to config file
@@ -723,7 +736,7 @@ class HardwareConfiguration(Frame):
message = "Unknown case for value limits check, this should not happen."
# display pop-up message to ask user if he really wants the value
answer = messagebox.askquestion("Value out of Bounds", message)
answer = messagebox.askquestion("Value out of bounds", message)
# answer becomes 'yes' or 'no' depending on user choice
if answer == 'yes': # user really wants the value
# call function to write new value to config file with override=True
@@ -733,7 +746,7 @@ class HardwareConfiguration(Frame):
def implement(self): # "Update and Reinitialize" button, update config with new values and reinitialize devices
self.write_values() # write current values from entry fields to config object
ui_print("\nReinitializing devices...")
func.setup_all() # reinitialize devices and program with new values
g.CAGE_DEVICE.reconnect_hardware() # reinitialize devices and program with new values
self.update_fields() # update entry fields to show new values
def load_config(self): # load configuration from some config file
@@ -747,7 +760,7 @@ class HardwareConfiguration(Frame):
config.check_config(config.CONFIG_OBJECT) # check and display warnings if values are out of bounds
ui_print("\nReinitializing devices...")
func.setup_all() # reinitialize devices and program with new values
g.CAGE_DEVICE.reconnect_hardware() # reinitialize devices and program with new values
self.update_fields() # update entry fields to show new values
elif filename == '': # this happens when file selection window is closed without selecting a file
ui_print("No file selected, could not load config.")
@@ -768,14 +781,14 @@ class HardwareConfiguration(Frame):
self.write_values() # write current entry field values to the config object
config.write_config_to_file(config.CONFIG_OBJECT) # write contents of config object to file
ui_print("\nReinitializing devices...")
func.setup_all() # reinitialize devices and program with new values
g.CAGE_DEVICE.reconnect_hardware() # reinitialize devices and program with new values
self.update_fields() # update entry fields to show values as they are in the config
def save_config(self): # same as save_config_as() but with the current config file
self.write_values() # write current entry field values to the config object
config.write_config_to_file(config.CONFIG_OBJECT) # write contents of config object to file
ui_print("\nReinitializing devices...")
func.setup_all() # reinitialize devices and program with new values
g.CAGE_DEVICE.reconnect_hardware() # reinitialize devices and program with new values
self.update_fields() # update entry fields to show values as they are in the config
@@ -1026,6 +1039,10 @@ class StatusDisplay(Frame):
# noinspection PyUnusedLocal
def __init__(self, parent, controller):
Frame.__init__(self, parent, relief=SUNKEN, bd=1)
self.controller = controller
# Queue to store status updates which arrive from another thread by callback
self.update_label_queue = Queue()
# configure Tkinter grid
self.grid_rowconfigure(ALL, weight=1)
@@ -1075,38 +1092,63 @@ class StatusDisplay(Frame):
LabelCol[row].grid(row=row + rowCounter, column=col, sticky="nsew") # place label
col += 1
self.update_labels() # fill in all values
# Register callback to populate new data:
g.CAGE_DEVICE.subscribe_status_updates(self.enqueue_new_status)
# Starts polling loop for status display
self.update_label_poll_method()
def continuous_label_update(self, controller, interval): # update display values in regular intervals (ms)
if not g.exitFlag: # application ist still running
self.update_labels() # update the label values
# call function again after time interval:
controller.after(interval, lambda: self.continuous_label_update(controller, interval))
def update_labels(self, status): # update all values in the status display
for i in range(3): # go through all three axes
# update all label variables with the new values:
axis = status['axes'][i]
if axis['connected']:
# Deal with variables that are dependent on the current hardware state
active = "True" if axis['active'] else "False"
remote_active = "True" if axis['active'] else "False"
voltage_setpoint = "%0.3f V" % axis['voltage_setpoint']
voltage = "%0.3f V" % axis['voltage']
current_setpoint = "%0.3f A" % axis['current_setpoint']
current = "%0.3f A" % axis['current']
polarity = axis['polarity']
else:
active = "N/A"
remote_active = "N/A"
voltage_setpoint = "N/A"
voltage = "N/A"
current_setpoint = "N/A"
current = "N/A"
polarity = "N/A"
psu_connected = "Connected" if axis['connected'] else "Not Connected"
arduino_connected = "Connected" if status['arduino_connected'] else "Not Connected"
def update_labels(self): # update all values in the status display
g.ARDUINO.update_status_info() # get latest status info from arduino
i = 0
for axis in g.AXES: # go through all three axes
if axis.device is not None: # there is a PSU for this axis connected
axis.update_status_info() # get latest status info from PSU (Takes very long...)
# update all label variables with current values:
# ToDo (optional): Use the central dictionary currently defined in csv_logging.py for this
self.label_dict["PSU Serial Port:"][i].set(g.PORTS[i])
self.label_dict["PSU Channel:"][i].set(axis.channel)
self.label_dict["PSU Status:"][i].set(axis.connected)
self.label_dict["Arduino Status:"][i].set(g.ARDUINO.connected) # ToDo (optional): make this multicolumn
self.label_dict["Output:"][i].set(axis.output_active)
self.label_dict["Remote Control:"][i].set(axis.remote_ctrl_active)
self.label_dict["Voltage Setpoint:"][i].set("%0.3f V" % axis.voltage_setpoint)
self.label_dict["Actual Voltage:"][i].set("%0.3f V" % axis.voltage)
self.label_dict["Current Setpoint:"][i].set("%0.3f A" % axis.current_setpoint)
self.label_dict["Actual Current:"][i].set("%0.3f A" % axis.current)
self.label_dict["Target Field:"][i].set("%0.3f \u03BCT" % (axis.target_field * 1e6))
self.label_dict["Trgt. Field Raw:"][i].set("%0.3f \u03BCT" % (axis.target_field_comp * 1e6))
self.label_dict["Target Current:"][i].set("%0.3f A" % axis.target_current)
self.label_dict["Inverted:"][i].set(axis.polarity_switched)
i += 1
self.label_dict["PSU Serial Port:"][i].set(axis['port'])
self.label_dict["PSU Channel:"][i].set(axis['channel'])
self.label_dict["PSU Status:"][i].set(psu_connected)
self.label_dict["Arduino Status:"][i].set(arduino_connected) # ToDo (optional): make this multicolumn
self.label_dict["Output:"][i].set(active)
self.label_dict["Remote Control:"][i].set(remote_active)
self.label_dict["Voltage Setpoint:"][i].set(voltage_setpoint)
self.label_dict["Actual Voltage:"][i].set(voltage)
self.label_dict["Current Setpoint:"][i].set(current_setpoint)
self.label_dict["Actual Current:"][i].set(current)
self.label_dict["Target Field:"][i].set("%0.3f \u03BCT" % (axis['target_field'] * 1e6))
self.label_dict["Trgt. Field Raw:"][i].set("%0.3f \u03BCT" % (axis['target_field_raw'] * 1e6))
self.label_dict["Target Current:"][i].set("%0.3f A" % axis['target_current'])
self.label_dict["Inverted:"][i].set(polarity)
def enqueue_new_status(self, status):
"""Runs in caller thread and places status onto queue to display when polled in update_label_poll_method"""
self.update_label_queue.put(status)
def update_label_poll_method(self):
"""Infinite loop to poll for status updates to display"""
try:
new_status = self.update_label_queue.get(block=False) # Blocks until new data is available.
self.update_labels(new_status)
except Empty:
pass
self.controller.after(200, self.update_label_poll_method)
class OutputConsole(Frame):
# console to print information to user in, similar to standard python output
+1 -3
View File
@@ -5,9 +5,7 @@ import src.globals as g
def ui_print(*content):
"""prints text to built-in console, use exactly like normal print(). Requires the ui to be initialized"""
output = "" # initialize output as empty string
for text in content: # go through all elements to be printed
output = " ".join((output, str(text))) # add element to the output string
output = " ".join([str(c) for c in content])
if not g.exitFlag and g.app is not None: # application is still running --> output window is visible
output = "".join(("\n", output)) # begin new line each time