Rewrite partially completed. Not runnable.

This commit is contained in:
2021-07-28 10:09:07 +02:00
parent 38f3793c27
commit fa6d50d04c
5 changed files with 434 additions and 13 deletions
+41
View File
@@ -0,0 +1,41 @@
from src.arduino import Arduino
import src.config_handling as config
from src.user_interface import ui_print
import src.globals as g
class ArduinoDevice(Arduino):
""" Main class to control the electronics box (which means commanding the arduino inside).
Inherits from the Arduino library. All axis indices are go from 0-2 for x,y and z respectively"""
def __init__(self):
self.pins = [0, 0, 0] # initialize list with pins to switch relay of each axis
for i in range(3): # get correct pins from the config
self.pins[i] = int(config.read_from_config(g.AXIS_NAMES[i], "relay_pin", config.CONFIG_OBJECT))
ui_print("\nConnecting to Arduino...")
# try to set up the arduino. Exceptions are handled by caller
# search for connected arduino and connect by initializing arduino library class
Arduino.__init__(self, timeout=0.2)
for pin in self.pins:
self.pinMode(pin, "Output")
self.digitalWrite(pin, "LOW")
ui_print("Arduino ready.")
def set_axis_polarity(self, axis_index, reverse):
"""Sets the polarity of the axis (indexed with 0-2). True is reverse polarity"""
if reverse:
self.digitalWrite(self.pins[axis_index], "HIGH")
else:
self.digitalWrite(self.pins[axis_index], "LOW")
def get_axis_polarity(self):
"""Returns a list with one bool per axis indicating whether the polarity is reversed (True)."""
status = []
for pin in self.pins: # go through all three pins/axes
status.append(self.digitalRead(pin)) # pin is HIGH --> relay is switched
def shutdown(self):
"""Sets relay switching pins to low to de-power most of the electronics box"""
for pin in self.pins:
self.digitalWrite(pin, "LOW")
+1 -1
View File
@@ -171,7 +171,7 @@ class ArduinoCtrl(Arduino):
ui_print("\nConnecting to Arduino...")
try: # try to set up the arduino
Arduino.__init__(self) # search for connected arduino and connect by initializing arduino library class
Arduino.__init__(self, timeout=0.2) # search for connected arduino and connect by initializing arduino library class
for pin in self.pins:
self.pinMode(pin, "Output")
self.digitalWrite(pin, "LOW")
+6 -12
View File
@@ -3,20 +3,14 @@
import numpy as np
XY_DEVICE = None # XY PSU object will be stored here (class PS2000B)
Z_DEVICE = None # Z PSU object will be stored here (class PS2000B)
ARDUINO = None # Arduino object will be stored here (class ArduinoCtrl)
# Main Tkinter application object will be stored here (class HelmholtzGUI)
app = None
# Axis objects will be stored here (class Axis)
X_AXIS = None
Y_AXIS = None
Z_AXIS = None
# The main access point for all hardware commands
CAGE_DEVICE = None
AXES = None # list containing [X_AXIS, Y_AXIS, Z_AXIS]
app = None # Main Tkinter application object will be stored here (class HelmholtzGUI)
AXIS_NAMES = ["X-Axis", "Y-Axis", "Z-Axis"] # list with the names of each axis, used mainly for printing functions
# list with the names of each axis, used mainly for printing functions
AXIS_NAMES = ["X-Axis", "Y-Axis", "Z-Axis"]
global XY_PORT # serial port for XY PSU will be stored here (string)
global Z_PORT # serial port for Z PSU will be stored here (string)
+156
View File
@@ -0,0 +1,156 @@
import traceback
from threading import RLock
from tkinter import messagebox
from src.arduino_device import ArduinoDevice
from src.psu_device import PSUDevicePS2000B, PSUDeviceQL355TP
from src.user_interface import ui_print
import src.config_handling as config
import src.globals as g
class HelmholtzCageDevice:
"""This is the central object for controlling all the test bench related HW. This way, access can be
synchronized and exclusive to a single controller at once. This device always exists, irrespective of
which devices are actually connected. Only the request_proxy and shutdown method should be used !!!
Provides subscriber interface for periodic status information.
Provides proxy model to control access."""
# This class is thread safe
def __init__(self):
# --- PROXY MODEL SETUP ---
# Locks all proxy requesting and releasing access
self.proxy_lock = RLock()
# Contains the id of the current in control proxy
self.proxy_id = None
# --- DEVICE SETUP ---
# All devices are usable if the object exists. None indicates the device is not connected/not working properly
# Arduino setup
try:
self.arduino = ArduinoDevice()
except Exception as e:
self.arduino = None
# show error messages to alert user
ui_print("Arduino setup failed:", e)
ui_print(traceback.print_exc())
messagebox.showerror("Error!", "Arduino setup failed:\n%s \nCheck traceback in console." % e)
# PSU setup
self.com_port_psu1 = config.read_from_config("Supplies", "xy_port", config.CONFIG_OBJECT)
self.com_port_psu2 = config.read_from_config("Supplies", "z_port", config.CONFIG_OBJECT)
self.psu_type = config.read_from_config("Supplies", "supply_model", config.CONFIG_OBJECT)
# psu1: controls xy axis
try:
if self.psu_type == "ps2000b":
self.psu1 = PSUDevicePS2000B(self.com_port_psu1)
elif self.psu_type == "ql355tp":
self.psu1 = PSUDeviceQL355TP(self.com_port_psu1)
else:
raise Exception("Invalid psu model: {}".format(self.psu_type))
except Exception as e:
self.psu1 = None
# psu2: controls z axis
try:
if self.psu_type == "ps2000b":
self.psu2 = PSUDevicePS2000B(self.com_port_psu2)
elif self.psu_type == "ql355tp":
self.psu2 = PSUDeviceQL355TP(self.com_port_psu2)
else:
raise Exception("Invalid psu model: {}".format(self.psu_type))
except Exception as e:
self.psu2 = None
def request_proxy(self):
"""Returns a new HelmholtzCageProxy or None, depending on if access is available"""
with self.proxy_lock:
if not self.proxy_id:
# The interface is available, return a new proxy object
new_proxy = HelmholtzCageProxy(self)
self.proxy_id = id(new_proxy)
return new_proxy
else:
# The interface is occupied, the caller tolerate that the request failed.
return None
def release_proxy(self, proxy_obj):
"""Releases the proxy to free access for other controllers. Should only be called when proxy is destroyed"""
with self.proxy_lock:
# This only frees the interface if it really was the active proxy
if id(proxy_obj) == self.proxy_id:
self.proxy_id = None
# Otherwise do nothing, this case requires no behaviour
def queue_command(self, proxy_obj, command):
""" Queues a dict for immediate execution containing the command for the cage as a whole. """
def shutdown(self):
""" Shuts down the hardware. This special command overrides the currently active proxy."""
ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
# start writing string to later show how shutdown on all devices went in a single info pop-up:
message = "Tried to shut down all devices. Check equipment to confirm."
# Shutdown XY PSU
if self.psu1 is not None:
try:
self.psu1.shutdown()
self.psu1.destroy()
self.psu1 = None
except Exception as e:
ui_print("Error while deactivating XY PSU:", e) # print the problem in the console
message += "\nError while deactivating XY PSU: %s" % e # append status to the message to show later
else: # device was successfully deactivated
ui_print("XY PSU deactivated.")
message += "\nXY PSU deactivated." # append message to show later
else: # the device was not connected before
# tell user there was no need/no possibility to deactivate:
ui_print("XY PSU not connected, can't deactivate.")
message += "\nXY PSU not connected, can't deactivate."
# Shutdown Z PSU
if self.psu2 is not None:
try:
self.psu2.shutdown()
self.psu2.destroy()
self.psu2 = None
except Exception as e:
ui_print("Error while deactivating Z PSU:", e) # print the problem in the console
message += "\nError while deactivating Z PSU: %s" % e # append status to the message to show later
else: # device was successfully deactivated
ui_print("Z PSU deactivated.")
message += "\nZ PSU deactivated." # append message to show later
else: # the device was not connected before
# tell user there was no need/no possibility to deactivate:
ui_print("Z PSU not connected, can't deactivate.")
message += "\nZ PSU not connected, can't deactivate."
# Shut down arduino:
if self.arduino is not None:
try:
self.arduino.shutdown()
self.arduino.close()
self.arduino = None
except Exception as e:
ui_print("Error while deactivating Arduino:", e) # print the problem in the console
message += "\nError while deactivating Arduino: %s" % e # append status to the message to show later
else: # device was successfully deactivated
ui_print("Arduino deactivated.")
message += "\nArduino deactivated." # append message to show later
else: # the device was not connected before
# tell user there was no need/no possibility to deactivate:
ui_print("Arduino not connected, can't deactivate.")
message += "\nArduino not connected, can't deactivate."
messagebox.showinfo("Program ended",
message) # Show a unified pop-up with how the shutdown on each device went
class HelmholtzCageProxy:
""" Proxy for the HelmholtzCageDevice.
This is the only way the application should communicate with the HelmholtzCageDevice object"""
def __init__(self, cage_device):
self.cage_device = cage_device
def __del__(self):
self.cage_device.release_proxy(self)
+230
View File
@@ -0,0 +1,230 @@
# ABC is template used to create python abstract classes
from abc import ABC, abstractmethod
import serial
from src.ps2000b import PS2000B # Module containing all PS2000B HW functions and classes
class PSUDevice(ABC):
"""The PSUDevice abstract class defines a generic interface for power supplies used with the test bench.
It can be subclassed to easily support new hardware."""
def __init__(self, com_port):
"""PSUDevice assumes a serial connection. Class must provide a connected state variable"""
self.com_port = com_port
self.connected = False
@abstractmethod
def enable_channel(self, channel_nr):
"""Most PSUs offer completely disabling or enabling a channel, beyond setting V=0, I=0.
Can throw exceptions"""
pass
@abstractmethod
def disable_channel(self, channel_nr):
"""Most PSUs offer completely disabling or enabling a channel, beyond setting V=0, I=0.
Can throw exceptions"""
pass
# Warning: not abstract, does not need to be implemented
def set_channel_state(self, channel_nr, enabled):
"""True: Enable channel, False: Disable channel
Can throw exceptions"""
if enabled:
self.enable_channel(channel_nr)
else:
self.disable_channel(channel_nr)
@abstractmethod
def set_current(self, channel_nr, current):
"""Set the current limit. Actual current dependent on OVP or OCP mode.
Can throw exceptions"""
pass
@abstractmethod
def set_voltage(self, channel_nr, voltage):
"""Set the voltage limit. Actual voltage dependent on OVP or OCP mode.
Can throw exceptions"""
pass
@abstractmethod
def poll_channel_state(self, channel_nr):
"""Return a dictionary with the entries below. WARNING: this call is blocking and potentially slow!
Can throw exceptions"""
# return {'active': False, 'remote_active': False,
# 'actual_voltage':0, 'limit_voltage':0, 'actual_current':0, 'limit_current':0}
pass
@abstractmethod
def shutdown(self):
"""Shuts the PSU down safely and makes sure ALL outputs are off."""
pass
@abstractmethod
def destroy(self):
"""Disconnects the device"""
pass
@property
@abstractmethod
def valid_channels(self):
"""Returns a list containing valid channel numbers"""
pass
class PSUDevicePS2000B(PSUDevice):
"""Provides HW support for the Elektro-Automatik PS2000B power supply.
This class is an adapter for the already existing PS2000B library, to have a consistent interface"""
def __init__(self, com_port):
super().__init__(com_port)
try:
self.dev = PS2000B.PS2000B(com_port)
# dev_info is a class which contains hw specific constants, such as nominal voltage and current
self.dev_info = self.dev.get_device_information() # Cache this result
self.connected = True
except serial.SerialException:
self.dev = None
self.connected = False
@property
def valid_channels(self):
# Dependent on PSU, the PS2000B has 2 channels
return [0, 1]
def enable_channel(self, channel_nr):
self.dev.enable_output(channel_nr)
def disable_channel(self, channel_nr):
self.dev.enable_output(channel_nr)
def set_current(self, channel_nr, current):
self.dev.set_current(current, channel_nr)
def set_voltage(self, channel_nr, voltage):
self.dev.set_voltage(voltage, channel_nr)
def poll_channel_state(self, channel_nr):
self.dev.update_device_information(channel_nr) # update the information in the device object
dev_status = self.dev.get_device_status_information(channel_nr) # get object with new status info
# This is more efficient than the dev.get_voltage() call, since it does not require another update_device_info
voltage = dev_status.actual_voltage_percent * self.dev_info.nominal_voltage # Extracted from PS2000B library
voltage_setp = self.dev.get_voltage_setpoint(channel_nr)
current = dev_status.actual_current_percent * self.dev_info.nominal_current
current_setp = self.dev.get_current_setpoint(channel_nr)
# Format should match the provided template in abstract PSUDevice class.
return {'active': dev_status.output_active, 'remote_active': dev_status.remote_control_active,
'actual_voltage': voltage, 'limit_voltage': voltage_setp,
'actual_current': current, 'limit_current': current_setp}
def shutdown(self):
for ch in self.valid_channels:
self.disable_channel(ch)
self.set_current(ch, 0)
self.set_voltage(ch, 0)
def destroy(self):
self.dev.serial.close()
class PSUDeviceQL355TP(PSUDevice):
"""HW interface for QL355TP from AIM-TTi Instruments"""
def __init__(self, com_port):
super().__init__(com_port)
try:
self._serial_object = serial.Serial(
port=self.com_port,
baudrate=19200,
timeout=0.5,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS
)
self.set_output_range(0, 0) # Put the PSU into the 15V/5A range
self.set_output_range(1, 0)
self.reset_breaker() # Reset the breaker in case we are coming from an unclean state
self.connected = True
except serial.SerialException:
self.dev = None
self.connected = False
@property
def valid_channels(self):
# Dependent on PSU, the QL355TP has 2 normal channels. The auxiliary channel is not usable for our purpose
return [1, 2]
def enable_channel(self, channel_nr):
# The serial interface is documented in the QL355TP handbook
self._serial_object.write("OP{} 1\n".format(channel_nr).encode())
def disable_channel(self, channel_nr):
# The serial interface is documented in the QL355TP handbook
self._serial_object.write("OP{} 0\n".format(channel_nr).encode())
def set_current(self, channel_nr, current):
# The serial interface is documented in the QL355TP handbook
self._serial_object.write("I{} {}\n".format(channel_nr, current).encode())
def set_voltage(self, channel_nr, voltage):
# The serial interface is documented in the QL355TP handbook
self._serial_object.write("V{} {}\n".format(channel_nr, voltage).encode())
def poll_channel_state(self, channel_nr):
# Request channel state
self._serial_object.write("OP{}?\n".format(channel_nr).encode())
resp = self._serial_object.read_until()
output_active = resp.decode().rstrip()
# Request current current
self._serial_object.write("I{}O?\n".format(channel_nr).encode())
resp = self._serial_object.read_until()
# Trim whitespace and units
current = float(resp.decode().rstrip()[:-1])
# Request current setpoint
self._serial_object.write("I{}?\n".format(channel_nr).encode())
resp = self._serial_object.read_until()
# Trim whitespace and units
current_setp = float(resp.decode().rstrip()[:-1])
# Request current voltage
self._serial_object.write("V{}O?\n".format(channel_nr).encode())
resp = self._serial_object.read_until()
# Trim whitespace and units
voltage = float(resp.decode().rstrip()[:-1])
# Request voltage setpoint
self._serial_object.write("V{}?\n".format(channel_nr).encode())
resp = self._serial_object.read_until()
# Trim whitespace and units
voltage_setp = float(resp.decode().rstrip()[:-1])
# Format should match the provided template in abstract PSUDevice class.
# The remote_active property is assumed to always be True since it cant be read
# (it should be since we are talking to it)
return {'active': output_active, 'remote_active': True,
'actual_voltage': voltage, 'limit_voltage': voltage_setp,
'actual_current': current, 'limit_current': current_setp}
def set_output_range(self, channel_nr, value_range):
"""The QL355TP supports various output ranges. We require the 15V/5A mode to achieve the greatest range."""
# range 0: 15V/5A, 1=35V/3A, 2=35V/500mA
self._serial_object.write("RANGE{} {}\n".format(channel_nr, value_range).encode())
def reset_breaker(self):
"""The QL355TP includes an internal breaker, which must be reset if the
maximum voltage or current is exceeded"""
self._serial_object.write("TRIPRST\n".encode())
def shutdown(self):
for ch in self.valid_channels:
self.disable_channel(ch)
self.set_current(ch, 0)
self.set_voltage(ch, 0)
def destroy(self):
self._serial_object.close()