diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6609df1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,96 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# IPython Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +.venv/ +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject + +# PyCharm +.idea + +# VScode +.vscode/ diff --git a/Arduino/__init__.py b/Arduino/__init__.py new file mode 100644 index 0000000..ffc5252 --- /dev/null +++ b/Arduino/__init__.py @@ -0,0 +1,2 @@ +name="arduino-python3" +from .arduino import Arduino, Shrimp diff --git a/Arduino/arduino.py b/Arduino/arduino.py new file mode 100644 index 0000000..e76a5b5 --- /dev/null +++ b/Arduino/arduino.py @@ -0,0 +1,657 @@ +#!/usr/bin/env python +import logging +import itertools +import platform +import serial +import time +from serial.tools import list_ports + +import sys +if sys.platform.startswith('win'): + import winreg +else: + import glob + +libraryVersion = 'V0.6' + +log = logging.getLogger(__name__) + + +def enumerate_serial_ports(): + """ + Uses the Win32 registry to return a iterator of serial + (COM) ports existing on this computer. + """ + path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM' + try: + key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) + except OSError: + raise Exception + + for i in itertools.count(): + try: + val = winreg.EnumValue(key, i) + yield (str(val[1])) # , str(val[0])) + except EnvironmentError: + break + + +def build_cmd_str(cmd, args=None): + """ + Build a command string that can be sent to the arduino. + + Input: + cmd (str): the command to send to the arduino, must not + contain a % character + args (iterable): the arguments to send to the command + + @TODO: a strategy is needed to escape % characters in the args + """ + if args: + args = '%'.join(map(str, args)) + else: + args = '' + return "@{cmd}%{args}$!".format(cmd=cmd, args=args) + + +def find_port(baud, timeout): + """ + Find the first port that is connected to an arduino with a compatible + sketch installed. + """ + if platform.system() == 'Windows': + ports = enumerate_serial_ports() + elif platform.system() == 'Darwin': + ports = [i[0] for i in list_ports.comports()] + ports = ports[::-1] + else: + ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") + for p in ports: + log.debug('Found {0}, testing...'.format(p)) + try: + sr = serial.Serial(p, baud, timeout=timeout) + except (serial.serialutil.SerialException, OSError) as e: + log.debug(str(e)) + continue + + sr.readline() # wait for board to start up again + + version = get_version(sr) + + if version != libraryVersion: + try: + ver = version[0] + except Exception: + ver = '' + + if ver == 'V' or version == "version": + print("You need to update the version of the Arduino-Python3", + "library running on your Arduino.") + print("The Arduino sketch is", version) + print("The Python installation is", libraryVersion) + print("Flash the prototype sketch again.") + return sr + + # established to be the wrong board + log.debug('Bad version {0}. This is not a Shrimp/Arduino!'.format( + version)) + sr.close() + continue + + log.info('Using port {0}.'.format(p)) + if sr: + return sr + return None + +def get_version(sr): + cmd_str = build_cmd_str("version") + try: + sr.write(str.encode(cmd_str)) + sr.flush() + except Exception: + return None + return sr.readline().decode("utf-8").replace("\r\n", "") + + +class Arduino(object): + + + def __init__(self, baud=115200, port=None, timeout=2, sr=None): + """ + Initializes serial communication with Arduino if no connection is + given. Attempts to self-select COM port, if not specified. + """ + if not sr: + if not port: + sr = find_port(baud, timeout) + if not sr: + raise ValueError("Could not find port.") + else: + sr = serial.Serial(port, baud, timeout=timeout) + sr.readline() # wait til board has rebooted and is connected + + version = get_version(sr) + + if version != libraryVersion: + # check version + try: + ver = version[0] + except Exception: + ver = '' + + if ver == 'V' or version == "version": + print("You need to update the version of the Arduino-Python3", + "library running on your Arduino.") + print("The Arduino sketch is", version) + print("The Python installation is", libraryVersion) + print("Flash the prototype sketch again.") + + sr.flush() + self.sr = sr + self.SoftwareSerial = SoftwareSerial(self) + self.Servos = Servos(self) + self.EEPROM = EEPROM(self) + + def version(self): + return get_version(self.sr) + + def digitalWrite(self, pin, val): + """ + Sends digitalWrite command + to digital pin on Arduino + ------------- + inputs: + pin : digital pin number + val : either "HIGH" or "LOW" + """ + if val.upper() == "LOW": + pin_ = -pin + else: + pin_ = pin + cmd_str = build_cmd_str("dw", (pin_,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + + def analogWrite(self, pin, val): + """ + Sends analogWrite pwm command + to pin on Arduino + ------------- + inputs: + pin : pin number + val : integer 0 (off) to 255 (always on) + """ + if val > 255: + val = 255 + elif val < 0: + val = 0 + cmd_str = build_cmd_str("aw", (pin, val)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + + def analogRead(self, pin): + """ + Returns the value of a specified + analog pin. + inputs: + pin : analog pin number for measurement + returns: + value: integer from 1 to 1023 + """ + cmd_str = build_cmd_str("ar", (pin,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + try: + return int(rd) + except: + return 0 + + def pinMode(self, pin, val): + """ + Sets I/O mode of pin + inputs: + pin: pin number to toggle + val: "INPUT" or "OUTPUT" + """ + if val == "INPUT": + pin_ = -pin + else: + pin_ = pin + cmd_str = build_cmd_str("pm", (pin_,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + + def pulseIn(self, pin, val): + """ + Reads a pulse from a pin + + inputs: + pin: pin number for pulse measurement + returns: + duration : pulse length measurement + """ + if val.upper() == "LOW": + pin_ = -pin + else: + pin_ = pin + cmd_str = build_cmd_str("pi", (pin_,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + try: + return float(rd) + except: + return -1 + + def pulseIn_set(self, pin, val, numTrials=5): + """ + Sets a digital pin value, then reads the response + as a pulse width. + Useful for some ultrasonic rangefinders, etc. + + inputs: + pin: pin number for pulse measurement + val: "HIGH" or "LOW". Pulse is measured + when this state is detected + numTrials: number of trials (for an average) + returns: + duration : an average of pulse length measurements + + This method will automatically toggle + I/O modes on the pin and precondition the + measurment with a clean LOW/HIGH pulse. + Arduino.pulseIn_set(pin,"HIGH") is + equivalent to the Arduino sketch code: + + pinMode(pin, OUTPUT); + digitalWrite(pin, LOW); + delayMicroseconds(2); + digitalWrite(pin, HIGH); + delayMicroseconds(5); + digitalWrite(pin, LOW); + pinMode(pin, INPUT); + long duration = pulseIn(pin, HIGH); + """ + if val.upper() == "LOW": + pin_ = -pin + else: + pin_ = pin + cmd_str = build_cmd_str("ps", (pin_,)) + durations = [] + for s in range(numTrials): + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + if rd.isdigit(): + if (int(rd) > 1): + durations.append(int(rd)) + if len(durations) > 0: + duration = int(sum(durations)) / int(len(durations)) + else: + duration = None + + try: + return float(duration) + except: + return -1 + + def close(self): + if self.sr.isOpen(): + self.sr.flush() + self.sr.close() + + def digitalRead(self, pin): + """ + Returns the value of a specified + digital pin. + inputs: + pin : digital pin number for measurement + returns: + value: 0 for "LOW", 1 for "HIGH" + """ + cmd_str = build_cmd_str("dr", (pin,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + try: + return int(rd) + except: + return 0 + + def Melody(self, pin, melody, durations): + """ + Plays a melody. + inputs: + pin: digital pin number for playback + melody: list of tones + durations: list of duration (4=quarter note, 8=eighth note, etc.) + length of melody should be of same + length as length of duration + + Melodies of the following length, can cause trouble + when playing it multiple times. + board.Melody(9,["C4","G3","G3","A3","G3",0,"B3","C4"], + [4,8,8,4,4,4,4,4]) + Playing short melodies (1 or 2 tones) didn't cause + trouble during testing + """ + NOTES = dict( + B0=31, C1=33, CS1=35, D1=37, DS1=39, E1=41, F1=44, FS1=46, G1=49, + GS1=52, A1=55, AS1=58, B1=62, C2=65, CS2=69, D2=73, DS2=78, E2=82, + F2=87, FS2=93, G2=98, GS2=104, A2=110, AS2=117, B2=123, C3=131, + CS3=139, D3=147, DS3=156, E3=165, F3=175, FS3=185, G3=196, GS3=208, + A3=220, AS3=233, B3=247, C4=262, CS4=277, D4=294, DS4=311, E4=330, + F4=349, FS4=370, G4=392, GS4=415, A4=440, + AS4=466, B4=494, C5=523, CS5=554, D5=587, DS5=622, E5=659, F5=698, + FS5=740, G5=784, GS5=831, A5=880, AS5=932, B5=988, C6=1047, + CS6=1109, D6=1175, DS6=1245, E6=1319, F6=1397, FS6=1480, G6=1568, + GS6=1661, A6=1760, AS6=1865, B6=1976, C7=2093, CS7=2217, D7=2349, + DS7=2489, E7=2637, F7=2794, FS7=2960, G7=3136, GS7=3322, A7=3520, + AS7=3729, B7=3951, C8=4186, CS8=4435, D8=4699, DS8=4978) + if (isinstance(melody, list)) and (isinstance(durations, list)): + length = len(melody) + cmd_args = [length, pin] + if length == len(durations): + cmd_args.extend([NOTES.get(melody[note]) + for note in range(length)]) + cmd_args.extend([durations[duration] + for duration in range(len(durations))]) + cmd_str = build_cmd_str("to", cmd_args) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + cmd_str = build_cmd_str("nto", [pin]) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + else: + return -1 + else: + return -1 + + def capacitivePin(self, pin): + ''' + Input: + pin (int): pin to use as capacitive sensor + + Use it in a loop! + DO NOT CONNECT ANY ACTIVE DRIVER TO THE USED PIN ! + + the pin is toggled to output mode to discharge the port, + and if connected to a voltage source, + will short circuit the pin, potentially damaging + the Arduino/Shrimp and any hardware attached to the pin. + ''' + cmd_str = build_cmd_str("cap", (pin,)) + self.sr.write(str.encode(cmd_str)) + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + if rd.isdigit(): + return int(rd) + + def shiftOut(self, dataPin, clockPin, pinOrder, value): + """ + Shift a byte out on the datapin using Arduino's shiftOut() + + Input: + dataPin (int): pin for data + clockPin (int): pin for clock + pinOrder (String): either 'MSBFIRST' or 'LSBFIRST' + value (int): an integer from 0 and 255 + """ + cmd_str = build_cmd_str("so", + (dataPin, clockPin, pinOrder, value)) + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + + def shiftIn(self, dataPin, clockPin, pinOrder): + """ + Shift a byte in from the datapin using Arduino's shiftIn(). + + Input: + dataPin (int): pin for data + clockPin (int): pin for clock + pinOrder (String): either 'MSBFIRST' or 'LSBFIRST' + Output: + (int) an integer from 0 to 255 + """ + cmd_str = build_cmd_str("si", (dataPin, clockPin, pinOrder)) + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + if rd.isdigit(): + return int(rd) + + +class Shrimp(Arduino): + + def __init__(self): + Arduino.__init__(self) + + +class Wires(object): + + """ + Class for Arduino wire (i2c) support + """ + + def __init__(self, board): + self.board = board + self.sr = board.sr + + +class Servos(object): + + """ + Class for Arduino servo support + 0.03 second delay noted + """ + + def __init__(self, board): + self.board = board + self.sr = board.sr + self.servo_pos = {} + + def attach(self, pin, min=544, max=2400): + cmd_str = build_cmd_str("sva", (pin, min, max)) + + while True: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + if rd: + break + else: + log.debug("trying to attach servo to pin {0}".format(pin)) + position = int(rd) + self.servo_pos[pin] = position + return 1 + + def detach(self, pin): + position = self.servo_pos[pin] + cmd_str = build_cmd_str("svd", (position,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + del self.servo_pos[pin] + + def write(self, pin, angle): + position = self.servo_pos[pin] + cmd_str = build_cmd_str("svw", (position, angle)) + + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + + def writeMicroseconds(self, pin, uS): + position = self.servo_pos[pin] + cmd_str = build_cmd_str("svwm", (position, uS)) + + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + + def read(self, pin): + if pin not in self.servo_pos.keys(): + self.attach(pin) + position = self.servo_pos[pin] + cmd_str = build_cmd_str("svr", (position,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + rd = self.sr.readline().decode("utf-8").replace("\r\n", "") + try: + angle = int(rd) + return angle + except: + return None + + + +class SoftwareSerial(object): + + """ + Class for Arduino software serial functionality + """ + + def __init__(self, board): + self.board = board + self.sr = board.sr + self.connected = False + + def begin(self, p1, p2, baud): + """ + Create software serial instance on + specified tx,rx pins, at specified baud + """ + cmd_str = build_cmd_str("ss", (p1, p2, baud)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + response = self.sr.readline().decode("utf-8").replace("\r\n", "") + if response == "ss OK": + self.connected = True + return True + else: + self.connected = False + return False + + def write(self, data): + """ + sends data to existing software serial instance + using Arduino's 'write' function + """ + if self.connected: + cmd_str = build_cmd_str("sw", (data,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + response = self.sr.readline().decode("utf-8").replace("\r\n", "") + if response == "ss OK": + return True + else: + return False + + def read(self): + """ + returns first character read from + existing software serial instance + """ + if self.connected: + cmd_str = build_cmd_str("sr") + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + response = self.sr.readline().decode("utf-8").replace("\r\n", "") + if response: + return response + else: + return False + + +class EEPROM(object): + """ + Class for reading and writing to EEPROM. + """ + + def __init__(self, board): + self.board = board + self.sr = board.sr + + def size(self): + """ + Returns size of EEPROM memory. + """ + cmd_str = build_cmd_str("sz") + + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + response = self.sr.readline().decode("utf-8").replace("\r\n", "") + return int(response) + except: + return 0 + + def write(self, address, value=0): + """ Write a byte to the EEPROM. + + :address: the location to write to, starting from 0 (int) + :value: the value to write, from 0 to 255 (byte) + """ + + if value > 255: + value = 255 + elif value < 0: + value = 0 + cmd_str = build_cmd_str("eewr", (address, value)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + except: + pass + + def read(self, adrress): + """ Reads a byte from the EEPROM. + + :address: the location to write to, starting from 0 (int) + """ + cmd_str = build_cmd_str("eer", (adrress,)) + try: + self.sr.write(str.encode(cmd_str)) + self.sr.flush() + response = self.sr.readline().decode("utf-8").replace("\r\n", "") + if response: + return int(response) + except: + return 0 diff --git a/Arduino/prototype.ino b/Arduino/prototype.ino new file mode 100644 index 0000000..9170196 --- /dev/null +++ b/Arduino/prototype.ino @@ -0,0 +1,416 @@ +#include +#include +#include +#include + +void Version(){ + Serial.println(F("V0.6")); +} + + +SoftwareSerial *sserial = NULL; +Servo servos[8]; +int servo_pins[] = {0, 0, 0, 0, 0, 0, 0, 0}; +boolean connected = false; + +int Str2int (String Str_value) +{ + char buffer[10]; //max length is three units + Str_value.toCharArray(buffer, 10); + int int_value = atoi(buffer); + return int_value; +} + +void split(String results[], int len, String input, char spChar) { + String temp = input; + for (int i=0; ibegin(baud_); + Serial.println("ss OK"); +} + +void SS_write(String data) { + int len = data.length()+1; + char buffer[len]; + data.toCharArray(buffer,len); + Serial.println("ss OK"); + sserial->write(buffer); +} +void SS_read(String data) { + char c = sserial->read(); + Serial.println(c); +} + +void pulseInHandler(String data){ + int pin = Str2int(data); + long duration; + if(pin <=0){ + pinMode(-pin, INPUT); + duration = pulseIn(-pin, LOW); + }else{ + pinMode(pin, INPUT); + duration = pulseIn(pin, HIGH); + } + Serial.println(duration); +} + +void pulseInSHandler(String data){ + int pin = Str2int(data); + long duration; + if(pin <=0){ + pinMode(-pin, OUTPUT); + digitalWrite(-pin, HIGH); + delayMicroseconds(2); + digitalWrite(-pin, LOW); + delayMicroseconds(5); + digitalWrite(-pin, HIGH); + pinMode(-pin, INPUT); + duration = pulseIn(-pin, LOW); + }else{ + pinMode(pin, OUTPUT); + digitalWrite(pin, LOW); + delayMicroseconds(2); + digitalWrite(pin, HIGH); + delayMicroseconds(5); + digitalWrite(pin, LOW); + pinMode(pin, INPUT); + duration = pulseIn(pin, HIGH); + } + Serial.println(duration); +} + +void SV_add(String data) { + String sdata[3]; + split(sdata,3,data,'%'); + int pin = Str2int(sdata[0]); + int min = Str2int(sdata[1]); + int max = Str2int(sdata[2]); + int pos = -1; + for (int i = 0; i<8;i++) { + if (servo_pins[i] == pin) { //reset in place + servos[pos].detach(); + servos[pos].attach(pin, min, max); + servo_pins[pos] = pin; + Serial.println(pos); + return; + } + } + for (int i = 0; i<8;i++) { + if (servo_pins[i] == 0) {pos = i;break;} // find spot in servo array + } + if (pos == -1) {;} //no array position available! + else { + servos[pos].attach(pin, min, max); + servo_pins[pos] = pin; + Serial.println(pos); + } +} + +void SV_remove(String data) { + int pos = Str2int(data); + servos[pos].detach(); + servo_pins[pos] = 0; +} + +void SV_read(String data) { + int pos = Str2int(data); + int angle; + angle = servos[pos].read(); + Serial.println(angle); +} + +void SV_write(String data) { + String sdata[2]; + split(sdata,2,data,'%'); + int pos = Str2int(sdata[0]); + int angle = Str2int(sdata[1]); + servos[pos].write(angle); +} + +void SV_write_ms(String data) { + String sdata[2]; + split(sdata,2,data,'%'); + int pos = Str2int(sdata[0]); + int uS = Str2int(sdata[1]); + servos[pos].writeMicroseconds(uS); +} + +void sizeEEPROM() { + Serial.println(E2END + 1); +} + +void EEPROMHandler(int mode, String data) { + String sdata[2]; + split(sdata, 2, data, '%'); + if (mode == 0) { + EEPROM.write(Str2int(sdata[0]), Str2int(sdata[1])); + } else { + Serial.println(EEPROM.read(Str2int(sdata[0]))); + } +} + +void SerialParser(void) { + char readChar[64]; + Serial.readBytesUntil(33,readChar,64); + String read_ = String(readChar); + //Serial.println(readChar); + int idx1 = read_.indexOf('%'); + int idx2 = read_.indexOf('$'); + // separate command from associated data + String cmd = read_.substring(1,idx1); + String data = read_.substring(idx1+1,idx2); + + // determine command sent + if (cmd == "dw") { + DigitalHandler(1, data); + } + else if (cmd == "dr") { + DigitalHandler(0, data); + } + else if (cmd == "aw") { + AnalogHandler(1, data); + } + else if (cmd == "ar") { + AnalogHandler(0, data); + } + else if (cmd == "pm") { + ConfigurePinHandler(data); + } + else if (cmd == "ps") { + pulseInSHandler(data); + } + else if (cmd == "pi") { + pulseInHandler(data); + } + else if (cmd == "ss") { + SS_set(data); + } + else if (cmd == "sw") { + SS_write(data); + } + else if (cmd == "sr") { + SS_read(data); + } + else if (cmd == "sva") { + SV_add(data); + } + else if (cmd == "svr") { + SV_read(data); + } + else if (cmd == "svw") { + SV_write(data); + } + else if (cmd == "svwm") { + SV_write_ms(data); + } + else if (cmd == "svd") { + SV_remove(data); + } + else if (cmd == "version") { + Version(); + } + else if (cmd == "to") { + Tone(data); + } + else if (cmd == "nto") { + ToneNo(data); + } + else if (cmd == "cap") { + readCapacitivePin(data); + } + else if (cmd == "so") { + shiftOutHandler(data); + } + else if (cmd == "si") { + shiftInHandler(data); + } + else if (cmd == "eewr") { + EEPROMHandler(0, data); + } + else if (cmd == "eer") { + EEPROMHandler(1, data); + } + else if (cmd == "sz") { + sizeEEPROM(); + } +} + +void setup() { + Serial.begin(115200); + while (!Serial) { + ; // wait for serial port to connect. Needed for Leonardo only + } + Serial.println("connected"); +} + +void loop() { + SerialParser(); +} diff --git a/ArduinoTest.py b/ArduinoTest.py new file mode 100644 index 0000000..ff6d0a6 --- /dev/null +++ b/ArduinoTest.py @@ -0,0 +1,21 @@ +import time + +from Arduino import Arduino + +print("Searching for Arduino...") +board = Arduino() +print("Arduino found.") +board.pinMode(9, "Output") + +i = 0 +while i <= 4: + print("running: ", i) + board.digitalWrite(9, "HIGH") + time.sleep(0.5) + time.sleep(1) + board.digitalWrite(9, "LOW") + time.sleep(0.5) + time.sleep(1) + i = i + 1 + +board.close() diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e5c8e54 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Sören Sprößig + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/One_Unit_Test.py b/One_Unit_Test.py new file mode 100644 index 0000000..ebd2afa --- /dev/null +++ b/One_Unit_Test.py @@ -0,0 +1,66 @@ +# import platform +import time as t +import numpy as np +import globals as g +import cage_func as func +from pyps2000b import PS2000B + +# User Inputs/Configuration---------------------------------- +Test1 = 0 +Test2 = 0 +Test3 = 0 +Test4 = 0 + +# Constants: +g.Coil_const = np.array([38.6, 38.45, 37.9])*1e-9 # Coil constants [x,y,z] in T/A +g.ambientField = np.array([80])*1e-6 # ambient magnetic field in measurement area, to be cancelled out +g.resistances = np.array([3.9, 1, 1]) # resistance of [x,y,z] circuits +g.maxWatts = np.array([8, 0, 0]) # max. allowed power for [x,y,z] circuits + +# COM-Ports for power supply units: +xyPort = "COM7" +g.xyDevice = PS2000B.PS2000B(xyPort) + +g.maxAmps = np.sqrt(g.maxWatts/g.resistances) +print(g.maxAmps) + + +def print_status(): + print("Output 1:") + func.print_status(g.xAxis) + print("Output 2:") + func.print_status(g.yAxis) + + +g.xyDevice.enable_all() +func.set_to_zero(g.xyDevice) +print_status() +t.sleep(3) + +if Test1 == 1: + g.xyDevice.voltage1 = 5 + t.sleep(1) + print_status() + t.sleep(5) + func.set_to_zero(g.xyDevice) + +if Test2 == 1: + g.xyDevice.current1 = 0.2 + t.sleep(1) + print_status() + t.sleep(5) + func.set_to_zero(g.xyDevice) + +if Test4 == 1: + func.set_axis_current(g.xyDevice, 0.2) + t.sleep(1) + print_status() + t.sleep(10) + func.set_to_zero(g.xyDevice) + t.sleep(1) + +print_status() + +g.xyDevice.disable_all() + +print_status() diff --git a/README.md b/README.md index 23c891c..b42145c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,68 @@ -# Magnetfeldteststand +# Python-PS2000B +Python library to work with Elektro-Automatik PS 2000 B power supplies via USB. + +## Compatibility +Tested with: + ++ Python 2.7 ++ Python 3.5, 3.6 + +Tested on: + ++ Windows 7 x64 ++ Windows 10 x64 Version 1607 (OS Build 14393.2035) ++ Ubuntu 16.04.1 LTS ++ Ubuntu 20.04.1 LTS + +## Features of Python-PS2000B +### Supported +- read static device information (manufacturer, serial, device type ...) +- read dynamic device information (current, voltage) +- read/write remote control +- read/write output control + +### Still todo +- set voltage and current +- wrap error results in own telegram + +## Prerequisites + +### Python +The following third-party Python libraries are needed: + +* `pyserial` - serial communication library for Python, see https://pypi.python.org/pypi/pyserial + +### Windows +On Windows the USB driver (fetch it from http://www.elektroautomatik.de/files/eautomatik/treiber/usb/ea_device_driver.rar) must be installed. Afterwards you can find the serial port `COMxx` in the *device manager*. + +### Linux +On Linux the device is detected as `/dev/ttyACMx` (abstract control model, see https://www.rfc1149.net/blog/2013/03/05/what-is-the-difference-between-devttyusbx-and-devttyacmx/). Use `dmesg` after connecting the device to find out `x`. + +Most Linuxes will require users to elevate for accessing the device. If you want to access the device as your current user, just add it to the group `dialout` (`ls -lah /dev/ttyACM0` will show you the group to use, usually this is `dialout`) and login again. + +## Usage +```python +import time +from pyps2000b import PS2000B + + +device = PS2000B.PS2000B("/dev/ttyACM0") + +print("Device status: %s" % device.get_device_status_information()) + +device.enable_remote_control() +device.voltage1 = 5.1 +device.current1 = 1 +device.enable_output() + +time.sleep(1) + +print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current())) + +device.output1 = False +``` + +## Documentation ++ product website: http://www.elektroautomatik.de/en/ps2000b.html ++ programming guide PS 2000 B: http://www.elektroautomatik.de/files/eautomatik/treiber/ps2000b/programming_ps2000b.zip diff --git a/README_arduino.md b/README_arduino.md new file mode 100644 index 0000000..903cc6f --- /dev/null +++ b/README_arduino.md @@ -0,0 +1,206 @@ +# Arduino-Python3 Command API + +This API is forked from the original [Python Arduino Command API](https://github.com/thearn/Python-Arduino-Command-API) to add support for Python 3. + +The Arduino-Python3 Command API is a lightweight Python library for +communicating with [Arduino microcontroller boards](http://www.arduino.cc/) from a connected computer using +standard serial IO, either over a physical wire +or wirelessly. It is written using a custom protocol, similar to [Firmata](http://firmata.org/wiki/Main_Page). + +This allows a user to quickly prototype programs for Arduino using Python code, or to +simply read/control/troubleshoot/experiment +with hardware connected to an Arduino board without ever having to recompile and reload sketches to the board itself. + +Method names within the Arduino-Python3 Command API are designed to be as close +as possible to their Arduino programming language counterparts + +## Simple usage example (LED blink) +```python +#!/usr/bin/env python +""" + Blinks an LED on digital pin 13 + in 1 second intervals +""" + +from Arduino import Arduino +import time + +board = Arduino() # plugged in via USB, serial com at rate 115200 +board.pinMode(13, "OUTPUT") + +while True: + board.digitalWrite(13, "LOW") + time.sleep(1) + board.digitalWrite(13, "HIGH") + time.sleep(1) +``` + +## Requirements: +- [Python](http://python.org/) 3.7 tested on Windows and macOS. +- [pyserial](http://pyserial.sourceforge.net/) 2.6 or higher +- Any [Arduino compatible microcontroller](https://www.sparkfun.com/categories/242) with at least 14KB of flash memory + +## Installation: +Either run `pip install arduino-python3` from a command line, or run `python setup.py +build install` from the source directory to install this library. + +## Setup: +1. Verify that your Arduino board communicates at the baud rate specified in the +`setup()` function (line 407) in `prototype.ino`. Change it there if necessary. +2. Load the `prototype.ino` sketch onto your Arduino board, using the Arduino IDE. +3. Set up some kind of serial I/O communication between the Arduino board and your computer (via physical USB cable, +Bluetooth, xbee, etc. + associated drivers) +4. Add `from Arduino import Arduino` into your python script to communicate with your Arduino + +For a collection of examples, see `examples.py`. This file contains methods which replicate +the functionality of many Arduino demo sketches. + +## Testing: +The `tests` directory contains some basic tests for the library. Extensive code coverage is a bit difficult to expect for every release, since a positive test involves actually +connecting and issuing commands to a live Arduino, hosting any hardware +required to test a particular function. But a core of basic communication tests +should at least be maintained here and used before merging into the `master` branch. + +After installation, the interactive tests can be run from the source directory: +```bash +$ python tests/test_main.py +``` + +Automated tests can be run from the source directory with: +```bash +$ python tests/test_arduino.py +``` + +## Classes +- `Arduino(baud)` - Set up communication with currently connected and powered +Arduino. + +```python +board = Arduino("115200") #Example +``` + +The device name / COM port of the connected Arduino will be auto-detected. +If there are more than one Arduino boards connected, +the desired COM port can be also be passed as an optional argument: + +```python +board = Arduino("115200", port="COM3") #Windows example +``` +```python +board = Arduino("115200", port="/dev/tty.usbmodemfa141") #OSX example +``` + +A time-out for reading from the Arduino can also be specified as an optional +argument: + +```python +board = Arduino("115200", timeout=2) #Serial reading functions will +#wait for no more than 2 seconds +``` + +## Methods + +**Digital I/O** + +- `Arduino.digitalWrite(pin_number, state)` turn digital pin on/off +- `Arduino.digitalRead(pin_number)` read state of a digital pin + +```python +#Digital read / write example +board.digitalWrite(13, "HIGH") #Set digital pin 13 voltage +state_1 = board.digitalRead(13) #Will return integer 1 +board.digitalWrite(13, "LOW") #Set digital pin 13 voltage +state_2 = board.digitalRead(13) #Will return integer 0 +``` + +- `Arduino.pinMode(pin_number, io_mode)` set pin I/O mode +- `Arduino.pulseIn(pin_number, state)` measures a pulse +- `Arduino.pulseIn_set(pin_number, state)` measures a pulse, with preconditioning + +```python +#Digital mode / pulse example +board.pinMode(7, "INPUT") #Set digital pin 7 mode to INPUT +duration = board.pulseIn(7, "HIGH") #Return pulse width measurement on pin 7 +``` + +**Analog I/O** + +- `Arduino.analogRead(pin_number)` returns the analog value +- `Arduino.analogWrite(pin_number, value)` sets the analog value + +```python +#Analog I/O examples +val=board.analogRead(5) #Read value on analog pin 5 (integer 0 to 1023) +val = val / 4 # scale to 0 - 255 +board.analogWrite(11) #Set analog value (PWM) based on analog measurement +``` + +**Shift Register** + +- `Arduino.shiftIn(dataPin, clockPin, bitOrder)` shift a byte in and returns it +- `Arduino.shiftOut(dataPin, clockPin, bitOrder, value)` shift the given byte out + +`bitOrder` should be either `"MSBFIRST"` or `"LSBFIRST"` + +**Servo Library Functionality** +Support is included for up to 8 servos. + +- `Arduino.Servos.attach(pin, min=544, max=2400)` Create servo instance. Only 8 servos can be used at one time. +- `Arduino.Servos.read(pin)` Returns the angle of the servo attached to the specified pin +- `Arduino.Servos.write(pin, angle)` Move an attached servo on a pin to a specified angle +- `Arduino.Servos.writeMicroseconds(pin, uS)` Write a value in microseconds to the servo on a specified pin +- `Arduino.Servos.detach(pin)` Detaches the servo on the specified pin + +```python +#Servo example +board.Servos.attach(9) #declare servo on pin 9 +board.Servos.write(9, 0) #move servo on pin 9 to 0 degrees +print board.Servos.read(9) # should be 0 +board.Servos.detach(9) #free pin 9 +``` + +**Software Serial Functionality** + +- `Arduino.SoftwareSerial.begin(ss_rxPin, ss_txPin, ss_device_baud)` initialize software serial device on +specified pins. +Only one software serial device can be used at a time. Existing software serial instance will +be overwritten by calling this method, both in Python and on the Arduino board. +- `Arduino.SoftwareSerial.write(data)` send data using the Arduino 'write' function to the existing software +serial connection. +- `Arduino.SoftwareSerial.read()` returns one byte from the existing software serial connection + +```python +#Software serial example +board.SoftwareSerial.begin(0, 7, "19200") # Start software serial for transmit only (tx on pin 7) +board.SoftwareSerial.write(" test ") #Send some data +response_char = board.SoftwareSerial.read() #read response character +``` + +**EEPROM** + +- `Arduino.EEPROM.read(address)` reads a byte from the EEPROM +- `Arduino.EEPROM.write(address, value)` writes a byte to the EEPROM +- `Arduino.EEPROM.size()` returns size of the EEPROM + +```python +#EEPROM read and write examples +location = 42 +value = 10 # 0-255(byte) + +board.EEPROM.write(location, 10) +print(board.EEPROM.read(location)) +print('EEPROM size {size}'.format(size=board.EEPROM.size())) +``` + + +**Misc** + +- `Arduino.close()` closes serial connection to the Arduino. + +## To-do list: +- Expand software serial functionality (`print()` and `println()`) +- Add simple reset functionality that zeros out all pin values +- Add I2C / TWI function support (Arduino `Wire.h` commands) +- Include a wizard which generates 'prototype.ino' with selected serial baud rate and Arduino function support +(to help reduce memory requirements). +- Multi-serial support for Arduino mega (`Serial1.read()`, etc) diff --git a/cage_func.py b/cage_func.py new file mode 100644 index 0000000..3ccbb51 --- /dev/null +++ b/cage_func.py @@ -0,0 +1,88 @@ +# import time + +from pyps2000b import PS2000B +import globals as g + + +def set_devices(): # creates device objects for all PSUs + g.xyDevice = PS2000B.PS2000B(g.xyPort) + g.zDevice = PS2000B.PS2000B(g.zPort) + g.xAxis = (g.xyDevice, 0, g.relayPins[0], 0) # (device, channel, arduino pin, axis index) + g.yAxis = (g.xyDevice, 1, g.relayPins[1], 1) + g.zAxis = (g.zDevice, 0, g.relayPins[2], 2) + + +def activate_all(): # enables remote control and output on all PSUs and channels + g.xyDevice.enable_all() + g.zDevice.enable_all() + + +def deactivate_all(): # disables remote control and output on all PSUs and channels + g.xyDevice.disable_all() + g.zDevice.disable_all() + + +def safe_arduino(): # sets output pins to low and closes serial connection + for pin in g.relayPins: + g.arduino.digitalWrite(pin, "LOW") + g.arduino.close() + + +def print_status(axis): # axis as (device, channel), e.g. g.xAxis + device = axis[0] # PSU + channel = axis[1] # output channel on the PSU + print("%s, %0.2f V, %0.2f A" + % (device.get_device_status_information(channel), device.get_voltage(channel), device.get_current(channel))) + + +def print_status_3(): + print("X-Axis:") + print_status(g.xAxis) + print("Y-Axis:") + print_status(g.yAxis) + print("Z-Axis:") + print_status(g.zAxis) + + +def set_to_zero(device): + device.voltage1 = 0 + device.current1 = 0 + device.current2 = 0 + device.current2 = 0 + + +def set_field_simple(vector): # forms magnetic field as specified by vector, w/o cancelling ambient field + i_vec = vector/g.Coil_const + set_current_vec(i_vec) + + +def set_field(vector): # forms magnetic field as specified by vector, corrected for ambient field + field = vector - g.ambientField + i_vec = field/g.Coil_const + set_current_vec(i_vec) + + +def set_current_vec(i_vec): # sets needed currents on each axis for given vector + set_axis_current(g.xAxis, i_vec[0]) + set_axis_current(g.yAxis, i_vec[1]) + set_axis_current(g.zAxis, i_vec[2]) + + +def set_axis_current(axis, value): # sets current with correct polarity on one axis + device = axis[0] + channel = axis[1] + ardPin = axis[2] + axisIndex = axis[3] + if abs(value) > g.maxAmps[axisIndex]: # prevent excessive currents + set_to_zero(device) # set currents and voltages to 0 + device.disable_all() # disable outputs on PSU + safe_arduino() # set arduino pins to low and close serial link + raise ValueError("Invalid current value. Tried %0.2fA, max. %0.2fA allowed" % (value, g.maxAmps[axisIndex])) + elif value >= 0: # switch polarity as needed + g.arduino.digitalWrite(ardPin, "LOW") + elif value < 0: + g.arduino.digitalWrite(ardPin, "HIGH") + else: + raise Exception("This should be impossible.") + device.set_current(abs(value), channel) + device.set_voltage(1.1 * g.maxAmps[axisIndex] * g.resistances[axisIndex]) # set max voltage high enough diff --git a/example.py b/example.py new file mode 100644 index 0000000..8ff8f89 --- /dev/null +++ b/example.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# coding=utf-8 +import platform +import time + +from pyps2000b import PS2000B + + +DEVICE = "COM7" if platform.system() == "Windows" else "/dev/ttyACM0" + +# connection to the device is automatically opened +print("Connecting to device at %s..." % DEVICE) +device = PS2000B.PS2000B(DEVICE) # create Object of class PS2000B, pass COM-port to functions inside + +# static device information can be read +print("Connection open: %s" % device.is_open()) +print("Device: %s" % device.get_device_information()) + +# dynamic device status information can be read +device_status_info1 = device.get_device_status_information(0) +device_status_info2 = device.get_device_status_information(1) +print("Device status 1: %s" % device_status_info1) +print("Device status 2: %s" % device_status_info2) + +print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current())) + +# device can be controlled +if not device_status_info.remote_control_active: + print("...will enable remote control...") + device.enable_remote_control() + +print("...set voltage to 12V and max current to 1A...") +device.voltage1 = 12 +device.current1 = 1 +time.sleep(10) +print("...now enabling the power output control...") +device.enable_output(0) +time.sleep(2) +device_status_info1 = device.get_device_status_information(0) +device_status_info2 = device.get_device_status_information(1) +print("Device status 1: %s" % device_status_info1) +print("Device status 2: %s" % device_status_info2) +print("Current output: %0.2f V , %0.2f A" % (device.voltage1, device.current1)) +time.sleep(10) +print("...set voltage to 5.1V...") +device.voltage1 = 5.1 +time.sleep(10) +print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current())) +print("...after 5 seconds power output will be disabled again ...") +time.sleep(5) +device.disable_output() +print("...and disabling remote control again.") +device.disable_remote_control() + +print("Device status 1: %s" % device.get_device_status_information(0)) +print("Device status 2: %s" % device.get_device_status_information(1)) diff --git a/example2.py b/example2.py new file mode 100644 index 0000000..009b52d --- /dev/null +++ b/example2.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +# coding=utf-8 +import platform +import time + +from pyps2000b import PS2000B + + +DEVICE = "COM7" if platform.system() == "Windows" else "/dev/ttyACM0" + +# connection to the device is automatically opened +print("Connecting to device at %s..." % DEVICE) +xyDevice = PS2000B.PS2000B(DEVICE) # create Object of class PS2000B, pass COM-port and channel to functions inside + +# static device information can be read +print("Connection open: %s" % xyDevice.is_open()) +print("Device: %s" % xyDevice.get_device_information()) + +# dynamic device status information can be read +device_status_info1 = xyDevice.get_device_status_information(0) +device_status_info2 = xyDevice.get_device_status_information(1) +print("Device status 1: %s" % xyDevice.get_device_status_information(0)) +print("Device status 2: %s" % xyDevice.get_device_status_information(1)) +print("Current output 1: %0.2f V , %0.2f A" % (xyDevice.voltage1, xyDevice.current1)) +print("Current output 2: %0.2f V , %0.2f A" % (xyDevice.voltage2, xyDevice.current2)) + +# device can be controlled +if not device_status_info1.remote_control_active: + print("...will enable remote control...") + xyDevice.enable_remote_control(0) +if not device_status_info2.remote_control_active: + print("...will enable remote control...") + xyDevice.enable_remote_control(1) + +print("...set voltage 1 to 12V and max current to 1A...") +xyDevice.voltage1 = 12 +xyDevice.current1 = 1 +time.sleep(2) +print("...now enabling the power output control 1...") +xyDevice.enable_output(0) + +time.sleep(2) +print("... set voltage 2 to 5V and max current to 1A...") +xyDevice.voltage2 = 5 +xyDevice.current2 = 1 +time.sleep(2) +print("...now enabling the power output control 2...") +xyDevice.enable_output(1) + +time.sleep(5) +print("Device status 1: %s" % xyDevice.get_device_status_information(0)) +print("Device status 2: %s" % xyDevice.get_device_status_information(1)) +print("Output 1: %0.2f V , %0.2f A" % (xyDevice.voltage1, xyDevice.current1)) +print("Output 2: %0.2f V , %0.2f A" % (xyDevice.voltage2, xyDevice.current2)) + +time.sleep(5) +xyDevice.disable_output(0) +xyDevice.disable_output(1) +print("...and disabling remote control again.") +xyDevice.disable_remote_control(0) +xyDevice.disable_remote_control(1) + +print("Device status 1: %s" % xyDevice.get_device_status_information(0)) +print("Device status 2: %s" % xyDevice.get_device_status_information(1)) diff --git a/globals.py b/globals.py new file mode 100644 index 0000000..8d67e02 --- /dev/null +++ b/globals.py @@ -0,0 +1,21 @@ +global Coil_const +global ambientField + +global xyPort +global zPort + +global xyDevice +global zDevice + +global xAxis +global yAxis +global zAxis + +global resistances +global maxAmps +global maxWatts + +global arduino + +relayPins = [1, 2, 3] # digital pin on the Arduino for switching relay of each axis [x,y,z] + diff --git a/main.py b/main.py new file mode 100644 index 0000000..b63a8b8 --- /dev/null +++ b/main.py @@ -0,0 +1,36 @@ +# import platform +# import time +import numpy as np +import globals as g +import cage_func as func +# from pyps2000b import PS2000B +from Arduino import Arduino + +# User Inputs/Configuration---------------------------------- +# Desired output: +mag_vec1 = np.array([10, 10, 5])*1e-6 + +# Constants: +g.Coil_const = np.array([38.6, 38.45, 37.9])*1e-9 # Coil constants [x,y,z] in T/A +g.ambientField = np.array([80])*1e-6 # ambient magnetic field in measurement area, to be cancelled out +g.resistances = np.array([3.9, 1, 1]) # resistance of [x,y,z] circuits +g.maxWatts = np.array([8, 0, 0]) # max. allowed power for [x,y,z] circuits + +# COM-Ports for power supply units: +g.xyPort = "COM1" # placeholders +g.zPort = "COM1" + +# Code starts here------------------------------------------ +g.maxAmps = np.sqrt(g.maxWatts/g.resistances) # calculate maximum currents in each axis + +print("Connecting to PSUs...") +func.set_devices() # initiate communication, set handles +print("Connecting to Arduino...") +g.arduino = Arduino() # search for connected arduino and set handle +print("Arduino found.") +print("Activating PSU outputs...") +func.activate_all() # activate remote control and outputs on PSUs + +func.set_field_simple(mag_vec1) + +func.deactivate_all() diff --git a/pyps2000b/PS2000B.py b/pyps2000b/PS2000B.py new file mode 100644 index 0000000..1df26fe --- /dev/null +++ b/pyps2000b/PS2000B.py @@ -0,0 +1,407 @@ +#!/usr/bin/env python3 +# coding=utf-8 +# Python access to Elektro Automatik PS 2000 B devices via USB/serial +# +# Supported features: +# - read static device information (manufacturer, serial, device type ...) +# - read dynamic device information (current, voltage) +# - read/write remote control +# - read/write output control +# +# - wrap error results in own telegram +# +# References +# [1] = "PS 2000B Programming Guide" from 2015-05-28 +# [2] = "PS 2000B object list" +# + +import serial +import struct +import sys +PY_3 = sys.version_info >= (3, 0) + +# noinspection SpellCheckingInspection +__author__ = "Sören Sprößig " + + +def as_string(raw_data): + """Converts the given raw bytes to a string (removes NULL)""" + return bytearray(raw_data[:-1]) + + +def as_float(raw_data): + """Converts the given raw bytes to a float""" + f = struct.unpack_from(">f", bytearray(raw_data))[0] + return f + + +def as_word(raw_data): + """Converts the given raw bytes to a word""" + w = struct.unpack_from(">H", bytearray(raw_data))[0] + return w + + +def _ord(x): + """Wrap ord() call as we only need it in Python 2""" + return x if PY_3 else ord(x) + + +# noinspection PyClassHasNoInit +class Constants: + """Communication constants""" + # communication parameters taken from [1], chapter 2.2 + CONNECTION_BAUD_RATE = 115200 + CONNECTION_PARITY = serial.PARITY_ODD + CONNECTION_STOP_BITS = 1 + # timeout taken from [1], chapter 3.7 + TIMEOUT_BETWEEN_COMMANDS = 0.05 + # according to spec [1] 2.4: + # maximum length of a telegram is 21 bytes (Byte 0..20) + MAX_LEN_IN_BYTES = 21 + + +# noinspection PyClassHasNoInit +class Objects: + """Supported objects ids / commands""" + DEVICE_TYPE = 0 + DEVICE_SERIAL_NO = 1 + NOMINAL_VOLTAGE = 2 + NOMINAL_CURRENT = 3 + NOMINAL_POWER = 4 + DEVICE_ARTICLE_NO = 6 + MANUFACTURER = 8 + SOFTWARE_VERSION = 9 + SET_VALUE_VOLTAGE = 50 + SET_VALUE_CURRENT = 51 + POWER_SUPPLY_CONTROL = 54 + STATUS_ACTUAL_VALUES = 71 + + +# noinspection PyClassHasNoInit +class ControlParam: + """Parameters for controlling the device""" + SWITCH_MODE_CMD = 0x10 + SWITCH_MODE_REMOTE = 0x10 + SWITCH_MODE_MANUAL = 0x00 + SWITCH_POWER_OUTPUT_CMD = 0x1 + SWITCH_POWER_OUTPUT_ON = 0x1 + SWITCH_POWER_OUTPUT_OFF = 0x0 + + +class Telegram: + """Base class of a PS2000B telegram - basically allows accessing the raw bytes and does checksum calculation""" + + def __init__(self): + self._bytes = [] + self._checksum = [] + self.checksum_ok = False + + def _calc_checksum(self): + cs = 0 + + for b in self._bytes: + cs += b + + cs_high = (cs & 0xff00) >> 8 + cs_low = cs & 0xff + + return [cs_high, cs_low] + + @staticmethod + def _get_start_delimiter(transmission, expected_data_length): + result = 0b00000000 + + if expected_data_length > 16: + raise Exception("only 4 bits for expected length can be used") + + result |= (expected_data_length - 1) + result |= 0b10000 + result |= 0b100000 + result |= transmission << 6 + return result + + def get_byte_array(self): + return bytearray(self._bytes + self._checksum) + + +class FromPowerSupply(Telegram): + """Telegram received from the power supply""" + + def __init__(self, raw_data): + Telegram.__init__(self) + data = [_ord(x) for x in raw_data] + self._bytes = data[0:-2] + self._checksum = data[len(data) - 2:len(data)] + self.checksum_ok = self._checksum == self._calc_checksum() + + def get_sd(self): + return self._bytes[0] + + def get_device_node(self): + return self._bytes[1] + + def get_object(self): + return self._bytes[3] + + def get_data(self): + return self._bytes[3:len(self._bytes)] + + # noinspection PyMethodMayBeStatic + def get_error(self): + # ToDo: [1] chapter 3.6 add support for error codes here + return None + + +class ToPowerSupply(Telegram): + """A telegram sent to the power supply""" + + def __init__(self, transmission, data, expected_data_length): + Telegram.__init__(self) + self._bytes = [] + self._bytes.append(self._get_start_delimiter(transmission, expected_data_length)) + self._bytes.extend(data) + self._checksum = self._calc_checksum() + self.checksum_ok = True + + +class DeviceInformation: + """A class carrying all static device information read from the device""" + + def __init__(self): + self.device_type = "" + self.device_serial_no = "" + self.nominal_voltage = 0 + self.nominal_current = 0 + self.nominal_power = 0 + self.manufacturer = "" + self.device_art_no = "" + self.software_version = "" + + def __str__(self): + return "%s %s [%s], SW: %s, Art-Nr: %s, [%0.2f V, %0.2f A, %0.2f W]" % \ + (self.manufacturer, + self.device_type, self.device_serial_no, + self.software_version, self.device_art_no, + self.nominal_voltage, self.nominal_current, self.nominal_power) + + +class DeviceStatusInformation: + """A class carrying all dynamic device status information""" + + def __init__(self, raw_data): + self.remote_control_active = raw_data[0] & 0b1 + self.output_active = raw_data[1] & 0b1 + self.actual_voltage_percent = float(as_word(raw_data[2:4])) / 256 + self.actual_current_percent = float(as_word(raw_data[4:6])) / 256 + + def __str__(self): + if self.remote_control_active == 1: + remote = "active" + else: + remote = "inactive" + if self.output_active == 1: + output = "active" + else: + output = "inactive" + return "Remote control %s, Output %s" % (remote, output) + + +class PS2000B: + """PS 2000 B main communication class""" + + def __init__(self, serial_port): + self.__device_status_information1 = None + self.__device_status_information2 = None + self.__serial = serial.Serial(serial_port, + baudrate=Constants.CONNECTION_BAUD_RATE, + timeout=Constants.TIMEOUT_BETWEEN_COMMANDS * 2, + parity=serial.PARITY_ODD, + stopbits=Constants.CONNECTION_STOP_BITS) + + self.__device_information = self.__read_device_information() + + def is_open(self): + return self.__serial.is_open + + def get_device_information(self): + return self.__device_information + + def __read_device_information(self, channel=0): # reads static device information, usually not channel dependant + result = DeviceInformation() + + # taken from [2] + result.device_type = as_string(self.__read_device_data(16, Objects.DEVICE_TYPE, channel).get_data()) + result.device_serial_no = as_string(self.__read_device_data(16, Objects.DEVICE_SERIAL_NO, channel).get_data()) + result.nominal_voltage = as_float(self.__read_device_data(4, Objects.NOMINAL_VOLTAGE, channel).get_data()) + result.nominal_current = as_float(self.__read_device_data(4, Objects.NOMINAL_CURRENT, channel).get_data()) + result.nominal_power = as_float(self.__read_device_data(4, Objects.NOMINAL_POWER, channel).get_data()) + result.device_art_no = as_string(self.__read_device_data(16, Objects.DEVICE_ARTICLE_NO, channel).get_data()) + result.manufacturer = as_string(self.__read_device_data(16, Objects.MANUFACTURER, channel).get_data()) + result.software_version = as_string(self.__read_device_data(16, Objects.SOFTWARE_VERSION, channel).get_data()) + + return result + + def __read_device_data(self, expected_length, object_id, channel): # reads data from device based on object_id + telegram = ToPowerSupply(0b01, [channel, object_id], expected_length) + result = self.__send_and_receive(telegram.get_byte_array()) + return result + + def __send_and_receive(self, raw_bytes): # sends request for info to device and reads reply + self.__serial.write(raw_bytes) + result = FromPowerSupply(self.__serial.read(Constants.MAX_LEN_IN_BYTES)) + return result + + def get_device_status_information(self, channel): # gets dynamic device information (e.g. current, voltage) + if channel == 0: + if self.__device_status_information1 is None: + self.update_device_information(0) + info = self.__device_status_information1 + elif channel == 1: + if self.__device_status_information2 is None: + self.update_device_information(1) + info = self.__device_status_information2 + else: + raise ValueError("Invalid Channel") + return info + + def update_device_information(self, channel): # updates dynamic device info stored in __device_status_information + telegram = ToPowerSupply(0b01, [channel, Objects.STATUS_ACTUAL_VALUES], 6) + device_information = self.__send_and_receive(telegram.get_byte_array()) + if channel == 0: + self.__device_status_information1 = DeviceStatusInformation(device_information.get_data()) + elif channel == 1: + self.__device_status_information2 = DeviceStatusInformation(device_information.get_data()) + else: + raise ValueError("Invalid Channel") + + def __send_device_control(self, p1, p2, channel): # sends commands to PSU, commands given in p1, p2 + telegram = ToPowerSupply(0b11, [channel, Objects.POWER_SUPPLY_CONTROL, p1, p2], 2) + _ = self.__send_and_receive(telegram.get_byte_array()) # send command to PSU + self.update_device_information(channel) # update info after change + + def __send_device_data(self, obj, data, channel): + # Send integer data with obj-id to the PSU + + telegram = ToPowerSupply(0b11, [channel, obj, data >> 8, data & 0xff], 4) + _ = self.__send_and_receive(telegram.get_byte_array()) + self.update_device_information(channel) + + def enable_all(self): + self.enable_remote_control(0) + self.enable_remote_control(1) + self.enable_output(0) + self.enable_output(1) + + def disable_all(self): + self.disable_output(0) + self.disable_output(1) + self.disable_remote_control(0) + self.disable_remote_control(1) + + def enable_remote_control(self, channel): + self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_REMOTE, channel) + + def disable_remote_control(self, channel): + self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_MANUAL, channel) + + def enable_output(self, channel): + self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_ON, channel) + + def disable_output(self, channel): + self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_OFF, channel) + + @property + def output1(self): # object controlling output 1 on/off + return self.get_device_status_information(0).output_active + + @output1.setter + def output1(self, value): + if value: + self.enable_output(0) + else: + self.disable_output(0) + + @property + def output2(self): # object controlling output 2 on/off + return self.get_device_status_information(1).output_active + + @output2.setter + def output2(self, value): + if value: + self.enable_output(1) + else: + self.disable_output(1) + + def get_voltage(self, channel): + self.update_device_information(channel) + if channel == 0: + v_perc = self.__device_status_information1.actual_voltage_percent + elif channel == 1: + v_perc = self.__device_status_information2.actual_voltage_percent + else: + raise ValueError("Invalid channel") + voltage = self.__device_information.nominal_voltage * v_perc + return voltage / 100 + + def get_voltage_setpoint(self, channel): + res = self.__read_device_data(2, Objects.SET_VALUE_VOLTAGE, channel).get_data() + return self.__device_information.nominal_voltage * ((res[0] << 8) + res[1]) / 25600.0 + + def set_voltage(self, value, channel): + self.update_device_information(channel) + self.enable_remote_control(channel) + volt = int(round((value * 25600.0) / self.__device_information.nominal_voltage)) + self.__send_device_data(Objects.SET_VALUE_VOLTAGE, volt, channel) + + @property + def voltage1(self): # object storing and controlling the voltage of channel 1 + return self.get_voltage(0) + + @voltage1.setter + def voltage1(self, value): # voltage of channel 1 + self.set_voltage(value, 0) + + @property + def voltage2(self): # object storing and controlling the voltage of channel 2 + return self.get_voltage(1) + + @voltage2.setter + def voltage2(self, value): + self.set_voltage(value, 1) + + def get_current(self, channel): + self.update_device_information(channel) + if channel == 0: + c_perc = self.__device_status_information1.actual_current_percent + elif channel == 1: + c_perc = self.__device_status_information2.actual_current_percent + else: + raise ValueError("Invalid channel") + current = self.__device_information.nominal_current * c_perc + return current / 100 + + def get_current_setpoint(self, channel): + res = self.__read_device_data(2, Objects.SET_VALUE_CURRENT, channel).get_data() + return self.__device_information.nominal_current * ((res[0] << 8) + res[1]) / 25600.0 + + def set_current(self, value, channel): + self.update_device_information(channel) + self.enable_remote_control(channel) + curr = int(round((value * 25600.0) / self.__device_information.nominal_current)) + self.__send_device_data(Objects.SET_VALUE_CURRENT, curr, channel) + + @property + def current1(self): + return self.get_current(0) + + @current1.setter + def current1(self, value): # current of channel 1 + self.set_current(value, 0) + + @property + def current2(self): + return self.get_current(1) + + @current2.setter + def current2(self, value): # current of channel 2 + self.set_current(value, 1) diff --git a/pyps2000b/__init__.py b/pyps2000b/__init__.py new file mode 100644 index 0000000..85506da --- /dev/null +++ b/pyps2000b/__init__.py @@ -0,0 +1,4 @@ +#!/usr/bin/python +# coding=utf-8 +# noinspection SpellCheckingInspection +__author__ = "Sören Sprößig "