diff --git a/src/arduino_device.py b/src/arduino_device.py new file mode 100644 index 0000000..45e9fc1 --- /dev/null +++ b/src/arduino_device.py @@ -0,0 +1,41 @@ +from src.arduino import Arduino +import src.config_handling as config +from src.user_interface import ui_print +import src.globals as g + + +class ArduinoDevice(Arduino): + """ Main class to control the electronics box (which means commanding the arduino inside). + Inherits from the Arduino library. All axis indices are go from 0-2 for x,y and z respectively""" + + def __init__(self): + self.pins = [0, 0, 0] # initialize list with pins to switch relay of each axis + for i in range(3): # get correct pins from the config + self.pins[i] = int(config.read_from_config(g.AXIS_NAMES[i], "relay_pin", config.CONFIG_OBJECT)) + + ui_print("\nConnecting to Arduino...") + # try to set up the arduino. Exceptions are handled by caller + # search for connected arduino and connect by initializing arduino library class + Arduino.__init__(self, timeout=0.2) + for pin in self.pins: + self.pinMode(pin, "Output") + self.digitalWrite(pin, "LOW") + ui_print("Arduino ready.") + + def set_axis_polarity(self, axis_index, reverse): + """Sets the polarity of the axis (indexed with 0-2). True is reverse polarity""" + if reverse: + self.digitalWrite(self.pins[axis_index], "HIGH") + else: + self.digitalWrite(self.pins[axis_index], "LOW") + + def get_axis_polarity(self): + """Returns a list with one bool per axis indicating whether the polarity is reversed (True).""" + status = [] + for pin in self.pins: # go through all three pins/axes + status.append(self.digitalRead(pin)) # pin is HIGH --> relay is switched + + def shutdown(self): + """Sets relay switching pins to low to de-power most of the electronics box""" + for pin in self.pins: + self.digitalWrite(pin, "LOW") diff --git a/src/cage_func.py b/src/cage_func.py index 7ae72bb..4e6e839 100644 --- a/src/cage_func.py +++ b/src/cage_func.py @@ -171,7 +171,7 @@ class ArduinoCtrl(Arduino): ui_print("\nConnecting to Arduino...") try: # try to set up the arduino - Arduino.__init__(self) # search for connected arduino and connect by initializing arduino library class + Arduino.__init__(self, timeout=0.2) # search for connected arduino and connect by initializing arduino library class for pin in self.pins: self.pinMode(pin, "Output") self.digitalWrite(pin, "LOW") diff --git a/src/globals.py b/src/globals.py index e38c910..ce64b11 100644 --- a/src/globals.py +++ b/src/globals.py @@ -3,20 +3,14 @@ import numpy as np -XY_DEVICE = None # XY PSU object will be stored here (class PS2000B) -Z_DEVICE = None # Z PSU object will be stored here (class PS2000B) -ARDUINO = None # Arduino object will be stored here (class ArduinoCtrl) +# Main Tkinter application object will be stored here (class HelmholtzGUI) +app = None -# Axis objects will be stored here (class Axis) -X_AXIS = None -Y_AXIS = None -Z_AXIS = None +# The main access point for all hardware commands +CAGE_DEVICE = None -AXES = None # list containing [X_AXIS, Y_AXIS, Z_AXIS] - -app = None # Main Tkinter application object will be stored here (class HelmholtzGUI) - -AXIS_NAMES = ["X-Axis", "Y-Axis", "Z-Axis"] # list with the names of each axis, used mainly for printing functions +# list with the names of each axis, used mainly for printing functions +AXIS_NAMES = ["X-Axis", "Y-Axis", "Z-Axis"] global XY_PORT # serial port for XY PSU will be stored here (string) global Z_PORT # serial port for Z PSU will be stored here (string) diff --git a/src/helmholtz_cage_device.py b/src/helmholtz_cage_device.py new file mode 100644 index 0000000..9d0da32 --- /dev/null +++ b/src/helmholtz_cage_device.py @@ -0,0 +1,156 @@ +import traceback +from threading import RLock +from tkinter import messagebox + +from src.arduino_device import ArduinoDevice +from src.psu_device import PSUDevicePS2000B, PSUDeviceQL355TP +from src.user_interface import ui_print +import src.config_handling as config +import src.globals as g + +class HelmholtzCageDevice: + """This is the central object for controlling all the test bench related HW. This way, access can be + synchronized and exclusive to a single controller at once. This device always exists, irrespective of + which devices are actually connected. Only the request_proxy and shutdown method should be used !!! + Provides subscriber interface for periodic status information. + Provides proxy model to control access.""" + + # This class is thread safe + + def __init__(self): + # --- PROXY MODEL SETUP --- + # Locks all proxy requesting and releasing access + self.proxy_lock = RLock() + # Contains the id of the current in control proxy + self.proxy_id = None + + # --- DEVICE SETUP --- + # All devices are usable if the object exists. None indicates the device is not connected/not working properly + # Arduino setup + try: + self.arduino = ArduinoDevice() + except Exception as e: + self.arduino = None + # show error messages to alert user + ui_print("Arduino setup failed:", e) + ui_print(traceback.print_exc()) + messagebox.showerror("Error!", "Arduino setup failed:\n%s \nCheck traceback in console." % e) + + # PSU setup + self.com_port_psu1 = config.read_from_config("Supplies", "xy_port", config.CONFIG_OBJECT) + self.com_port_psu2 = config.read_from_config("Supplies", "z_port", config.CONFIG_OBJECT) + self.psu_type = config.read_from_config("Supplies", "supply_model", config.CONFIG_OBJECT) + # psu1: controls xy axis + try: + if self.psu_type == "ps2000b": + self.psu1 = PSUDevicePS2000B(self.com_port_psu1) + elif self.psu_type == "ql355tp": + self.psu1 = PSUDeviceQL355TP(self.com_port_psu1) + else: + raise Exception("Invalid psu model: {}".format(self.psu_type)) + except Exception as e: + self.psu1 = None + # psu2: controls z axis + try: + if self.psu_type == "ps2000b": + self.psu2 = PSUDevicePS2000B(self.com_port_psu2) + elif self.psu_type == "ql355tp": + self.psu2 = PSUDeviceQL355TP(self.com_port_psu2) + else: + raise Exception("Invalid psu model: {}".format(self.psu_type)) + except Exception as e: + self.psu2 = None + + def request_proxy(self): + """Returns a new HelmholtzCageProxy or None, depending on if access is available""" + with self.proxy_lock: + if not self.proxy_id: + # The interface is available, return a new proxy object + new_proxy = HelmholtzCageProxy(self) + self.proxy_id = id(new_proxy) + return new_proxy + else: + # The interface is occupied, the caller tolerate that the request failed. + return None + + def release_proxy(self, proxy_obj): + """Releases the proxy to free access for other controllers. Should only be called when proxy is destroyed""" + with self.proxy_lock: + # This only frees the interface if it really was the active proxy + if id(proxy_obj) == self.proxy_id: + self.proxy_id = None + # Otherwise do nothing, this case requires no behaviour + + def queue_command(self, proxy_obj, command): + """ Queues a dict for immediate execution containing the command for the cage as a whole. """ + + def shutdown(self): + """ Shuts down the hardware. This special command overrides the currently active proxy.""" + + ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.") + # start writing string to later show how shutdown on all devices went in a single info pop-up: + message = "Tried to shut down all devices. Check equipment to confirm." + + # Shutdown XY PSU + if self.psu1 is not None: + try: + self.psu1.shutdown() + self.psu1.destroy() + self.psu1 = None + except Exception as e: + ui_print("Error while deactivating XY PSU:", e) # print the problem in the console + message += "\nError while deactivating XY PSU: %s" % e # append status to the message to show later + else: # device was successfully deactivated + ui_print("XY PSU deactivated.") + message += "\nXY PSU deactivated." # append message to show later + else: # the device was not connected before + # tell user there was no need/no possibility to deactivate: + ui_print("XY PSU not connected, can't deactivate.") + message += "\nXY PSU not connected, can't deactivate." + + # Shutdown Z PSU + if self.psu2 is not None: + try: + self.psu2.shutdown() + self.psu2.destroy() + self.psu2 = None + except Exception as e: + ui_print("Error while deactivating Z PSU:", e) # print the problem in the console + message += "\nError while deactivating Z PSU: %s" % e # append status to the message to show later + else: # device was successfully deactivated + ui_print("Z PSU deactivated.") + message += "\nZ PSU deactivated." # append message to show later + else: # the device was not connected before + # tell user there was no need/no possibility to deactivate: + ui_print("Z PSU not connected, can't deactivate.") + message += "\nZ PSU not connected, can't deactivate." + + # Shut down arduino: + if self.arduino is not None: + try: + self.arduino.shutdown() + self.arduino.close() + self.arduino = None + except Exception as e: + ui_print("Error while deactivating Arduino:", e) # print the problem in the console + message += "\nError while deactivating Arduino: %s" % e # append status to the message to show later + else: # device was successfully deactivated + ui_print("Arduino deactivated.") + message += "\nArduino deactivated." # append message to show later + else: # the device was not connected before + # tell user there was no need/no possibility to deactivate: + ui_print("Arduino not connected, can't deactivate.") + message += "\nArduino not connected, can't deactivate." + + messagebox.showinfo("Program ended", + message) # Show a unified pop-up with how the shutdown on each device went + + +class HelmholtzCageProxy: + """ Proxy for the HelmholtzCageDevice. + This is the only way the application should communicate with the HelmholtzCageDevice object""" + def __init__(self, cage_device): + self.cage_device = cage_device + + def __del__(self): + self.cage_device.release_proxy(self) \ No newline at end of file diff --git a/src/psu_device.py b/src/psu_device.py new file mode 100644 index 0000000..dca9204 --- /dev/null +++ b/src/psu_device.py @@ -0,0 +1,230 @@ +# ABC is template used to create python abstract classes +from abc import ABC, abstractmethod +import serial + +from src.ps2000b import PS2000B # Module containing all PS2000B HW functions and classes + + +class PSUDevice(ABC): + """The PSUDevice abstract class defines a generic interface for power supplies used with the test bench. + It can be subclassed to easily support new hardware.""" + + def __init__(self, com_port): + """PSUDevice assumes a serial connection. Class must provide a connected state variable""" + self.com_port = com_port + self.connected = False + + @abstractmethod + def enable_channel(self, channel_nr): + """Most PSUs offer completely disabling or enabling a channel, beyond setting V=0, I=0. + Can throw exceptions""" + pass + + @abstractmethod + def disable_channel(self, channel_nr): + """Most PSUs offer completely disabling or enabling a channel, beyond setting V=0, I=0. + Can throw exceptions""" + pass + + # Warning: not abstract, does not need to be implemented + def set_channel_state(self, channel_nr, enabled): + """True: Enable channel, False: Disable channel + Can throw exceptions""" + if enabled: + self.enable_channel(channel_nr) + else: + self.disable_channel(channel_nr) + + @abstractmethod + def set_current(self, channel_nr, current): + """Set the current limit. Actual current dependent on OVP or OCP mode. + Can throw exceptions""" + pass + + @abstractmethod + def set_voltage(self, channel_nr, voltage): + """Set the voltage limit. Actual voltage dependent on OVP or OCP mode. + Can throw exceptions""" + pass + + @abstractmethod + def poll_channel_state(self, channel_nr): + """Return a dictionary with the entries below. WARNING: this call is blocking and potentially slow! + Can throw exceptions""" + # return {'active': False, 'remote_active': False, + # 'actual_voltage':0, 'limit_voltage':0, 'actual_current':0, 'limit_current':0} + pass + + @abstractmethod + def shutdown(self): + """Shuts the PSU down safely and makes sure ALL outputs are off.""" + pass + + @abstractmethod + def destroy(self): + """Disconnects the device""" + pass + + @property + @abstractmethod + def valid_channels(self): + """Returns a list containing valid channel numbers""" + pass + + +class PSUDevicePS2000B(PSUDevice): + """Provides HW support for the Elektro-Automatik PS2000B power supply. + This class is an adapter for the already existing PS2000B library, to have a consistent interface""" + + def __init__(self, com_port): + super().__init__(com_port) + try: + self.dev = PS2000B.PS2000B(com_port) + # dev_info is a class which contains hw specific constants, such as nominal voltage and current + self.dev_info = self.dev.get_device_information() # Cache this result + self.connected = True + except serial.SerialException: + self.dev = None + self.connected = False + + @property + def valid_channels(self): + # Dependent on PSU, the PS2000B has 2 channels + return [0, 1] + + def enable_channel(self, channel_nr): + self.dev.enable_output(channel_nr) + + def disable_channel(self, channel_nr): + self.dev.enable_output(channel_nr) + + def set_current(self, channel_nr, current): + self.dev.set_current(current, channel_nr) + + def set_voltage(self, channel_nr, voltage): + self.dev.set_voltage(voltage, channel_nr) + + def poll_channel_state(self, channel_nr): + self.dev.update_device_information(channel_nr) # update the information in the device object + dev_status = self.dev.get_device_status_information(channel_nr) # get object with new status info + + # This is more efficient than the dev.get_voltage() call, since it does not require another update_device_info + voltage = dev_status.actual_voltage_percent * self.dev_info.nominal_voltage # Extracted from PS2000B library + voltage_setp = self.dev.get_voltage_setpoint(channel_nr) + current = dev_status.actual_current_percent * self.dev_info.nominal_current + current_setp = self.dev.get_current_setpoint(channel_nr) + + # Format should match the provided template in abstract PSUDevice class. + return {'active': dev_status.output_active, 'remote_active': dev_status.remote_control_active, + 'actual_voltage': voltage, 'limit_voltage': voltage_setp, + 'actual_current': current, 'limit_current': current_setp} + + def shutdown(self): + for ch in self.valid_channels: + self.disable_channel(ch) + self.set_current(ch, 0) + self.set_voltage(ch, 0) + + def destroy(self): + self.dev.serial.close() + + +class PSUDeviceQL355TP(PSUDevice): + """HW interface for QL355TP from AIM-TTi Instruments""" + + def __init__(self, com_port): + super().__init__(com_port) + try: + self._serial_object = serial.Serial( + port=self.com_port, + baudrate=19200, + timeout=0.5, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.EIGHTBITS + ) + self.set_output_range(0, 0) # Put the PSU into the 15V/5A range + self.set_output_range(1, 0) + self.reset_breaker() # Reset the breaker in case we are coming from an unclean state + + self.connected = True + except serial.SerialException: + self.dev = None + self.connected = False + + @property + def valid_channels(self): + # Dependent on PSU, the QL355TP has 2 normal channels. The auxiliary channel is not usable for our purpose + return [1, 2] + + def enable_channel(self, channel_nr): + # The serial interface is documented in the QL355TP handbook + self._serial_object.write("OP{} 1\n".format(channel_nr).encode()) + + def disable_channel(self, channel_nr): + # The serial interface is documented in the QL355TP handbook + self._serial_object.write("OP{} 0\n".format(channel_nr).encode()) + + def set_current(self, channel_nr, current): + # The serial interface is documented in the QL355TP handbook + self._serial_object.write("I{} {}\n".format(channel_nr, current).encode()) + + def set_voltage(self, channel_nr, voltage): + # The serial interface is documented in the QL355TP handbook + self._serial_object.write("V{} {}\n".format(channel_nr, voltage).encode()) + + def poll_channel_state(self, channel_nr): + # Request channel state + self._serial_object.write("OP{}?\n".format(channel_nr).encode()) + resp = self._serial_object.read_until() + output_active = resp.decode().rstrip() + + # Request current current + self._serial_object.write("I{}O?\n".format(channel_nr).encode()) + resp = self._serial_object.read_until() + # Trim whitespace and units + current = float(resp.decode().rstrip()[:-1]) + + # Request current setpoint + self._serial_object.write("I{}?\n".format(channel_nr).encode()) + resp = self._serial_object.read_until() + # Trim whitespace and units + current_setp = float(resp.decode().rstrip()[:-1]) + + # Request current voltage + self._serial_object.write("V{}O?\n".format(channel_nr).encode()) + resp = self._serial_object.read_until() + # Trim whitespace and units + voltage = float(resp.decode().rstrip()[:-1]) + + # Request voltage setpoint + self._serial_object.write("V{}?\n".format(channel_nr).encode()) + resp = self._serial_object.read_until() + # Trim whitespace and units + voltage_setp = float(resp.decode().rstrip()[:-1]) + + # Format should match the provided template in abstract PSUDevice class. + # The remote_active property is assumed to always be True since it cant be read + # (it should be since we are talking to it) + return {'active': output_active, 'remote_active': True, + 'actual_voltage': voltage, 'limit_voltage': voltage_setp, + 'actual_current': current, 'limit_current': current_setp} + + def set_output_range(self, channel_nr, value_range): + """The QL355TP supports various output ranges. We require the 15V/5A mode to achieve the greatest range.""" + # range 0: 15V/5A, 1=35V/3A, 2=35V/500mA + self._serial_object.write("RANGE{} {}\n".format(channel_nr, value_range).encode()) + + def reset_breaker(self): + """The QL355TP includes an internal breaker, which must be reset if the + maximum voltage or current is exceeded""" + self._serial_object.write("TRIPRST\n".encode()) + + def shutdown(self): + for ch in self.valid_channels: + self.disable_channel(ch) + self.set_current(ch, 0) + self.set_voltage(ch, 0) + + def destroy(self): + self._serial_object.close()