forked from zietzm/Helmholtz_Test_Bench
410 lines
14 KiB
Python
410 lines
14 KiB
Python
# This file enables communication with PS2000B Power Supply Units.
|
|
|
|
#!/usr/bin/env python3
|
|
# coding=utf-8
|
|
# Python access to Elektro Automatik PS 2000 B devices via USB/serial
|
|
#
|
|
# Supported features:
|
|
# - read static device information (manufacturer, serial, device type ...)
|
|
# - read dynamic device information (current, voltage)
|
|
# - read/write remote control
|
|
# - read/write output control
|
|
#
|
|
# - wrap error results in own telegram
|
|
#
|
|
# References
|
|
# [1] = "PS 2000B Programming Guide" from 2015-05-28
|
|
# [2] = "PS 2000B object list"
|
|
#
|
|
|
|
import serial
|
|
import struct
|
|
import sys
|
|
PY_3 = sys.version_info >= (3, 0)
|
|
|
|
# noinspection SpellCheckingInspection
|
|
__author__ = "Sören Sprößig <ssproessig@gmail.com>"
|
|
|
|
|
|
def as_string(raw_data):
|
|
"""Converts the given raw bytes to a string (removes NULL)"""
|
|
return bytearray(raw_data[:-1])
|
|
|
|
|
|
def as_float(raw_data):
|
|
"""Converts the given raw bytes to a float"""
|
|
f = struct.unpack_from(">f", bytearray(raw_data))[0]
|
|
return f
|
|
|
|
|
|
def as_word(raw_data):
|
|
"""Converts the given raw bytes to a word"""
|
|
w = struct.unpack_from(">H", bytearray(raw_data))[0]
|
|
return w
|
|
|
|
|
|
def _ord(x):
|
|
"""Wrap ord() call as we only need it in Python 2"""
|
|
return x if PY_3 else ord(x)
|
|
|
|
|
|
# noinspection PyClassHasNoInit
|
|
class Constants:
|
|
"""Communication constants"""
|
|
# communication parameters taken from [1], chapter 2.2
|
|
CONNECTION_BAUD_RATE = 115200
|
|
CONNECTION_PARITY = serial.PARITY_ODD
|
|
CONNECTION_STOP_BITS = 1
|
|
# timeout taken from [1], chapter 3.7
|
|
TIMEOUT_BETWEEN_COMMANDS = 0.05
|
|
# according to spec [1] 2.4:
|
|
# maximum length of a telegram is 21 bytes (Byte 0..20)
|
|
MAX_LEN_IN_BYTES = 21
|
|
|
|
|
|
# noinspection PyClassHasNoInit
|
|
class Objects:
|
|
"""Supported objects ids / commands"""
|
|
DEVICE_TYPE = 0
|
|
DEVICE_SERIAL_NO = 1
|
|
NOMINAL_VOLTAGE = 2
|
|
NOMINAL_CURRENT = 3
|
|
NOMINAL_POWER = 4
|
|
DEVICE_ARTICLE_NO = 6
|
|
MANUFACTURER = 8
|
|
SOFTWARE_VERSION = 9
|
|
SET_VALUE_VOLTAGE = 50
|
|
SET_VALUE_CURRENT = 51
|
|
POWER_SUPPLY_CONTROL = 54
|
|
STATUS_ACTUAL_VALUES = 71
|
|
|
|
|
|
# noinspection PyClassHasNoInit
|
|
class ControlParam:
|
|
"""Parameters for controlling the device"""
|
|
SWITCH_MODE_CMD = 0x10
|
|
SWITCH_MODE_REMOTE = 0x10
|
|
SWITCH_MODE_MANUAL = 0x00
|
|
SWITCH_POWER_OUTPUT_CMD = 0x1
|
|
SWITCH_POWER_OUTPUT_ON = 0x1
|
|
SWITCH_POWER_OUTPUT_OFF = 0x0
|
|
|
|
|
|
class Telegram:
|
|
"""Base class of a PS2000B telegram - basically allows accessing the raw bytes and does checksum calculation"""
|
|
|
|
def __init__(self):
|
|
self._bytes = []
|
|
self._checksum = []
|
|
self.checksum_ok = False
|
|
|
|
def _calc_checksum(self):
|
|
cs = 0
|
|
|
|
for b in self._bytes:
|
|
cs += b
|
|
|
|
cs_high = (cs & 0xff00) >> 8
|
|
cs_low = cs & 0xff
|
|
|
|
return [cs_high, cs_low]
|
|
|
|
@staticmethod
|
|
def _get_start_delimiter(transmission, expected_data_length):
|
|
result = 0b00000000
|
|
|
|
if expected_data_length > 16:
|
|
raise Exception("only 4 bits for expected length can be used")
|
|
|
|
result |= (expected_data_length - 1)
|
|
result |= 0b10000
|
|
result |= 0b100000
|
|
result |= transmission << 6
|
|
return result
|
|
|
|
def get_byte_array(self):
|
|
return bytearray(self._bytes + self._checksum)
|
|
|
|
|
|
class FromPowerSupply(Telegram):
|
|
"""Telegram received from the power supply"""
|
|
|
|
def __init__(self, raw_data):
|
|
Telegram.__init__(self)
|
|
data = [_ord(x) for x in raw_data]
|
|
self._bytes = data[0:-2]
|
|
self._checksum = data[len(data) - 2:len(data)]
|
|
self.checksum_ok = self._checksum == self._calc_checksum()
|
|
|
|
def get_sd(self):
|
|
return self._bytes[0]
|
|
|
|
def get_device_node(self):
|
|
return self._bytes[1]
|
|
|
|
def get_object(self):
|
|
return self._bytes[3]
|
|
|
|
def get_data(self):
|
|
return self._bytes[3:len(self._bytes)]
|
|
|
|
# noinspection PyMethodMayBeStatic
|
|
def get_error(self):
|
|
# ToDo: [1] chapter 3.6 add support for error codes here
|
|
return None
|
|
|
|
|
|
class ToPowerSupply(Telegram):
|
|
"""A telegram sent to the power supply"""
|
|
|
|
def __init__(self, transmission, data, expected_data_length):
|
|
Telegram.__init__(self)
|
|
self._bytes = []
|
|
self._bytes.append(self._get_start_delimiter(transmission, expected_data_length))
|
|
self._bytes.extend(data)
|
|
self._checksum = self._calc_checksum()
|
|
self.checksum_ok = True
|
|
|
|
|
|
class DeviceInformation:
|
|
"""A class carrying all static device information read from the device"""
|
|
|
|
def __init__(self):
|
|
self.device_type = ""
|
|
self.device_serial_no = ""
|
|
self.nominal_voltage = 0
|
|
self.nominal_current = 0
|
|
self.nominal_power = 0
|
|
self.manufacturer = ""
|
|
self.device_art_no = ""
|
|
self.software_version = ""
|
|
|
|
def __str__(self):
|
|
return "%s %s [%s], SW: %s, Art-Nr: %s, [%0.2f V, %0.2f A, %0.2f W]" % \
|
|
(self.manufacturer,
|
|
self.device_type, self.device_serial_no,
|
|
self.software_version, self.device_art_no,
|
|
self.nominal_voltage, self.nominal_current, self.nominal_power)
|
|
|
|
|
|
class DeviceStatusInformation:
|
|
"""A class carrying all dynamic device status information"""
|
|
|
|
def __init__(self, raw_data):
|
|
self.remote_control_active = raw_data[0] & 0b1
|
|
self.output_active = raw_data[1] & 0b1
|
|
self.actual_voltage_percent = float(as_word(raw_data[2:4])) / 256
|
|
self.actual_current_percent = float(as_word(raw_data[4:6])) / 256
|
|
|
|
def __str__(self):
|
|
if self.remote_control_active == 1:
|
|
remote = "active"
|
|
else:
|
|
remote = "inactive"
|
|
if self.output_active == 1:
|
|
output = "active"
|
|
else:
|
|
output = "inactive"
|
|
return "Remote control %s, Output %s" % (remote, output)
|
|
|
|
|
|
class PS2000B:
|
|
"""PS 2000 B main communication class"""
|
|
|
|
def __init__(self, serial_port):
|
|
self.__device_status_information1 = None
|
|
self.__device_status_information2 = None
|
|
self.serial = serial.Serial(serial_port,
|
|
baudrate=Constants.CONNECTION_BAUD_RATE,
|
|
timeout=Constants.TIMEOUT_BETWEEN_COMMANDS * 2,
|
|
parity=serial.PARITY_ODD,
|
|
stopbits=Constants.CONNECTION_STOP_BITS)
|
|
|
|
self.__device_information = self.__read_device_information()
|
|
|
|
def is_open(self):
|
|
return self.serial.is_open
|
|
|
|
def get_device_information(self):
|
|
return self.__device_information
|
|
|
|
def __read_device_information(self, channel=0): # reads static device information, usually not channel dependant
|
|
result = DeviceInformation()
|
|
|
|
# taken from [2]
|
|
result.device_type = as_string(self.__read_device_data(16, Objects.DEVICE_TYPE, channel).get_data())
|
|
result.device_serial_no = as_string(self.__read_device_data(16, Objects.DEVICE_SERIAL_NO, channel).get_data())
|
|
result.nominal_voltage = as_float(self.__read_device_data(4, Objects.NOMINAL_VOLTAGE, channel).get_data())
|
|
result.nominal_current = as_float(self.__read_device_data(4, Objects.NOMINAL_CURRENT, channel).get_data())
|
|
result.nominal_power = as_float(self.__read_device_data(4, Objects.NOMINAL_POWER, channel).get_data())
|
|
result.device_art_no = as_string(self.__read_device_data(16, Objects.DEVICE_ARTICLE_NO, channel).get_data())
|
|
result.manufacturer = as_string(self.__read_device_data(16, Objects.MANUFACTURER, channel).get_data())
|
|
result.software_version = as_string(self.__read_device_data(16, Objects.SOFTWARE_VERSION, channel).get_data())
|
|
|
|
return result
|
|
|
|
def __read_device_data(self, expected_length, object_id, channel): # reads data from device based on object_id
|
|
telegram = ToPowerSupply(0b01, [channel, object_id], expected_length)
|
|
result = self.__send_and_receive(telegram.get_byte_array())
|
|
return result
|
|
|
|
def __send_and_receive(self, raw_bytes): # sends request for info to device and reads reply
|
|
self.serial.write(raw_bytes)
|
|
result = FromPowerSupply(self.serial.read(Constants.MAX_LEN_IN_BYTES))
|
|
return result
|
|
|
|
def get_device_status_information(self, channel): # gets dynamic device information (e.g. current, voltage)
|
|
if channel == 0:
|
|
if self.__device_status_information1 is None:
|
|
self.update_device_information(0)
|
|
info = self.__device_status_information1
|
|
elif channel == 1:
|
|
if self.__device_status_information2 is None:
|
|
self.update_device_information(1)
|
|
info = self.__device_status_information2
|
|
else:
|
|
raise ValueError("Invalid Channel")
|
|
return info
|
|
|
|
def update_device_information(self, channel): # updates dynamic device info stored in __device_status_information
|
|
telegram = ToPowerSupply(0b01, [channel, Objects.STATUS_ACTUAL_VALUES], 6)
|
|
device_information = self.__send_and_receive(telegram.get_byte_array())
|
|
if channel == 0:
|
|
self.__device_status_information1 = DeviceStatusInformation(device_information.get_data())
|
|
elif channel == 1:
|
|
self.__device_status_information2 = DeviceStatusInformation(device_information.get_data())
|
|
else:
|
|
raise ValueError("Invalid Channel")
|
|
|
|
def __send_device_control(self, p1, p2, channel): # sends commands to PSU, commands given in p1, p2
|
|
telegram = ToPowerSupply(0b11, [channel, Objects.POWER_SUPPLY_CONTROL, p1, p2], 2)
|
|
_ = self.__send_and_receive(telegram.get_byte_array()) # send command to PSU
|
|
self.update_device_information(channel) # update info after change
|
|
|
|
def __send_device_data(self, obj, data, channel):
|
|
# Send integer data with obj-id to the PSU
|
|
|
|
telegram = ToPowerSupply(0b11, [channel, obj, data >> 8, data & 0xff], 4)
|
|
_ = self.__send_and_receive(telegram.get_byte_array())
|
|
self.update_device_information(channel)
|
|
|
|
def enable_all(self):
|
|
self.enable_remote_control(0)
|
|
self.enable_remote_control(1)
|
|
self.enable_output(0)
|
|
self.enable_output(1)
|
|
|
|
def disable_all(self):
|
|
self.disable_output(0)
|
|
self.disable_output(1)
|
|
self.disable_remote_control(0)
|
|
self.disable_remote_control(1)
|
|
|
|
def enable_remote_control(self, channel):
|
|
self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_REMOTE, channel)
|
|
|
|
def disable_remote_control(self, channel):
|
|
self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_MANUAL, channel)
|
|
|
|
def enable_output(self, channel):
|
|
self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_ON, channel)
|
|
|
|
def disable_output(self, channel):
|
|
self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_OFF, channel)
|
|
|
|
@property
|
|
def output1(self): # object controlling output 1 on/off
|
|
return self.get_device_status_information(0).output_active
|
|
|
|
@output1.setter
|
|
def output1(self, value):
|
|
if value:
|
|
self.enable_output(0)
|
|
else:
|
|
self.disable_output(0)
|
|
|
|
@property
|
|
def output2(self): # object controlling output 2 on/off
|
|
return self.get_device_status_information(1).output_active
|
|
|
|
@output2.setter
|
|
def output2(self, value):
|
|
if value:
|
|
self.enable_output(1)
|
|
else:
|
|
self.disable_output(1)
|
|
|
|
def get_voltage(self, channel):
|
|
self.update_device_information(channel)
|
|
if channel == 0:
|
|
v_perc = self.__device_status_information1.actual_voltage_percent
|
|
elif channel == 1:
|
|
v_perc = self.__device_status_information2.actual_voltage_percent
|
|
else:
|
|
raise ValueError("Invalid channel")
|
|
voltage = self.__device_information.nominal_voltage * v_perc
|
|
return voltage / 100
|
|
|
|
def get_voltage_setpoint(self, channel):
|
|
res = self.__read_device_data(2, Objects.SET_VALUE_VOLTAGE, channel).get_data()
|
|
return self.__device_information.nominal_voltage * ((res[0] << 8) + res[1]) / 25600.0
|
|
|
|
def set_voltage(self, value, channel):
|
|
self.update_device_information(channel)
|
|
self.enable_remote_control(channel)
|
|
volt = int(round((value * 25600.0) / self.__device_information.nominal_voltage))
|
|
self.__send_device_data(Objects.SET_VALUE_VOLTAGE, volt, channel)
|
|
|
|
@property
|
|
def voltage1(self): # object storing and controlling the voltage of channel 1
|
|
return self.get_voltage(0)
|
|
|
|
@voltage1.setter
|
|
def voltage1(self, value): # voltage of channel 1
|
|
self.set_voltage(value, 0)
|
|
|
|
@property
|
|
def voltage2(self): # object storing and controlling the voltage of channel 2
|
|
return self.get_voltage(1)
|
|
|
|
@voltage2.setter
|
|
def voltage2(self, value):
|
|
self.set_voltage(value, 1)
|
|
|
|
def get_current(self, channel):
|
|
self.update_device_information(channel)
|
|
if channel == 0:
|
|
c_perc = self.__device_status_information1.actual_current_percent
|
|
elif channel == 1:
|
|
c_perc = self.__device_status_information2.actual_current_percent
|
|
else:
|
|
raise ValueError("Invalid channel")
|
|
current = self.__device_information.nominal_current * c_perc
|
|
return current / 100
|
|
|
|
def get_current_setpoint(self, channel):
|
|
res = self.__read_device_data(2, Objects.SET_VALUE_CURRENT, channel).get_data()
|
|
return self.__device_information.nominal_current * ((res[0] << 8) + res[1]) / 25600.0
|
|
|
|
def set_current(self, value, channel):
|
|
self.update_device_information(channel)
|
|
self.enable_remote_control(channel)
|
|
curr = int(round((value * 25600.0) / self.__device_information.nominal_current))
|
|
self.__send_device_data(Objects.SET_VALUE_CURRENT, curr, channel)
|
|
|
|
@property
|
|
def current1(self):
|
|
return self.get_current(0)
|
|
|
|
@current1.setter
|
|
def current1(self, value): # current of channel 1
|
|
self.set_current(value, 0)
|
|
|
|
@property
|
|
def current2(self):
|
|
return self.get_current(1)
|
|
|
|
@current2.setter
|
|
def current2(self, value): # current of channel 2
|
|
self.set_current(value, 1)
|