Files
Helmholtz_Test_Bench/src/helmholtz_cage_device.py
T

580 lines
24 KiB
Python

import time
import traceback
from threading import RLock, Thread, Event
from tkinter import messagebox
import numpy as np
from src.arduino_device import ArduinoDevice
from src.psu_device import PSUDevicePS2000B, PSUDeviceQL355TP
from src.utility import ui_print
from src.exceptions import DeviceBusy, ProxyNotOwnedException
import src.config_handling as config_handling
import src.globals as g
class HelmholtzCageDevice:
    """Central object for controlling all the test bench related hardware.

    Access is synchronized and exclusive to a single controller at once. This device always exists,
    irrespective of which devices are actually connected. Only the request_proxy, shutdown and destroy
    methods should be used from outside!
    Provides a subscriber interface for periodic status information.
    Provides a proxy model to control access.
    """

    # This class is thread safe
    POLLING_INTERVAL = 1  # Seconds between polling the device state

    def __init__(self):
        """Set up locks, status variables and axes, then start the command and polling threads.

        No hardware is connected here; see connect_hardware().
        """
        # Indicates all the threads should be joined
        self._stop_flag = Event()

        # --- POLLING SUBSCRIBERS ---
        # This is a list of object callbacks interested in receiving device status updates.
        # This will primarily include the front-end which wants to update its display data.
        # The callback functions should accept a dict as an argument of the form
        # {'axes': [{}, {}, {}], 'arduino_connected': bool}
        self._subscribers = []

        # --- COMMAND QUEUEING ---
        self.command_lock = RLock()
        # Indicates to the command executing thread that a new command has arrived for execution
        self.new_command_flag = Event()
        # Contains the next command to be executed
        self.command = None

        # --- PROXY MODEL SETUP ---
        # Locks all proxy requesting and releasing access
        self.proxy_lock = RLock()
        # Contains the id of the current in control proxy
        self.proxy_id = None
        # Contains the current context manager proxy. All other proxy instances are managed externally
        self.context_manager_proxy = None

        # --- STATUS VARIABLES ---
        # These are used to store information about our last command to provide status updates asynchronously
        self.target_field = [0, 0, 0]
        self.target_field_raw = [0, 0, 0]
        self.target_current = [0, 0, 0]

        # --- DEVICE SETUP ---
        # Must be acquired to access any hardware
        self.hardware_lock = RLock()
        # Com ports
        self.com_port_psu1 = None
        self.com_port_psu2 = None
        # PSU class used (None until connect_hardware() has read it from the config)
        self.psu_type = None
        # Hardware object variables
        self.arduino = None
        self.psu1 = None
        self.psu2 = None

        # --- AXIS CONFIGURATION ---
        # This is also hardware related, but in a separate section to keep it clean.
        # The axes talk to the HW objects (Arduino, PSU) referenced in this object
        self.axes = [Axis(i, self) for i in range(3)]

        # --- HW COMMUNICATION THREADS ---
        self._cmd_exec_thread = Thread(target=self._cmd_exec_thread_method)
        self._cmd_exec_thread.start()
        self._hw_poll_thread = Thread(target=self._hw_poll_thread_method)
        self._hw_poll_thread.start()

    # TODO: Move to proxy
    def reconnect_hardware(self):
        """Shut down all devices and connect them again while holding the hardware lock."""
        with self.hardware_lock:
            self.shutdown()
            self.connect_hardware()

    # TODO: Move to proxy
    def connect_hardware(self):
        """Connects devices. Does not check if they are already connected: Remember to call shutdown first.

        Raises:
            Exception: if the configured supply model is not one of the known PSU types.
        """
        with self.hardware_lock:
            # All devices are usable if the object exists. None indicates
            # the device is not connected/not working properly

            # Arduino setup
            try:
                self.arduino = ArduinoDevice()
            except Exception as e:
                self.arduino = None
                # show error messages to alert user
                ui_print("Arduino setup failed:", e)

            # PSU setup
            self.com_port_psu1 = config_handling.read_from_config("Supplies", "xy_port",
                                                                  config_handling.CONFIG_OBJECT)
            self.com_port_psu2 = config_handling.read_from_config("Supplies", "z_port",
                                                                  config_handling.CONFIG_OBJECT)
            psu_type_string = config_handling.read_from_config("Supplies", "supply_model",
                                                               config_handling.CONFIG_OBJECT)
            if psu_type_string == "ps2000b":
                self.psu_type = PSUDevicePS2000B
            elif psu_type_string == "ql355tp":
                self.psu_type = PSUDeviceQL355TP
            else:
                # Report the offending config value, not the (unchanged) previously selected class
                raise Exception("Invalid psu model: {}".format(psu_type_string))

            # psu1: controls xy axis
            try:
                self.psu1 = self.psu_type(self.com_port_psu1)
            except Exception as e:
                self.psu1 = None
                ui_print("Error creating PSU device:\n{}".format(e))

            # psu2: controls z axis
            try:
                self.psu2 = self.psu_type(self.com_port_psu2)
            except Exception as e:
                self.psu2 = None
                ui_print("Error creating PSU device:\n{}".format(e))

            # The axes may not be deleted, so a special method for reinitialization is provided.
            for axis in self.axes:
                axis.reload_config()

            # Zero and activate channels. This is a sort of "armed" state so that we can send commands later
            self.idle()

    def reconnect_hardware_async(self):
        """Disconnects and reconnects devices in a non-blocking call.
        Acquires hardware lock and blocks other cage operations."""
        connect_hardware_thread = Thread(target=self.reconnect_hardware)
        connect_hardware_thread.start()

    def idle(self):
        """Zero and activate channels."""
        # The hardware lock is reentrant, so this is safe both when called from connect_hardware()
        # (lock already held) and from the command thread (lock not yet held).
        with self.hardware_lock:
            if self.psu1 is not None:
                self.psu1.idle()
            if self.psu2 is not None:
                self.psu2.idle()
            if self.arduino is not None:
                self.arduino.idle()
            # Since these actions are not handled by the axes objects, also make sure to update their
            # target field status
            for axis in self.axes:
                axis.target_current = 0

    def request_proxy(self):
        """Returns a new HelmholtzCageProxy which owns the device.

        Raises:
            DeviceBusy: if another proxy currently owns the device. The caller must tolerate this.
        """
        with self.proxy_lock:
            if self.proxy_id is not None:
                # The interface is occupied, the caller must tolerate that the request failed.
                raise DeviceBusy
            # The interface is available, return a new proxy object
            new_proxy = HelmholtzCageProxy(self)
            self.proxy_id = id(new_proxy)
            return new_proxy

    def __enter__(self):
        """Enables: with g.CAGE_DEVICE as dev:"""
        self.context_manager_proxy = self.request_proxy()
        return self.context_manager_proxy

    def release_proxy(self, proxy_obj):
        """Releases the proxy to free access for other controllers. Should only be called when proxy is destroyed"""
        # Check and clear under one lock so no other thread can slip in between the two steps
        with self.proxy_lock:
            if self.proxy_valid(proxy_obj):
                # This only frees the interface if it really was the active proxy
                self.proxy_id = None
            # Otherwise do nothing, this case requires no behaviour

    def __exit__(self, *args):
        """Enables: with g.CAGE_DEVICE as dev:"""
        self.release_proxy(self.context_manager_proxy)
        # Drop our reference so the released proxy is not kept alive by the device
        self.context_manager_proxy = None

    def proxy_valid(self, proxy_obj):
        """Returns True if the proxy currently owns the device."""
        with self.proxy_lock:
            return id(proxy_obj) == self.proxy_id

    def subscribe_status_updates(self, callback):
        """Registers a callback which is called with the status dict after every hardware poll."""
        # We won't check if a callback is added twice. Not our responsibility
        self._subscribers.append(callback)

    def queue_command(self, proxy_obj, command):
        """Queues a dict for immediate execution containing the command for the cage as a whole.
        Since the newest command should always be run, it is not a real queue (just a variable).

        Raises:
            ProxyNotOwnedException: if proxy_obj is not the proxy currently owning the device.
        """
        with self.proxy_lock:
            if id(proxy_obj) != self.proxy_id:
                raise ProxyNotOwnedException()
            with self.command_lock:
                # Overwrite any command that was queued but not yet executed. We now only care about
                # the newer command
                self.command = command
                self.new_command_flag.set()

    def _cmd_exec_thread_method(self):
        """This method forms the main thread for hardware command execution.

        A failing or unknown command is reported and skipped, so the thread stays alive for
        the commands that follow.
        """
        while not self._stop_flag.is_set():
            self.new_command_flag.wait()
            self.new_command_flag.clear()
            if self._stop_flag.is_set():
                # destroy() sets the command flag only to unblock us; nothing is executed anymore
                break
            try:
                # Avoid blocking the buffer while we are executing.
                with self.command_lock:
                    # Dicts and lists are mutable so must be (deep)copied
                    command_buffer = HelmholtzCageDevice._copy_command(self.command)
                    self.command = None  # Processed commands are removed from "buffer"
                if command_buffer:
                    self._execute_command(command_buffer)
            except Exception as e:
                ui_print("Error while executing command:\n{}".format(e))
                traceback.print_exc()

    def _execute_command(self, command):
        """Unpacks a command dict into action and argument and delegates to the responsible function."""
        command_string = command['command']
        command_arg = command['arg']
        if command_string == "set_signed_currents":
            self._set_signed_currents(command_arg)
        elif command_string == "set_field_raw":
            self._set_field_raw(command_arg)
        elif command_string == "set_field_compensated":
            self._set_field_compensated(command_arg)
        elif command_string == 'idle':
            self.idle()
        else:
            raise Exception("Command unknown!")

    def _hw_poll_thread_method(self):
        """This method forms the main thread for periodic hardware status polling.

        Every POLLING_INTERVAL seconds the status of all axes is collected and handed to every
        subscriber. Errors while polling or inside a subscriber are reported and do not end the thread.
        """
        while True:
            # We will have to check if we passed this statement due to a stop flag or due to the polling interval
            stop_flag_set = self._stop_flag.wait(timeout=self.POLLING_INTERVAL)
            if stop_flag_set:
                return
            try:
                with self.hardware_lock:
                    # This polls all three axes at once
                    status_data = {'axes': [axis.get_status_dict() for axis in self.axes],
                                   'arduino_connected': self.arduino is not None}
            except Exception as e:
                ui_print("Error while polling hardware status:\n{}".format(e))
                traceback.print_exc()
                continue
            # Distribute status data to all interested subscribers. Iterate over a snapshot, since
            # subscribers may be added from other threads in the meantime.
            for subscriber in list(self._subscribers):
                try:
                    subscriber(status_data)
                except Exception as e:
                    ui_print("Error in status subscriber:\n{}".format(e))
                    traceback.print_exc()

    @staticmethod
    def _copy_command(command):
        """Returns a copy of the command dict whose 'arg' is copied too if it offers a copy() method."""
        # PyCharm has an issue with the deepcopy tool, so just handle the copying manually.
        if command is None:
            return command
        try:
            return {'command': command['command'],
                    'arg': command['arg'].copy()}
        except AttributeError:
            # AttributeError: '---' object has no attribute 'copy'
            # Should be immutable. Otherwise we have a problem
            return {'command': command['command'],
                    'arg': command['arg']}

    def _apply_to_axes(self, setter_name, values):
        """Calls the named Axis setter once per axis (x->y->z) with the matching entry of values.

        Errors on one axis are reported and do not prevent the remaining axes from being set.
        """
        for axis, value in zip(self.axes, values):
            # Talk to hardware
            with self.hardware_lock:
                try:
                    getattr(axis, setter_name)(value)
                except Exception as e:
                    ui_print("Error {}: Unexpected error occured:\n{}".format(axis.name, e))
                    traceback.print_exc()

    def _set_field_raw(self, arg):
        """Sets the raw fields in the array arg on the respective axes x->y->z."""
        self._apply_to_axes('set_field_raw', arg)

    def _set_field_compensated(self, arg):
        """Sets the compensated fields in the array arg on the respective axes x->y->z."""
        self._apply_to_axes('set_field_compensated', arg)

    def _set_signed_currents(self, arg):
        """Sets the currents in the array arg in the respective coils x->y->z.
        This function imposes safety limits by clamping the current when beyond the maximum."""
        self._apply_to_axes('set_signed_current', arg)

    def get_psu_for_axis(self, axis_index):
        """Determine which channel of which psu is required.

        Returns:
            Tuple (psu, channel, port). psu is None when that supply is not connected; channel is
            None as long as no PSU type has been selected (before connect_hardware()).
        """
        # TODO: This kind of stuff belongs in the config and should not be hardcoded
        # psu_type is still None before the first connect_hardware(), but status polling already runs
        channels = self.psu_type.valid_channels() if self.psu_type is not None else None
        if axis_index == 0 or axis_index == 1:
            psu = self.psu1
            channel = channels[axis_index] if channels is not None else None
            port = self.com_port_psu1
        else:
            psu = self.psu2
            channel = channels[0] if channels is not None else None
            port = self.com_port_psu2
        return psu, channel, port

    def destroy(self):
        """The object cannot be recovered after calling destroy"""
        # Send signals to kill threads:
        # TODO: Handle timeout behaviour
        self._stop_flag.set()

        # _cmd_exec_thread:
        with self.command_lock:
            self.command = None
            self.new_command_flag.set()  # Causes the thread to unblock
        self._cmd_exec_thread.join(timeout=2)

        # _hw_poll_thread:
        # This thread is stopped just by setting the _stop_flag
        self._hw_poll_thread.join(timeout=2)

        # Shutdown the hardware
        msg = self.shutdown()
        # Show a unified pop-up with how the shutdown on each device went
        messagebox.showinfo("Program ended", msg)

    def shutdown(self):
        """Shuts down the hardware. This special command overrides the currently active proxy.

        Returns:
            A message summarizing how the shutdown of each device went.
        """
        with self.hardware_lock:
            ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
            # start writing string to later show how shutdown on all devices went in a single info pop-up:
            message = "Tried to shut down all devices. Check equipment to confirm."
            message += self._shutdown_device('psu1', "XY PSU", 'destroy')
            message += self._shutdown_device('psu2', "Z PSU", 'destroy')
            message += self._shutdown_device('arduino', "Arduino", 'close')
            return message

    def _shutdown_device(self, attr_name, label, release_method):
        """Shuts down one device attribute and returns the status line to append to the summary.

        The attribute is only reset to None when both shutdown() and the release method succeeded.
        """
        device = getattr(self, attr_name)
        if device is None:
            # the device was not connected before: tell user there was no need/no possibility to deactivate
            ui_print("{} not connected, can't deactivate.".format(label))
            return "\n{} not connected, can't deactivate.".format(label)
        try:
            device.shutdown()
            getattr(device, release_method)()
            setattr(self, attr_name, None)
        except Exception as e:
            ui_print("Error while deactivating {}:".format(label), e)  # print the problem in the console
            return "\nError while deactivating {}: {}".format(label, e)
        # device was successfully deactivated
        ui_print("{} deactivated.".format(label))
        return "\n{} deactivated.".format(label)
class Axis:
    """Adapter to axis-specific and non-specific (e.g. psu status) information.

    Most attributes are supplied as properties, which query the parent component (the cage device)
    of that attribute/info. It also wraps device calls.
    """

    def __init__(self, axis_idx, cage_dev):
        """Create the axis with index axis_idx which talks to the hardware referenced by cage_dev."""
        self.idx = axis_idx
        self.cage_dev = cage_dev
        self.name = g.AXIS_NAMES[axis_idx]  # Also used in some indexing operations, like in the config file

        # Create other axis properties using config parameters
        self.coil_const = None
        self.ambient_field = None
        self.resistance = None
        self.max_volts = None
        self.max_amps = None
        self.reload_config()

        # State variables
        self.target_current = 0
        self.polarity = False

    def _read_config_float(self, key):
        """Reads the entry key from this axis' config section and converts it to float."""
        return float(config_handling.read_from_config(self.name, key, config_handling.CONFIG_OBJECT))

    def reload_config(self):
        """(Re)reads all axis parameters from the config."""
        self.coil_const = self._read_config_float("coil_const")
        self.ambient_field = self._read_config_float("ambient_field")
        self.resistance = self._read_config_float("resistance")
        self.max_volts = self._read_config_float("max_volts")
        self.max_amps = self._read_config_float("max_amps")

    def set_field_raw(self, field):
        """Sets the current which corresponds to field according to the coil constant."""
        self.set_signed_current(field / self.coil_const)

    def set_field_compensated(self, field):
        """Sets the field after subtracting the configured ambient field."""
        self.set_field_raw(field - self.ambient_field)

    def _clamp_current(self, current):
        """Returns current limited to the range [-max_amps, +max_amps], keeping its sign."""
        if abs(current) <= self.max_amps:
            return current
        # Keep the requested direction: a too large negative current becomes -max_amps, not +max_amps
        return -self.max_amps if current < 0 else self.max_amps

    def set_signed_current(self, current):
        """Sets current on axis. The sign selects the polarity; the magnitude is limited to max_amps."""
        # Check current limits
        safe_current = self._clamp_current(current)
        if safe_current != current:
            ui_print("Warning {}: Attempted to exceed current limit".format(self.name))

        # Update state variables to be queried. This should be set even if it is only a "virtual" action with no
        # connected devices.
        self.target_current = safe_current

        if not self.arduino:
            ui_print("Warning {}: Cannot set field/current without Arduino".format(self.name))
            return
        if not self.psu:
            ui_print("Warning {}: Cannot set field/current without PSU".format(self.name))
            return

        # TODO: Check for exceptions
        # Set polarity on Arduino: reversed for negative currents, positive polarity otherwise (default case)
        reverse = bool(safe_current < 0)
        self.polarity = reverse  # Track the state
        self.arduino.set_axis_polarity(self.idx, reverse)

        # determine voltage limit to be set on PSU, must be high enough to not limit the current:
        # min. 8V, max. max_volts, in-between as needed with current value (+margin to not limit current)
        voltage_limit = min(max(1.3 * abs(safe_current) * self.resistance, 8), self.max_volts)  # limit voltage

        # Set voltages and currents. Outputs should already be active from initializer.
        self.psu.set_current(self.channel, abs(safe_current))
        self.psu.set_voltage(self.channel, voltage_limit)

    @property
    def arduino(self):
        """Arduino object of the cage device (None if not connected)."""
        return self.cage_dev.arduino

    @property
    def com_port(self):
        """Com port of the PSU responsible for this axis."""
        _, _, port = self.cage_dev.get_psu_for_axis(self.idx)
        return port

    @property
    def psu(self):
        """PSU object responsible for this axis (None if not connected)."""
        psu, _, _ = self.cage_dev.get_psu_for_axis(self.idx)
        return psu

    @property
    def channel(self):
        """Channel of the PSU which is responsible for this axis."""
        _, channel, _ = self.cage_dev.get_psu_for_axis(self.idx)
        return channel

    @property
    def target_field(self):
        """Field resulting from the target current including the ambient field."""
        return self.target_current * self.coil_const + self.ambient_field

    @property
    def target_field_raw(self):
        """Field resulting from the target current alone."""
        return self.target_current * self.coil_const

    @property
    def connected(self):
        """True if both the PSU and the Arduino for this axis are available."""
        return self.psu is not None and self.arduino is not None

    @property
    def max_field(self):
        """Array [min, max] of the raw field reachable with max_amps."""
        max_field_magnitude = self.max_amps * self.coil_const
        return np.array([-max_field_magnitude, max_field_magnitude])

    @property
    def max_comp_field(self):
        """Array [min, max] of the field reachable with max_amps, offset by the ambient field."""
        max_field_magnitude = self.max_amps * self.coil_const
        return np.array([self.ambient_field - max_field_magnitude, self.ambient_field + max_field_magnitude])

    @property
    def arduino_connected(self):
        """True if the Arduino is available."""
        return self.arduino is not None

    @property
    def psu_connected(self):
        """True if the PSU for this axis is available."""
        return self.psu is not None

    def get_status_dict(self):
        """Dict containing all data from this model to pass to the front-end. Some data is only available through
        this interface, since it also polls the hardware for current set-points"""
        # This is a slow operation, watch out!
        if self.psu:
            status = self.psu.poll_channel_state(self.channel)
        else:
            status = {}
        if self.arduino:
            status['polarity'] = self.polarity
        status['connected'] = self.connected
        status['port'] = self.com_port
        status['channel'] = self.channel
        status['target_field_raw'] = self.target_field_raw
        status['target_field'] = self.target_field
        status['target_current'] = self.target_current
        return status
class HelmholtzCageProxy:
    """Access handle for the HelmholtzCageDevice.

    The application is meant to talk to the HelmholtzCageDevice object exclusively through
    an instance of this class.
    """

    def __init__(self, cage_device):
        """Remember the cage device all requests are forwarded to."""
        self.cage_device = cage_device

    def _submit(self, action, payload):
        """Wrap action and payload into a command dict and queue it on the cage device."""
        request = {'command': action, 'arg': payload}
        self.cage_device.queue_command(self, request)

    def set_signed_currents(self, vector):
        """Queue a 'set_signed_currents' command with vector as its argument."""
        self._submit('set_signed_currents', vector)

    def set_field_raw(self, vector):
        """Queue a 'set_field_raw' command with vector as its argument."""
        self._submit('set_field_raw', vector)

    def set_field_compensated(self, vector):
        """Queue a 'set_field_compensated' command with vector as its argument."""
        self._submit('set_field_compensated', vector)

    def idle(self):
        """Puts the helmholtz cage into an idle state with zeroed fields"""
        self._submit('idle', None)

    def close(self):
        """Hand the device back so that other controllers can request it."""
        self.cage_device.release_proxy(self)

    def __del__(self):
        # Fallback only, do not rely on it: release the device by calling close() explicitly.
        device = self.cage_device
        if not device.proxy_valid(self):
            return
        device.release_proxy(self)
        ui_print("Warning: Proxy implicitly released. Use close() instead.")
def value_in_limits(axis, key, value):
    """Classify value against the limits stored for key in globals.py.

    axis is the axis name as listed in g.AXIS_NAMES, e.g. "X-Axis"; key selects which quantity
    to check, e.g. current. Returns 'HIGH' above the maximum, 'LOW' below the minimum and
    'OK' otherwise.
    """
    limit_table = g.default_arrays[key]
    # Row 1 of the table holds the maximum values, row 2 the minimum values, one entry per axis
    upper_row = limit_table[1]
    axis_pos = g.AXIS_NAMES.index(axis)
    upper = upper_row[axis_pos]
    lower = limit_table[2][axis_pos]

    number = float(value)
    if number > float(upper):
        return 'HIGH'
    if number < float(lower):
        return 'LOW'
    return 'OK'