Merge branch 'Development_zietz'

This commit is contained in:
Martin Zietz
2020-12-10 12:46:25 +01:00
16 changed files with 2228 additions and 1 deletions
+96
View File
@@ -0,0 +1,96 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# IPython Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# dotenv
.env
# virtualenv
.venv/
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
# PyCharm
.idea
# VScode
.vscode/
+2
View File
@@ -0,0 +1,2 @@
name="arduino-python3"
from .arduino import Arduino, Shrimp
+657
View File
@@ -0,0 +1,657 @@
#!/usr/bin/env python
import logging
import itertools
import platform
import serial
import time
from serial.tools import list_ports
import sys
if sys.platform.startswith('win'):
import winreg
else:
import glob
libraryVersion = 'V0.6'
log = logging.getLogger(__name__)
def enumerate_serial_ports():
"""
Uses the Win32 registry to return a iterator of serial
(COM) ports existing on this computer.
"""
path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM'
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path)
except OSError:
raise Exception
for i in itertools.count():
try:
val = winreg.EnumValue(key, i)
yield (str(val[1])) # , str(val[0]))
except EnvironmentError:
break
def build_cmd_str(cmd, args=None):
"""
Build a command string that can be sent to the arduino.
Input:
cmd (str): the command to send to the arduino, must not
contain a % character
args (iterable): the arguments to send to the command
@TODO: a strategy is needed to escape % characters in the args
"""
if args:
args = '%'.join(map(str, args))
else:
args = ''
return "@{cmd}%{args}$!".format(cmd=cmd, args=args)
def find_port(baud, timeout):
"""
Find the first port that is connected to an arduino with a compatible
sketch installed.
"""
if platform.system() == 'Windows':
ports = enumerate_serial_ports()
elif platform.system() == 'Darwin':
ports = [i[0] for i in list_ports.comports()]
ports = ports[::-1]
else:
ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
for p in ports:
log.debug('Found {0}, testing...'.format(p))
try:
sr = serial.Serial(p, baud, timeout=timeout)
except (serial.serialutil.SerialException, OSError) as e:
log.debug(str(e))
continue
sr.readline() # wait for board to start up again
version = get_version(sr)
if version != libraryVersion:
try:
ver = version[0]
except Exception:
ver = ''
if ver == 'V' or version == "version":
print("You need to update the version of the Arduino-Python3",
"library running on your Arduino.")
print("The Arduino sketch is", version)
print("The Python installation is", libraryVersion)
print("Flash the prototype sketch again.")
return sr
# established to be the wrong board
log.debug('Bad version {0}. This is not a Shrimp/Arduino!'.format(
version))
sr.close()
continue
log.info('Using port {0}.'.format(p))
if sr:
return sr
return None
def get_version(sr):
cmd_str = build_cmd_str("version")
try:
sr.write(str.encode(cmd_str))
sr.flush()
except Exception:
return None
return sr.readline().decode("utf-8").replace("\r\n", "")
class Arduino(object):
def __init__(self, baud=115200, port=None, timeout=2, sr=None):
"""
Initializes serial communication with Arduino if no connection is
given. Attempts to self-select COM port, if not specified.
"""
if not sr:
if not port:
sr = find_port(baud, timeout)
if not sr:
raise ValueError("Could not find port.")
else:
sr = serial.Serial(port, baud, timeout=timeout)
sr.readline() # wait til board has rebooted and is connected
version = get_version(sr)
if version != libraryVersion:
# check version
try:
ver = version[0]
except Exception:
ver = ''
if ver == 'V' or version == "version":
print("You need to update the version of the Arduino-Python3",
"library running on your Arduino.")
print("The Arduino sketch is", version)
print("The Python installation is", libraryVersion)
print("Flash the prototype sketch again.")
sr.flush()
self.sr = sr
self.SoftwareSerial = SoftwareSerial(self)
self.Servos = Servos(self)
self.EEPROM = EEPROM(self)
def version(self):
return get_version(self.sr)
def digitalWrite(self, pin, val):
"""
Sends digitalWrite command
to digital pin on Arduino
-------------
inputs:
pin : digital pin number
val : either "HIGH" or "LOW"
"""
if val.upper() == "LOW":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("dw", (pin_,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def analogWrite(self, pin, val):
"""
Sends analogWrite pwm command
to pin on Arduino
-------------
inputs:
pin : pin number
val : integer 0 (off) to 255 (always on)
"""
if val > 255:
val = 255
elif val < 0:
val = 0
cmd_str = build_cmd_str("aw", (pin, val))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def analogRead(self, pin):
"""
Returns the value of a specified
analog pin.
inputs:
pin : analog pin number for measurement
returns:
value: integer from 1 to 1023
"""
cmd_str = build_cmd_str("ar", (pin,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
return int(rd)
except:
return 0
def pinMode(self, pin, val):
"""
Sets I/O mode of pin
inputs:
pin: pin number to toggle
val: "INPUT" or "OUTPUT"
"""
if val == "INPUT":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("pm", (pin_,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def pulseIn(self, pin, val):
"""
Reads a pulse from a pin
inputs:
pin: pin number for pulse measurement
returns:
duration : pulse length measurement
"""
if val.upper() == "LOW":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("pi", (pin_,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
return float(rd)
except:
return -1
def pulseIn_set(self, pin, val, numTrials=5):
"""
Sets a digital pin value, then reads the response
as a pulse width.
Useful for some ultrasonic rangefinders, etc.
inputs:
pin: pin number for pulse measurement
val: "HIGH" or "LOW". Pulse is measured
when this state is detected
numTrials: number of trials (for an average)
returns:
duration : an average of pulse length measurements
This method will automatically toggle
I/O modes on the pin and precondition the
measurment with a clean LOW/HIGH pulse.
Arduino.pulseIn_set(pin,"HIGH") is
equivalent to the Arduino sketch code:
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
delayMicroseconds(2);
digitalWrite(pin, HIGH);
delayMicroseconds(5);
digitalWrite(pin, LOW);
pinMode(pin, INPUT);
long duration = pulseIn(pin, HIGH);
"""
if val.upper() == "LOW":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("ps", (pin_,))
durations = []
for s in range(numTrials):
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd.isdigit():
if (int(rd) > 1):
durations.append(int(rd))
if len(durations) > 0:
duration = int(sum(durations)) / int(len(durations))
else:
duration = None
try:
return float(duration)
except:
return -1
def close(self):
if self.sr.isOpen():
self.sr.flush()
self.sr.close()
def digitalRead(self, pin):
"""
Returns the value of a specified
digital pin.
inputs:
pin : digital pin number for measurement
returns:
value: 0 for "LOW", 1 for "HIGH"
"""
cmd_str = build_cmd_str("dr", (pin,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
return int(rd)
except:
return 0
def Melody(self, pin, melody, durations):
"""
Plays a melody.
inputs:
pin: digital pin number for playback
melody: list of tones
durations: list of duration (4=quarter note, 8=eighth note, etc.)
length of melody should be of same
length as length of duration
Melodies of the following length, can cause trouble
when playing it multiple times.
board.Melody(9,["C4","G3","G3","A3","G3",0,"B3","C4"],
[4,8,8,4,4,4,4,4])
Playing short melodies (1 or 2 tones) didn't cause
trouble during testing
"""
NOTES = dict(
B0=31, C1=33, CS1=35, D1=37, DS1=39, E1=41, F1=44, FS1=46, G1=49,
GS1=52, A1=55, AS1=58, B1=62, C2=65, CS2=69, D2=73, DS2=78, E2=82,
F2=87, FS2=93, G2=98, GS2=104, A2=110, AS2=117, B2=123, C3=131,
CS3=139, D3=147, DS3=156, E3=165, F3=175, FS3=185, G3=196, GS3=208,
A3=220, AS3=233, B3=247, C4=262, CS4=277, D4=294, DS4=311, E4=330,
F4=349, FS4=370, G4=392, GS4=415, A4=440,
AS4=466, B4=494, C5=523, CS5=554, D5=587, DS5=622, E5=659, F5=698,
FS5=740, G5=784, GS5=831, A5=880, AS5=932, B5=988, C6=1047,
CS6=1109, D6=1175, DS6=1245, E6=1319, F6=1397, FS6=1480, G6=1568,
GS6=1661, A6=1760, AS6=1865, B6=1976, C7=2093, CS7=2217, D7=2349,
DS7=2489, E7=2637, F7=2794, FS7=2960, G7=3136, GS7=3322, A7=3520,
AS7=3729, B7=3951, C8=4186, CS8=4435, D8=4699, DS8=4978)
if (isinstance(melody, list)) and (isinstance(durations, list)):
length = len(melody)
cmd_args = [length, pin]
if length == len(durations):
cmd_args.extend([NOTES.get(melody[note])
for note in range(length)])
cmd_args.extend([durations[duration]
for duration in range(len(durations))])
cmd_str = build_cmd_str("to", cmd_args)
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
cmd_str = build_cmd_str("nto", [pin])
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
else:
return -1
else:
return -1
def capacitivePin(self, pin):
'''
Input:
pin (int): pin to use as capacitive sensor
Use it in a loop!
DO NOT CONNECT ANY ACTIVE DRIVER TO THE USED PIN !
the pin is toggled to output mode to discharge the port,
and if connected to a voltage source,
will short circuit the pin, potentially damaging
the Arduino/Shrimp and any hardware attached to the pin.
'''
cmd_str = build_cmd_str("cap", (pin,))
self.sr.write(str.encode(cmd_str))
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd.isdigit():
return int(rd)
def shiftOut(self, dataPin, clockPin, pinOrder, value):
"""
Shift a byte out on the datapin using Arduino's shiftOut()
Input:
dataPin (int): pin for data
clockPin (int): pin for clock
pinOrder (String): either 'MSBFIRST' or 'LSBFIRST'
value (int): an integer from 0 and 255
"""
cmd_str = build_cmd_str("so",
(dataPin, clockPin, pinOrder, value))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
def shiftIn(self, dataPin, clockPin, pinOrder):
"""
Shift a byte in from the datapin using Arduino's shiftIn().
Input:
dataPin (int): pin for data
clockPin (int): pin for clock
pinOrder (String): either 'MSBFIRST' or 'LSBFIRST'
Output:
(int) an integer from 0 to 255
"""
cmd_str = build_cmd_str("si", (dataPin, clockPin, pinOrder))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd.isdigit():
return int(rd)
class Shrimp(Arduino):
def __init__(self):
Arduino.__init__(self)
class Wires(object):
"""
Class for Arduino wire (i2c) support
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
class Servos(object):
"""
Class for Arduino servo support
0.03 second delay noted
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
self.servo_pos = {}
def attach(self, pin, min=544, max=2400):
cmd_str = build_cmd_str("sva", (pin, min, max))
while True:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd:
break
else:
log.debug("trying to attach servo to pin {0}".format(pin))
position = int(rd)
self.servo_pos[pin] = position
return 1
def detach(self, pin):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svd", (position,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
del self.servo_pos[pin]
def write(self, pin, angle):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svw", (position, angle))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
def writeMicroseconds(self, pin, uS):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svwm", (position, uS))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
def read(self, pin):
if pin not in self.servo_pos.keys():
self.attach(pin)
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svr", (position,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
angle = int(rd)
return angle
except:
return None
class SoftwareSerial(object):
"""
Class for Arduino software serial functionality
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
self.connected = False
def begin(self, p1, p2, baud):
"""
Create software serial instance on
specified tx,rx pins, at specified baud
"""
cmd_str = build_cmd_str("ss", (p1, p2, baud))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response == "ss OK":
self.connected = True
return True
else:
self.connected = False
return False
def write(self, data):
"""
sends data to existing software serial instance
using Arduino's 'write' function
"""
if self.connected:
cmd_str = build_cmd_str("sw", (data,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response == "ss OK":
return True
else:
return False
def read(self):
"""
returns first character read from
existing software serial instance
"""
if self.connected:
cmd_str = build_cmd_str("sr")
self.sr.write(str.encode(cmd_str))
self.sr.flush()
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response:
return response
else:
return False
class EEPROM(object):
"""
Class for reading and writing to EEPROM.
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
def size(self):
"""
Returns size of EEPROM memory.
"""
cmd_str = build_cmd_str("sz")
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
return int(response)
except:
return 0
def write(self, address, value=0):
""" Write a byte to the EEPROM.
:address: the location to write to, starting from 0 (int)
:value: the value to write, from 0 to 255 (byte)
"""
if value > 255:
value = 255
elif value < 0:
value = 0
cmd_str = build_cmd_str("eewr", (address, value))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def read(self, adrress):
""" Reads a byte from the EEPROM.
:address: the location to write to, starting from 0 (int)
"""
cmd_str = build_cmd_str("eer", (adrress,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response:
return int(response)
except:
return 0
+416
View File
@@ -0,0 +1,416 @@
#include <SoftwareSerial.h>
#include <Wire.h>
#include <Servo.h>
#include <EEPROM.h>
void Version(){
Serial.println(F("V0.6"));
}
SoftwareSerial *sserial = NULL;
Servo servos[8];
int servo_pins[] = {0, 0, 0, 0, 0, 0, 0, 0};
boolean connected = false;
int Str2int (String Str_value)
{
char buffer[10]; //max length is three units
Str_value.toCharArray(buffer, 10);
int int_value = atoi(buffer);
return int_value;
}
void split(String results[], int len, String input, char spChar) {
String temp = input;
for (int i=0; i<len; i++) {
int idx = temp.indexOf(spChar);
results[i] = temp.substring(0,idx);
temp = temp.substring(idx+1);
}
}
uint8_t readCapacitivePin(String data) {
int pinToMeasure = Str2int(data);
// readCapacitivePin
// Input: Arduino pin number
// Output: A number, from 0 to 17 expressing
// how much capacitance is on the pin
// When you touch the pin, or whatever you have
// attached to it, the number will get higher
// http://playground.arduino.cc/Code/CapacitiveSensor
//
// Variables used to translate from Arduino to AVR pin naming
volatile uint8_t* port;
volatile uint8_t* ddr;
volatile uint8_t* pin;
// Here we translate the input pin number from
// Arduino pin number to the AVR PORT, PIN, DDR,
// and which bit of those registers we care about.
byte bitmask;
port = portOutputRegister(digitalPinToPort(pinToMeasure));
ddr = portModeRegister(digitalPinToPort(pinToMeasure));
bitmask = digitalPinToBitMask(pinToMeasure);
pin = portInputRegister(digitalPinToPort(pinToMeasure));
// Discharge the pin first by setting it low and output
*port &= ~(bitmask);
*ddr |= bitmask;
delay(1);
// Make the pin an input with the internal pull-up on
*ddr &= ~(bitmask);
*port |= bitmask;
// Now see how long the pin to get pulled up. This manual unrolling of the loop
// decreases the number of hardware cycles between each read of the pin,
// thus increasing sensitivity.
uint8_t cycles = 17;
if (*pin & bitmask) { cycles = 0;}
else if (*pin & bitmask) { cycles = 1;}
else if (*pin & bitmask) { cycles = 2;}
else if (*pin & bitmask) { cycles = 3;}
else if (*pin & bitmask) { cycles = 4;}
else if (*pin & bitmask) { cycles = 5;}
else if (*pin & bitmask) { cycles = 6;}
else if (*pin & bitmask) { cycles = 7;}
else if (*pin & bitmask) { cycles = 8;}
else if (*pin & bitmask) { cycles = 9;}
else if (*pin & bitmask) { cycles = 10;}
else if (*pin & bitmask) { cycles = 11;}
else if (*pin & bitmask) { cycles = 12;}
else if (*pin & bitmask) { cycles = 13;}
else if (*pin & bitmask) { cycles = 14;}
else if (*pin & bitmask) { cycles = 15;}
else if (*pin & bitmask) { cycles = 16;}
// Discharge the pin again by setting it low and output
// It's important to leave the pins low if you want to
// be able to touch more than 1 sensor at a time - if
// the sensor is left pulled high, when you touch
// two sensors, your body will transfer the charge between
// sensors.
*port &= ~(bitmask);
*ddr |= bitmask;
//return cycles;
Serial.println(cycles);
}
void Tone(String data){
int idx = data.indexOf('%');
int len = Str2int(data.substring(0,idx));
String data2 = data.substring(idx+1);
int idx2 = data2.indexOf('%');
int pin = Str2int(data2.substring(0,idx2));
String data3 = data2.substring(idx2+1);
String melody[len*2];
split(melody,len*2,data3,'%');
for (int thisNote = 0; thisNote < len; thisNote++) {
int noteDuration = 1000/Str2int(melody[thisNote+len]);
int note = Str2int(melody[thisNote]);
tone(pin, note, noteDuration);
int pause = noteDuration * 1.30;
delay(pause);
noTone(pin);
}
}
void ToneNo(String data){
int pin = Str2int(data);
noTone(pin);
}
void DigitalHandler(int mode, String data){
int pin = Str2int(data);
if(mode<=0){ //read
Serial.println(digitalRead(pin));
}else{
if(pin <0){
digitalWrite(-pin,LOW);
}else{
digitalWrite(pin,HIGH);
}
//Serial.println('0');
}
}
void AnalogHandler(int mode, String data){
if(mode<=0){ //read
int pin = Str2int(data);
Serial.println(analogRead(pin));
}else{
String sdata[2];
split(sdata,2,data,'%');
int pin = Str2int(sdata[0]);
int pv = Str2int(sdata[1]);
analogWrite(pin,pv);
}
}
void ConfigurePinHandler(String data){
int pin = Str2int(data);
if(pin <=0){
pinMode(-pin,INPUT);
}else{
pinMode(pin,OUTPUT);
}
}
void shiftOutHandler(String data) {
String sdata[4];
split(sdata, 4, data, '%');
int dataPin = sdata[0].toInt();
int clockPin = sdata[1].toInt();
String bitOrderName = sdata[2];
byte value = (byte)(sdata[3].toInt());
if (bitOrderName == "MSBFIRST") {
shiftOut(dataPin, clockPin, MSBFIRST, value);
} else {
shiftOut(dataPin, clockPin, LSBFIRST, value);
}
}
void shiftInHandler(String data) {
String sdata[3];
split(sdata, 3, data, '%');
int dataPin = sdata[0].toInt();
int clockPin = sdata[1].toInt();
String bitOrderName = sdata[2];
int incoming;
if (bitOrderName == "MSBFIRST") {
incoming = (int)shiftIn(dataPin, clockPin, MSBFIRST);
} else {
incoming = (int)shiftIn(dataPin, clockPin, LSBFIRST);
}
Serial.println(incoming);
}
void SS_set(String data){
delete sserial;
String sdata[3];
split(sdata,3,data,'%');
int rx_ = Str2int(sdata[0]);
int tx_ = Str2int(sdata[1]);
int baud_ = Str2int(sdata[2]);
sserial = new SoftwareSerial(rx_, tx_);
sserial->begin(baud_);
Serial.println("ss OK");
}
void SS_write(String data) {
int len = data.length()+1;
char buffer[len];
data.toCharArray(buffer,len);
Serial.println("ss OK");
sserial->write(buffer);
}
void SS_read(String data) {
char c = sserial->read();
Serial.println(c);
}
void pulseInHandler(String data){
int pin = Str2int(data);
long duration;
if(pin <=0){
pinMode(-pin, INPUT);
duration = pulseIn(-pin, LOW);
}else{
pinMode(pin, INPUT);
duration = pulseIn(pin, HIGH);
}
Serial.println(duration);
}
void pulseInSHandler(String data){
int pin = Str2int(data);
long duration;
if(pin <=0){
pinMode(-pin, OUTPUT);
digitalWrite(-pin, HIGH);
delayMicroseconds(2);
digitalWrite(-pin, LOW);
delayMicroseconds(5);
digitalWrite(-pin, HIGH);
pinMode(-pin, INPUT);
duration = pulseIn(-pin, LOW);
}else{
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
delayMicroseconds(2);
digitalWrite(pin, HIGH);
delayMicroseconds(5);
digitalWrite(pin, LOW);
pinMode(pin, INPUT);
duration = pulseIn(pin, HIGH);
}
Serial.println(duration);
}
void SV_add(String data) {
String sdata[3];
split(sdata,3,data,'%');
int pin = Str2int(sdata[0]);
int min = Str2int(sdata[1]);
int max = Str2int(sdata[2]);
int pos = -1;
for (int i = 0; i<8;i++) {
if (servo_pins[i] == pin) { //reset in place
servos[pos].detach();
servos[pos].attach(pin, min, max);
servo_pins[pos] = pin;
Serial.println(pos);
return;
}
}
for (int i = 0; i<8;i++) {
if (servo_pins[i] == 0) {pos = i;break;} // find spot in servo array
}
if (pos == -1) {;} //no array position available!
else {
servos[pos].attach(pin, min, max);
servo_pins[pos] = pin;
Serial.println(pos);
}
}
void SV_remove(String data) {
int pos = Str2int(data);
servos[pos].detach();
servo_pins[pos] = 0;
}
void SV_read(String data) {
int pos = Str2int(data);
int angle;
angle = servos[pos].read();
Serial.println(angle);
}
void SV_write(String data) {
String sdata[2];
split(sdata,2,data,'%');
int pos = Str2int(sdata[0]);
int angle = Str2int(sdata[1]);
servos[pos].write(angle);
}
void SV_write_ms(String data) {
String sdata[2];
split(sdata,2,data,'%');
int pos = Str2int(sdata[0]);
int uS = Str2int(sdata[1]);
servos[pos].writeMicroseconds(uS);
}
void sizeEEPROM() {
Serial.println(E2END + 1);
}
void EEPROMHandler(int mode, String data) {
String sdata[2];
split(sdata, 2, data, '%');
if (mode == 0) {
EEPROM.write(Str2int(sdata[0]), Str2int(sdata[1]));
} else {
Serial.println(EEPROM.read(Str2int(sdata[0])));
}
}
void SerialParser(void) {
char readChar[64];
Serial.readBytesUntil(33,readChar,64);
String read_ = String(readChar);
//Serial.println(readChar);
int idx1 = read_.indexOf('%');
int idx2 = read_.indexOf('$');
// separate command from associated data
String cmd = read_.substring(1,idx1);
String data = read_.substring(idx1+1,idx2);
// determine command sent
if (cmd == "dw") {
DigitalHandler(1, data);
}
else if (cmd == "dr") {
DigitalHandler(0, data);
}
else if (cmd == "aw") {
AnalogHandler(1, data);
}
else if (cmd == "ar") {
AnalogHandler(0, data);
}
else if (cmd == "pm") {
ConfigurePinHandler(data);
}
else if (cmd == "ps") {
pulseInSHandler(data);
}
else if (cmd == "pi") {
pulseInHandler(data);
}
else if (cmd == "ss") {
SS_set(data);
}
else if (cmd == "sw") {
SS_write(data);
}
else if (cmd == "sr") {
SS_read(data);
}
else if (cmd == "sva") {
SV_add(data);
}
else if (cmd == "svr") {
SV_read(data);
}
else if (cmd == "svw") {
SV_write(data);
}
else if (cmd == "svwm") {
SV_write_ms(data);
}
else if (cmd == "svd") {
SV_remove(data);
}
else if (cmd == "version") {
Version();
}
else if (cmd == "to") {
Tone(data);
}
else if (cmd == "nto") {
ToneNo(data);
}
else if (cmd == "cap") {
readCapacitivePin(data);
}
else if (cmd == "so") {
shiftOutHandler(data);
}
else if (cmd == "si") {
shiftInHandler(data);
}
else if (cmd == "eewr") {
EEPROMHandler(0, data);
}
else if (cmd == "eer") {
EEPROMHandler(1, data);
}
else if (cmd == "sz") {
sizeEEPROM();
}
}
void setup() {
Serial.begin(115200);
while (!Serial) {
; // wait for serial port to connect. Needed for Leonardo only
}
Serial.println("connected");
}
void loop() {
SerialParser();
}
+21
View File
@@ -0,0 +1,21 @@
import time
from Arduino import Arduino
print("Searching for Arduino...")
board = Arduino()
print("Arduino found.")
board.pinMode(9, "Output")
i = 0
while i <= 4:
print("running: ", i)
board.digitalWrite(9, "HIGH")
time.sleep(0.5)
time.sleep(1)
board.digitalWrite(9, "LOW")
time.sleep(0.5)
time.sleep(1)
i = i + 1
board.close()
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Sören Sprößig
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+66
View File
@@ -0,0 +1,66 @@
# import platform
import time as t
import numpy as np
import globals as g
import cage_func as func
from pyps2000b import PS2000B
# User Inputs/Configuration----------------------------------
Test1 = 0
Test2 = 0
Test3 = 0
Test4 = 0
# Constants:
g.Coil_const = np.array([38.6, 38.45, 37.9])*1e-9 # Coil constants [x,y,z] in T/A
g.ambientField = np.array([80])*1e-6 # ambient magnetic field in measurement area, to be cancelled out
g.resistances = np.array([3.9, 1, 1]) # resistance of [x,y,z] circuits
g.maxWatts = np.array([8, 0, 0]) # max. allowed power for [x,y,z] circuits
# COM-Ports for power supply units:
xyPort = "COM7"
g.xyDevice = PS2000B.PS2000B(xyPort)
g.maxAmps = np.sqrt(g.maxWatts/g.resistances)
print(g.maxAmps)
def print_status():
print("Output 1:")
func.print_status(g.xAxis)
print("Output 2:")
func.print_status(g.yAxis)
g.xyDevice.enable_all()
func.set_to_zero(g.xyDevice)
print_status()
t.sleep(3)
if Test1 == 1:
g.xyDevice.voltage1 = 5
t.sleep(1)
print_status()
t.sleep(5)
func.set_to_zero(g.xyDevice)
if Test2 == 1:
g.xyDevice.current1 = 0.2
t.sleep(1)
print_status()
t.sleep(5)
func.set_to_zero(g.xyDevice)
if Test4 == 1:
func.set_axis_current(g.xyDevice, 0.2)
t.sleep(1)
print_status()
t.sleep(10)
func.set_to_zero(g.xyDevice)
t.sleep(1)
print_status()
g.xyDevice.disable_all()
print_status()
+67 -1
View File
@@ -1,2 +1,68 @@
# Magnetfeldteststand
# Python-PS2000B
Python library to work with Elektro-Automatik PS 2000 B power supplies via USB.
## Compatibility
Tested with:
+ Python 2.7
+ Python 3.5, 3.6
Tested on:
+ Windows 7 x64
+ Windows 10 x64 Version 1607 (OS Build 14393.2035)
+ Ubuntu 16.04.1 LTS
+ Ubuntu 20.04.1 LTS
## Features of Python-PS2000B
### Supported
- read static device information (manufacturer, serial, device type ...)
- read dynamic device information (current, voltage)
- read/write remote control
- read/write output control
### Still todo
- set voltage and current
- wrap error results in own telegram
## Prerequisites
### Python
The following third-party Python libraries are needed:
* `pyserial` - serial communication library for Python, see https://pypi.python.org/pypi/pyserial
### Windows
On Windows the USB driver (fetch it from http://www.elektroautomatik.de/files/eautomatik/treiber/usb/ea_device_driver.rar) must be installed. Afterwards you can find the serial port `COMxx` in the *device manager*.
### Linux
On Linux the device is detected as `/dev/ttyACMx` (abstract control model, see https://www.rfc1149.net/blog/2013/03/05/what-is-the-difference-between-devttyusbx-and-devttyacmx/). Use `dmesg` after connecting the device to find out `x`.
Most Linuxes will require users to elevate for accessing the device. If you want to access the device as your current user, just add it to the group `dialout` (`ls -lah /dev/ttyACM0` will show you the group to use, usually this is `dialout`) and login again.
## Usage
```python
import time
from pyps2000b import PS2000B
device = PS2000B.PS2000B("/dev/ttyACM0")
print("Device status: %s" % device.get_device_status_information())
device.enable_remote_control()
device.voltage1 = 5.1
device.current1 = 1
device.enable_output()
time.sleep(1)
print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current()))
device.output1 = False
```
## Documentation
+ product website: http://www.elektroautomatik.de/en/ps2000b.html
+ programming guide PS 2000 B: http://www.elektroautomatik.de/files/eautomatik/treiber/ps2000b/programming_ps2000b.zip
+206
View File
@@ -0,0 +1,206 @@
# Arduino-Python3 Command API
This API is forked from the original [Python Arduino Command API](https://github.com/thearn/Python-Arduino-Command-API) to add support for Python 3.
The Arduino-Python3 Command API is a lightweight Python library for
communicating with [Arduino microcontroller boards](http://www.arduino.cc/) from a connected computer using
standard serial IO, either over a physical wire
or wirelessly. It is written using a custom protocol, similar to [Firmata](http://firmata.org/wiki/Main_Page).
This allows a user to quickly prototype programs for Arduino using Python code, or to
simply read/control/troubleshoot/experiment
with hardware connected to an Arduino board without ever having to recompile and reload sketches to the board itself.
Method names within the Arduino-Python3 Command API are designed to be as close
as possible to their Arduino programming language counterparts
## Simple usage example (LED blink)
```python
#!/usr/bin/env python
"""
Blinks an LED on digital pin 13
in 1 second intervals
"""
from Arduino import Arduino
import time
board = Arduino() # plugged in via USB, serial com at rate 115200
board.pinMode(13, "OUTPUT")
while True:
board.digitalWrite(13, "LOW")
time.sleep(1)
board.digitalWrite(13, "HIGH")
time.sleep(1)
```
## Requirements:
- [Python](http://python.org/) 3.7 tested on Windows and macOS.
- [pyserial](http://pyserial.sourceforge.net/) 2.6 or higher
- Any [Arduino compatible microcontroller](https://www.sparkfun.com/categories/242) with at least 14KB of flash memory
## Installation:
Either run `pip install arduino-python3` from a command line, or run `python setup.py
build install` from the source directory to install this library.
## Setup:
1. Verify that your Arduino board communicates at the baud rate specified in the
`setup()` function (line 407) in `prototype.ino`. Change it there if necessary.
2. Load the `prototype.ino` sketch onto your Arduino board, using the Arduino IDE.
3. Set up some kind of serial I/O communication between the Arduino board and your computer (via physical USB cable,
Bluetooth, xbee, etc. + associated drivers)
4. Add `from Arduino import Arduino` into your python script to communicate with your Arduino
For a collection of examples, see `examples.py`. This file contains methods which replicate
the functionality of many Arduino demo sketches.
## Testing:
The `tests` directory contains some basic tests for the library. Extensive code coverage is a bit difficult to expect for every release, since a positive test involves actually
connecting and issuing commands to a live Arduino, hosting any hardware
required to test a particular function. But a core of basic communication tests
should at least be maintained here and used before merging into the `master` branch.
After installation, the interactive tests can be run from the source directory:
```bash
$ python tests/test_main.py
```
Automated tests can be run from the source directory with:
```bash
$ python tests/test_arduino.py
```
## Classes
- `Arduino(baud)` - Set up communication with currently connected and powered
Arduino.
```python
board = Arduino("115200") #Example
```
The device name / COM port of the connected Arduino will be auto-detected.
If there are more than one Arduino boards connected,
the desired COM port can be also be passed as an optional argument:
```python
board = Arduino("115200", port="COM3") #Windows example
```
```python
board = Arduino("115200", port="/dev/tty.usbmodemfa141") #OSX example
```
A time-out for reading from the Arduino can also be specified as an optional
argument:
```python
board = Arduino("115200", timeout=2) #Serial reading functions will
#wait for no more than 2 seconds
```
## Methods
**Digital I/O**
- `Arduino.digitalWrite(pin_number, state)` turn digital pin on/off
- `Arduino.digitalRead(pin_number)` read state of a digital pin
```python
#Digital read / write example
board.digitalWrite(13, "HIGH") #Set digital pin 13 voltage
state_1 = board.digitalRead(13) #Will return integer 1
board.digitalWrite(13, "LOW") #Set digital pin 13 voltage
state_2 = board.digitalRead(13) #Will return integer 0
```
- `Arduino.pinMode(pin_number, io_mode)` set pin I/O mode
- `Arduino.pulseIn(pin_number, state)` measures a pulse
- `Arduino.pulseIn_set(pin_number, state)` measures a pulse, with preconditioning
```python
#Digital mode / pulse example
board.pinMode(7, "INPUT") #Set digital pin 7 mode to INPUT
duration = board.pulseIn(7, "HIGH") #Return pulse width measurement on pin 7
```
**Analog I/O**
- `Arduino.analogRead(pin_number)` returns the analog value
- `Arduino.analogWrite(pin_number, value)` sets the analog value
```python
#Analog I/O examples
val=board.analogRead(5) #Read value on analog pin 5 (integer 0 to 1023)
val = val / 4 # scale to 0 - 255
board.analogWrite(11) #Set analog value (PWM) based on analog measurement
```
**Shift Register**
- `Arduino.shiftIn(dataPin, clockPin, bitOrder)` shift a byte in and returns it
- `Arduino.shiftOut(dataPin, clockPin, bitOrder, value)` shift the given byte out
`bitOrder` should be either `"MSBFIRST"` or `"LSBFIRST"`
**Servo Library Functionality**
Support is included for up to 8 servos.
- `Arduino.Servos.attach(pin, min=544, max=2400)` Create servo instance. Only 8 servos can be used at one time.
- `Arduino.Servos.read(pin)` Returns the angle of the servo attached to the specified pin
- `Arduino.Servos.write(pin, angle)` Move an attached servo on a pin to a specified angle
- `Arduino.Servos.writeMicroseconds(pin, uS)` Write a value in microseconds to the servo on a specified pin
- `Arduino.Servos.detach(pin)` Detaches the servo on the specified pin
```python
#Servo example
board.Servos.attach(9) #declare servo on pin 9
board.Servos.write(9, 0) #move servo on pin 9 to 0 degrees
print board.Servos.read(9) # should be 0
board.Servos.detach(9) #free pin 9
```
**Software Serial Functionality**
- `Arduino.SoftwareSerial.begin(ss_rxPin, ss_txPin, ss_device_baud)` initialize software serial device on
specified pins.
Only one software serial device can be used at a time. Existing software serial instance will
be overwritten by calling this method, both in Python and on the Arduino board.
- `Arduino.SoftwareSerial.write(data)` send data using the Arduino 'write' function to the existing software
serial connection.
- `Arduino.SoftwareSerial.read()` returns one byte from the existing software serial connection
```python
#Software serial example
board.SoftwareSerial.begin(0, 7, "19200") # Start software serial for transmit only (tx on pin 7)
board.SoftwareSerial.write(" test ") #Send some data
response_char = board.SoftwareSerial.read() #read response character
```
**EEPROM**
- `Arduino.EEPROM.read(address)` reads a byte from the EEPROM
- `Arduino.EEPROM.write(address, value)` writes a byte to the EEPROM
- `Arduino.EEPROM.size()` returns size of the EEPROM
```python
#EEPROM read and write examples
location = 42
value = 10 # 0-255(byte)
board.EEPROM.write(location, 10)
print(board.EEPROM.read(location))
print('EEPROM size {size}'.format(size=board.EEPROM.size()))
```
**Misc**
- `Arduino.close()` closes serial connection to the Arduino.
## To-do list:
- Expand software serial functionality (`print()` and `println()`)
- Add simple reset functionality that zeros out all pin values
- Add I2C / TWI function support (Arduino `Wire.h` commands)
- Include a wizard which generates 'prototype.ino' with selected serial baud rate and Arduino function support
(to help reduce memory requirements).
- Multi-serial support for Arduino mega (`Serial1.read()`, etc)
+88
View File
@@ -0,0 +1,88 @@
# import time
from pyps2000b import PS2000B
import globals as g
def set_devices(): # creates device objects for all PSUs
g.xyDevice = PS2000B.PS2000B(g.xyPort)
g.zDevice = PS2000B.PS2000B(g.zPort)
g.xAxis = (g.xyDevice, 0, g.relayPins[0], 0) # (device, channel, arduino pin, axis index)
g.yAxis = (g.xyDevice, 1, g.relayPins[1], 1)
g.zAxis = (g.zDevice, 0, g.relayPins[2], 2)
def activate_all(): # enables remote control and output on all PSUs and channels
g.xyDevice.enable_all()
g.zDevice.enable_all()
def deactivate_all(): # disables remote control and output on all PSUs and channels
g.xyDevice.disable_all()
g.zDevice.disable_all()
def safe_arduino(): # sets output pins to low and closes serial connection
for pin in g.relayPins:
g.arduino.digitalWrite(pin, "LOW")
g.arduino.close()
def print_status(axis): # axis as (device, channel), e.g. g.xAxis
device = axis[0] # PSU
channel = axis[1] # output channel on the PSU
print("%s, %0.2f V, %0.2f A"
% (device.get_device_status_information(channel), device.get_voltage(channel), device.get_current(channel)))
def print_status_3():
print("X-Axis:")
print_status(g.xAxis)
print("Y-Axis:")
print_status(g.yAxis)
print("Z-Axis:")
print_status(g.zAxis)
def set_to_zero(device):
device.voltage1 = 0
device.current1 = 0
device.current2 = 0
device.current2 = 0
def set_field_simple(vector): # forms magnetic field as specified by vector, w/o cancelling ambient field
i_vec = vector/g.Coil_const
set_current_vec(i_vec)
def set_field(vector): # forms magnetic field as specified by vector, corrected for ambient field
field = vector - g.ambientField
i_vec = field/g.Coil_const
set_current_vec(i_vec)
def set_current_vec(i_vec): # sets needed currents on each axis for given vector
set_axis_current(g.xAxis, i_vec[0])
set_axis_current(g.yAxis, i_vec[1])
set_axis_current(g.zAxis, i_vec[2])
def set_axis_current(axis, value): # sets current with correct polarity on one axis
device = axis[0]
channel = axis[1]
ardPin = axis[2]
axisIndex = axis[3]
if abs(value) > g.maxAmps[axisIndex]: # prevent excessive currents
set_to_zero(device) # set currents and voltages to 0
device.disable_all() # disable outputs on PSU
safe_arduino() # set arduino pins to low and close serial link
raise ValueError("Invalid current value. Tried %0.2fA, max. %0.2fA allowed" % (value, g.maxAmps[axisIndex]))
elif value >= 0: # switch polarity as needed
g.arduino.digitalWrite(ardPin, "LOW")
elif value < 0:
g.arduino.digitalWrite(ardPin, "HIGH")
else:
raise Exception("This should be impossible.")
device.set_current(abs(value), channel)
device.set_voltage(1.1 * g.maxAmps[axisIndex] * g.resistances[axisIndex]) # set max voltage high enough
+56
View File
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
# coding=utf-8
import platform
import time
from pyps2000b import PS2000B
DEVICE = "COM7" if platform.system() == "Windows" else "/dev/ttyACM0"
# connection to the device is automatically opened
print("Connecting to device at %s..." % DEVICE)
device = PS2000B.PS2000B(DEVICE) # create Object of class PS2000B, pass COM-port to functions inside
# static device information can be read
print("Connection open: %s" % device.is_open())
print("Device: %s" % device.get_device_information())
# dynamic device status information can be read
device_status_info1 = device.get_device_status_information(0)
device_status_info2 = device.get_device_status_information(1)
print("Device status 1: %s" % device_status_info1)
print("Device status 2: %s" % device_status_info2)
print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current()))
# device can be controlled
if not device_status_info.remote_control_active:
print("...will enable remote control...")
device.enable_remote_control()
print("...set voltage to 12V and max current to 1A...")
device.voltage1 = 12
device.current1 = 1
time.sleep(10)
print("...now enabling the power output control...")
device.enable_output(0)
time.sleep(2)
device_status_info1 = device.get_device_status_information(0)
device_status_info2 = device.get_device_status_information(1)
print("Device status 1: %s" % device_status_info1)
print("Device status 2: %s" % device_status_info2)
print("Current output: %0.2f V , %0.2f A" % (device.voltage1, device.current1))
time.sleep(10)
print("...set voltage to 5.1V...")
device.voltage1 = 5.1
time.sleep(10)
print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current()))
print("...after 5 seconds power output will be disabled again ...")
time.sleep(5)
device.disable_output()
print("...and disabling remote control again.")
device.disable_remote_control()
print("Device status 1: %s" % device.get_device_status_information(0))
print("Device status 2: %s" % device.get_device_status_information(1))
+64
View File
@@ -0,0 +1,64 @@
#!/usr/bin/env python3
# coding=utf-8
import platform
import time
from pyps2000b import PS2000B
DEVICE = "COM7" if platform.system() == "Windows" else "/dev/ttyACM0"
# connection to the device is automatically opened
print("Connecting to device at %s..." % DEVICE)
xyDevice = PS2000B.PS2000B(DEVICE) # create Object of class PS2000B, pass COM-port and channel to functions inside
# static device information can be read
print("Connection open: %s" % xyDevice.is_open())
print("Device: %s" % xyDevice.get_device_information())
# dynamic device status information can be read
device_status_info1 = xyDevice.get_device_status_information(0)
device_status_info2 = xyDevice.get_device_status_information(1)
print("Device status 1: %s" % xyDevice.get_device_status_information(0))
print("Device status 2: %s" % xyDevice.get_device_status_information(1))
print("Current output 1: %0.2f V , %0.2f A" % (xyDevice.voltage1, xyDevice.current1))
print("Current output 2: %0.2f V , %0.2f A" % (xyDevice.voltage2, xyDevice.current2))
# device can be controlled
if not device_status_info1.remote_control_active:
print("...will enable remote control...")
xyDevice.enable_remote_control(0)
if not device_status_info2.remote_control_active:
print("...will enable remote control...")
xyDevice.enable_remote_control(1)
print("...set voltage 1 to 12V and max current to 1A...")
xyDevice.voltage1 = 12
xyDevice.current1 = 1
time.sleep(2)
print("...now enabling the power output control 1...")
xyDevice.enable_output(0)
time.sleep(2)
print("... set voltage 2 to 5V and max current to 1A...")
xyDevice.voltage2 = 5
xyDevice.current2 = 1
time.sleep(2)
print("...now enabling the power output control 2...")
xyDevice.enable_output(1)
time.sleep(5)
print("Device status 1: %s" % xyDevice.get_device_status_information(0))
print("Device status 2: %s" % xyDevice.get_device_status_information(1))
print("Output 1: %0.2f V , %0.2f A" % (xyDevice.voltage1, xyDevice.current1))
print("Output 2: %0.2f V , %0.2f A" % (xyDevice.voltage2, xyDevice.current2))
time.sleep(5)
xyDevice.disable_output(0)
xyDevice.disable_output(1)
print("...and disabling remote control again.")
xyDevice.disable_remote_control(0)
xyDevice.disable_remote_control(1)
print("Device status 1: %s" % xyDevice.get_device_status_information(0))
print("Device status 2: %s" % xyDevice.get_device_status_information(1))
+21
View File
@@ -0,0 +1,21 @@
global Coil_const
global ambientField
global xyPort
global zPort
global xyDevice
global zDevice
global xAxis
global yAxis
global zAxis
global resistances
global maxAmps
global maxWatts
global arduino
relayPins = [1, 2, 3] # digital pin on the Arduino for switching relay of each axis [x,y,z]
+36
View File
@@ -0,0 +1,36 @@
# import platform
# import time
import numpy as np
import globals as g
import cage_func as func
# from pyps2000b import PS2000B
from Arduino import Arduino
# User Inputs/Configuration----------------------------------
# Desired output:
mag_vec1 = np.array([10, 10, 5])*1e-6
# Constants:
g.Coil_const = np.array([38.6, 38.45, 37.9])*1e-9 # Coil constants [x,y,z] in T/A
g.ambientField = np.array([80])*1e-6 # ambient magnetic field in measurement area, to be cancelled out
g.resistances = np.array([3.9, 1, 1]) # resistance of [x,y,z] circuits
g.maxWatts = np.array([8, 0, 0]) # max. allowed power for [x,y,z] circuits
# COM-Ports for power supply units:
g.xyPort = "COM1" # placeholders
g.zPort = "COM1"
# Code starts here------------------------------------------
g.maxAmps = np.sqrt(g.maxWatts/g.resistances) # calculate maximum currents in each axis
print("Connecting to PSUs...")
func.set_devices() # initiate communication, set handles
print("Connecting to Arduino...")
g.arduino = Arduino() # search for connected arduino and set handle
print("Arduino found.")
print("Activating PSU outputs...")
func.activate_all() # activate remote control and outputs on PSUs
func.set_field_simple(mag_vec1)
func.deactivate_all()
+407
View File
@@ -0,0 +1,407 @@
#!/usr/bin/env python3
# coding=utf-8
# Python access to Elektro Automatik PS 2000 B devices via USB/serial
#
# Supported features:
# - read static device information (manufacturer, serial, device type ...)
# - read dynamic device information (current, voltage)
# - read/write remote control
# - read/write output control
#
# - wrap error results in own telegram
#
# References
# [1] = "PS 2000B Programming Guide" from 2015-05-28
# [2] = "PS 2000B object list"
#
import serial
import struct
import sys
PY_3 = sys.version_info >= (3, 0)
# noinspection SpellCheckingInspection
__author__ = "Sören Sprößig <ssproessig@gmail.com>"
def as_string(raw_data):
"""Converts the given raw bytes to a string (removes NULL)"""
return bytearray(raw_data[:-1])
def as_float(raw_data):
"""Converts the given raw bytes to a float"""
f = struct.unpack_from(">f", bytearray(raw_data))[0]
return f
def as_word(raw_data):
"""Converts the given raw bytes to a word"""
w = struct.unpack_from(">H", bytearray(raw_data))[0]
return w
def _ord(x):
"""Wrap ord() call as we only need it in Python 2"""
return x if PY_3 else ord(x)
# noinspection PyClassHasNoInit
class Constants:
"""Communication constants"""
# communication parameters taken from [1], chapter 2.2
CONNECTION_BAUD_RATE = 115200
CONNECTION_PARITY = serial.PARITY_ODD
CONNECTION_STOP_BITS = 1
# timeout taken from [1], chapter 3.7
TIMEOUT_BETWEEN_COMMANDS = 0.05
# according to spec [1] 2.4:
# maximum length of a telegram is 21 bytes (Byte 0..20)
MAX_LEN_IN_BYTES = 21
# noinspection PyClassHasNoInit
class Objects:
"""Supported objects ids / commands"""
DEVICE_TYPE = 0
DEVICE_SERIAL_NO = 1
NOMINAL_VOLTAGE = 2
NOMINAL_CURRENT = 3
NOMINAL_POWER = 4
DEVICE_ARTICLE_NO = 6
MANUFACTURER = 8
SOFTWARE_VERSION = 9
SET_VALUE_VOLTAGE = 50
SET_VALUE_CURRENT = 51
POWER_SUPPLY_CONTROL = 54
STATUS_ACTUAL_VALUES = 71
# noinspection PyClassHasNoInit
class ControlParam:
"""Parameters for controlling the device"""
SWITCH_MODE_CMD = 0x10
SWITCH_MODE_REMOTE = 0x10
SWITCH_MODE_MANUAL = 0x00
SWITCH_POWER_OUTPUT_CMD = 0x1
SWITCH_POWER_OUTPUT_ON = 0x1
SWITCH_POWER_OUTPUT_OFF = 0x0
class Telegram:
"""Base class of a PS2000B telegram - basically allows accessing the raw bytes and does checksum calculation"""
def __init__(self):
self._bytes = []
self._checksum = []
self.checksum_ok = False
def _calc_checksum(self):
cs = 0
for b in self._bytes:
cs += b
cs_high = (cs & 0xff00) >> 8
cs_low = cs & 0xff
return [cs_high, cs_low]
@staticmethod
def _get_start_delimiter(transmission, expected_data_length):
result = 0b00000000
if expected_data_length > 16:
raise Exception("only 4 bits for expected length can be used")
result |= (expected_data_length - 1)
result |= 0b10000
result |= 0b100000
result |= transmission << 6
return result
def get_byte_array(self):
return bytearray(self._bytes + self._checksum)
class FromPowerSupply(Telegram):
"""Telegram received from the power supply"""
def __init__(self, raw_data):
Telegram.__init__(self)
data = [_ord(x) for x in raw_data]
self._bytes = data[0:-2]
self._checksum = data[len(data) - 2:len(data)]
self.checksum_ok = self._checksum == self._calc_checksum()
def get_sd(self):
return self._bytes[0]
def get_device_node(self):
return self._bytes[1]
def get_object(self):
return self._bytes[3]
def get_data(self):
return self._bytes[3:len(self._bytes)]
# noinspection PyMethodMayBeStatic
def get_error(self):
# ToDo: [1] chapter 3.6 add support for error codes here
return None
class ToPowerSupply(Telegram):
"""A telegram sent to the power supply"""
def __init__(self, transmission, data, expected_data_length):
Telegram.__init__(self)
self._bytes = []
self._bytes.append(self._get_start_delimiter(transmission, expected_data_length))
self._bytes.extend(data)
self._checksum = self._calc_checksum()
self.checksum_ok = True
class DeviceInformation:
"""A class carrying all static device information read from the device"""
def __init__(self):
self.device_type = ""
self.device_serial_no = ""
self.nominal_voltage = 0
self.nominal_current = 0
self.nominal_power = 0
self.manufacturer = ""
self.device_art_no = ""
self.software_version = ""
def __str__(self):
return "%s %s [%s], SW: %s, Art-Nr: %s, [%0.2f V, %0.2f A, %0.2f W]" % \
(self.manufacturer,
self.device_type, self.device_serial_no,
self.software_version, self.device_art_no,
self.nominal_voltage, self.nominal_current, self.nominal_power)
class DeviceStatusInformation:
"""A class carrying all dynamic device status information"""
def __init__(self, raw_data):
self.remote_control_active = raw_data[0] & 0b1
self.output_active = raw_data[1] & 0b1
self.actual_voltage_percent = float(as_word(raw_data[2:4])) / 256
self.actual_current_percent = float(as_word(raw_data[4:6])) / 256
def __str__(self):
if self.remote_control_active == 1:
remote = "active"
else:
remote = "inactive"
if self.output_active == 1:
output = "active"
else:
output = "inactive"
return "Remote control %s, Output %s" % (remote, output)
class PS2000B:
"""PS 2000 B main communication class"""
def __init__(self, serial_port):
self.__device_status_information1 = None
self.__device_status_information2 = None
self.__serial = serial.Serial(serial_port,
baudrate=Constants.CONNECTION_BAUD_RATE,
timeout=Constants.TIMEOUT_BETWEEN_COMMANDS * 2,
parity=serial.PARITY_ODD,
stopbits=Constants.CONNECTION_STOP_BITS)
self.__device_information = self.__read_device_information()
def is_open(self):
return self.__serial.is_open
def get_device_information(self):
return self.__device_information
def __read_device_information(self, channel=0): # reads static device information, usually not channel dependant
result = DeviceInformation()
# taken from [2]
result.device_type = as_string(self.__read_device_data(16, Objects.DEVICE_TYPE, channel).get_data())
result.device_serial_no = as_string(self.__read_device_data(16, Objects.DEVICE_SERIAL_NO, channel).get_data())
result.nominal_voltage = as_float(self.__read_device_data(4, Objects.NOMINAL_VOLTAGE, channel).get_data())
result.nominal_current = as_float(self.__read_device_data(4, Objects.NOMINAL_CURRENT, channel).get_data())
result.nominal_power = as_float(self.__read_device_data(4, Objects.NOMINAL_POWER, channel).get_data())
result.device_art_no = as_string(self.__read_device_data(16, Objects.DEVICE_ARTICLE_NO, channel).get_data())
result.manufacturer = as_string(self.__read_device_data(16, Objects.MANUFACTURER, channel).get_data())
result.software_version = as_string(self.__read_device_data(16, Objects.SOFTWARE_VERSION, channel).get_data())
return result
def __read_device_data(self, expected_length, object_id, channel): # reads data from device based on object_id
telegram = ToPowerSupply(0b01, [channel, object_id], expected_length)
result = self.__send_and_receive(telegram.get_byte_array())
return result
def __send_and_receive(self, raw_bytes): # sends request for info to device and reads reply
self.__serial.write(raw_bytes)
result = FromPowerSupply(self.__serial.read(Constants.MAX_LEN_IN_BYTES))
return result
def get_device_status_information(self, channel): # gets dynamic device information (e.g. current, voltage)
if channel == 0:
if self.__device_status_information1 is None:
self.update_device_information(0)
info = self.__device_status_information1
elif channel == 1:
if self.__device_status_information2 is None:
self.update_device_information(1)
info = self.__device_status_information2
else:
raise ValueError("Invalid Channel")
return info
def update_device_information(self, channel): # updates dynamic device info stored in __device_status_information
telegram = ToPowerSupply(0b01, [channel, Objects.STATUS_ACTUAL_VALUES], 6)
device_information = self.__send_and_receive(telegram.get_byte_array())
if channel == 0:
self.__device_status_information1 = DeviceStatusInformation(device_information.get_data())
elif channel == 1:
self.__device_status_information2 = DeviceStatusInformation(device_information.get_data())
else:
raise ValueError("Invalid Channel")
def __send_device_control(self, p1, p2, channel): # sends commands to PSU, commands given in p1, p2
telegram = ToPowerSupply(0b11, [channel, Objects.POWER_SUPPLY_CONTROL, p1, p2], 2)
_ = self.__send_and_receive(telegram.get_byte_array()) # send command to PSU
self.update_device_information(channel) # update info after change
def __send_device_data(self, obj, data, channel):
# Send integer data with obj-id to the PSU
telegram = ToPowerSupply(0b11, [channel, obj, data >> 8, data & 0xff], 4)
_ = self.__send_and_receive(telegram.get_byte_array())
self.update_device_information(channel)
def enable_all(self):
self.enable_remote_control(0)
self.enable_remote_control(1)
self.enable_output(0)
self.enable_output(1)
def disable_all(self):
self.disable_output(0)
self.disable_output(1)
self.disable_remote_control(0)
self.disable_remote_control(1)
def enable_remote_control(self, channel):
self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_REMOTE, channel)
def disable_remote_control(self, channel):
self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_MANUAL, channel)
def enable_output(self, channel):
self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_ON, channel)
def disable_output(self, channel):
self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_OFF, channel)
@property
def output1(self): # object controlling output 1 on/off
return self.get_device_status_information(0).output_active
@output1.setter
def output1(self, value):
if value:
self.enable_output(0)
else:
self.disable_output(0)
@property
def output2(self): # object controlling output 2 on/off
return self.get_device_status_information(1).output_active
@output2.setter
def output2(self, value):
if value:
self.enable_output(1)
else:
self.disable_output(1)
def get_voltage(self, channel):
self.update_device_information(channel)
if channel == 0:
v_perc = self.__device_status_information1.actual_voltage_percent
elif channel == 1:
v_perc = self.__device_status_information2.actual_voltage_percent
else:
raise ValueError("Invalid channel")
voltage = self.__device_information.nominal_voltage * v_perc
return voltage / 100
def get_voltage_setpoint(self, channel):
res = self.__read_device_data(2, Objects.SET_VALUE_VOLTAGE, channel).get_data()
return self.__device_information.nominal_voltage * ((res[0] << 8) + res[1]) / 25600.0
def set_voltage(self, value, channel):
self.update_device_information(channel)
self.enable_remote_control(channel)
volt = int(round((value * 25600.0) / self.__device_information.nominal_voltage))
self.__send_device_data(Objects.SET_VALUE_VOLTAGE, volt, channel)
@property
def voltage1(self): # object storing and controlling the voltage of channel 1
return self.get_voltage(0)
@voltage1.setter
def voltage1(self, value): # voltage of channel 1
self.set_voltage(value, 0)
@property
def voltage2(self): # object storing and controlling the voltage of channel 2
return self.get_voltage(1)
@voltage2.setter
def voltage2(self, value):
self.set_voltage(value, 1)
def get_current(self, channel):
self.update_device_information(channel)
if channel == 0:
c_perc = self.__device_status_information1.actual_current_percent
elif channel == 1:
c_perc = self.__device_status_information2.actual_current_percent
else:
raise ValueError("Invalid channel")
current = self.__device_information.nominal_current * c_perc
return current / 100
def get_current_setpoint(self, channel):
res = self.__read_device_data(2, Objects.SET_VALUE_CURRENT, channel).get_data()
return self.__device_information.nominal_current * ((res[0] << 8) + res[1]) / 25600.0
def set_current(self, value, channel):
self.update_device_information(channel)
self.enable_remote_control(channel)
curr = int(round((value * 25600.0) / self.__device_information.nominal_current))
self.__send_device_data(Objects.SET_VALUE_CURRENT, curr, channel)
@property
def current1(self):
return self.get_current(0)
@current1.setter
def current1(self, value): # current of channel 1
self.set_current(value, 0)
@property
def current2(self):
return self.get_current(1)
@current2.setter
def current2(self, value): # current of channel 2
self.set_current(value, 1)
+4
View File
@@ -0,0 +1,4 @@
#!/usr/bin/python
# coding=utf-8
# noinspection SpellCheckingInspection
__author__ = "Sören Sprößig <ssproessig@gmail.com>"