Refatoring: Moved .py files into src folder. Unified some file names

This commit is contained in:
2021-07-27 15:09:24 +02:00
parent c7e793a420
commit bcfe4808c0
18 changed files with 16 additions and 20 deletions
+206
View File
@@ -0,0 +1,206 @@
# Arduino-Python3 Command API
This API is forked from the original [Python Arduino Command API](https://github.com/thearn/Python-Arduino-Command-API) to add support for Python 3.
The Arduino-Python3 Command API is a lightweight Python library for
communicating with [Arduino microcontroller boards](http://www.arduino.cc/) from a connected computer using
standard serial IO, either over a physical wire
or wirelessly. It is written using a custom protocol, similar to [Firmata](http://firmata.org/wiki/Main_Page).
This allows a user to quickly prototype programs for Arduino using Python code, or to
simply read/control/troubleshoot/experiment
with hardware connected to an Arduino board without ever having to recompile and reload sketches to the board itself.
Method names within the Arduino-Python3 Command API are designed to be as close
as possible to their Arduino programming language counterparts
## Simple usage example (LED blink)
```python
#!/usr/bin/env python
"""
Blinks an LED on digital pin 13
in 1 second intervals
"""
from Arduino import Arduino
import time
board = Arduino() # plugged in via USB, serial com at rate 115200
board.pinMode(13, "OUTPUT")
while True:
board.digitalWrite(13, "LOW")
time.sleep(1)
board.digitalWrite(13, "HIGH")
time.sleep(1)
```
## Requirements:
- [Python](http://python.org/) 3.7 tested on Windows and macOS.
- [pyserial](http://pyserial.sourceforge.net/) 2.6 or higher
- Any [Arduino compatible microcontroller](https://www.sparkfun.com/categories/242) with at least 14KB of flash memory
## Installation:
Either run `pip install arduino-python3` from a command line, or run `python setup.py
build install` from the source directory to install this library.
## Setup:
1. Verify that your Arduino board communicates at the baud rate specified in the
`setup()` function (line 407) in `prototype.ino`. Change it there if necessary.
2. Load the `prototype.ino` sketch onto your Arduino board, using the Arduino IDE.
3. Set up some kind of serial I/O communication between the Arduino board and your computer (via physical USB cable,
Bluetooth, xbee, etc. + associated drivers)
4. Add `from Arduino import Arduino` into your python script to communicate with your Arduino
For a collection of examples, see `examples.py`. This file contains methods which replicate
the functionality of many Arduino demo sketches.
## Testing:
The `tests` directory contains some basic tests for the library. Extensive code coverage is a bit difficult to expect for every release, since a positive test involves actually
connecting and issuing commands to a live Arduino, hosting any hardware
required to test a particular function. But a core of basic communication tests
should at least be maintained here and used before merging into the `master` branch.
After installation, the interactive tests can be run from the source directory:
```bash
$ python tests/test_main.py
```
Automated tests can be run from the source directory with:
```bash
$ python tests/test_arduino.py
```
## Classes
- `Arduino(baud)` - Set up communication with currently connected and powered
Arduino.
```python
board = Arduino("115200") #Example
```
The device name / COM port of the connected Arduino will be auto-detected.
If there are more than one Arduino boards connected,
the desired COM port can be also be passed as an optional argument:
```python
board = Arduino("115200", port="COM3") #Windows example
```
```python
board = Arduino("115200", port="/dev/tty.usbmodemfa141") #OSX example
```
A time-out for reading from the Arduino can also be specified as an optional
argument:
```python
board = Arduino("115200", timeout=2) #Serial reading functions will
#wait for no more than 2 seconds
```
## Methods
**Digital I/O**
- `Arduino.digitalWrite(pin_number, state)` turn digital pin on/off
- `Arduino.digitalRead(pin_number)` read state of a digital pin
```python
#Digital read / write example
board.digitalWrite(13, "HIGH") #Set digital pin 13 voltage
state_1 = board.digitalRead(13) #Will return integer 1
board.digitalWrite(13, "LOW") #Set digital pin 13 voltage
state_2 = board.digitalRead(13) #Will return integer 0
```
- `Arduino.pinMode(pin_number, io_mode)` set pin I/O mode
- `Arduino.pulseIn(pin_number, state)` measures a pulse
- `Arduino.pulseIn_set(pin_number, state)` measures a pulse, with preconditioning
```python
#Digital mode / pulse example
board.pinMode(7, "INPUT") #Set digital pin 7 mode to INPUT
duration = board.pulseIn(7, "HIGH") #Return pulse width measurement on pin 7
```
**Analog I/O**
- `Arduino.analogRead(pin_number)` returns the analog value
- `Arduino.analogWrite(pin_number, value)` sets the analog value
```python
#Analog I/O examples
val=board.analogRead(5) #Read value on analog pin 5 (integer 0 to 1023)
val = val / 4 # scale to 0 - 255
board.analogWrite(11) #Set analog value (PWM) based on analog measurement
```
**Shift Register**
- `Arduino.shiftIn(dataPin, clockPin, bitOrder)` shift a byte in and returns it
- `Arduino.shiftOut(dataPin, clockPin, bitOrder, value)` shift the given byte out
`bitOrder` should be either `"MSBFIRST"` or `"LSBFIRST"`
**Servo Library Functionality**
Support is included for up to 8 servos.
- `Arduino.Servos.attach(pin, min=544, max=2400)` Create servo instance. Only 8 servos can be used at one time.
- `Arduino.Servos.read(pin)` Returns the angle of the servo attached to the specified pin
- `Arduino.Servos.write(pin, angle)` Move an attached servo on a pin to a specified angle
- `Arduino.Servos.writeMicroseconds(pin, uS)` Write a value in microseconds to the servo on a specified pin
- `Arduino.Servos.detach(pin)` Detaches the servo on the specified pin
```python
#Servo example
board.Servos.attach(9) #declare servo on pin 9
board.Servos.write(9, 0) #move servo on pin 9 to 0 degrees
print board.Servos.read(9) # should be 0
board.Servos.detach(9) #free pin 9
```
**Software Serial Functionality**
- `Arduino.SoftwareSerial.begin(ss_rxPin, ss_txPin, ss_device_baud)` initialize software serial device on
specified pins.
Only one software serial device can be used at a time. Existing software serial instance will
be overwritten by calling this method, both in Python and on the Arduino board.
- `Arduino.SoftwareSerial.write(data)` send data using the Arduino 'write' function to the existing software
serial connection.
- `Arduino.SoftwareSerial.read()` returns one byte from the existing software serial connection
```python
#Software serial example
board.SoftwareSerial.begin(0, 7, "19200") # Start software serial for transmit only (tx on pin 7)
board.SoftwareSerial.write(" test ") #Send some data
response_char = board.SoftwareSerial.read() #read response character
```
**EEPROM**
- `Arduino.EEPROM.read(address)` reads a byte from the EEPROM
- `Arduino.EEPROM.write(address, value)` writes a byte to the EEPROM
- `Arduino.EEPROM.size()` returns size of the EEPROM
```python
#EEPROM read and write examples
location = 42
value = 10 # 0-255(byte)
board.EEPROM.write(location, 10)
print(board.EEPROM.read(location))
print('EEPROM size {size}'.format(size=board.EEPROM.size()))
```
**Misc**
- `Arduino.close()` closes serial connection to the Arduino.
## To-do list:
- Expand software serial functionality (`print()` and `println()`)
- Add simple reset functionality that zeros out all pin values
- Add I2C / TWI function support (Arduino `Wire.h` commands)
- Include a wizard which generates 'prototype.ino' with selected serial baud rate and Arduino function support
(to help reduce memory requirements).
- Multi-serial support for Arduino mega (`Serial1.read()`, etc)
+2
View File
@@ -0,0 +1,2 @@
name="arduino-python3"
from .arduino import Arduino, Shrimp
+658
View File
@@ -0,0 +1,658 @@
# This file enables control of a connected Arduino microcontroller.
#!/usr/bin/env python
import logging
import itertools
import platform
import serial
import time
from serial.tools import list_ports
import sys
if sys.platform.startswith('win'):
import winreg
else:
import glob
libraryVersion = 'V0.6'
log = logging.getLogger(__name__)
def enumerate_serial_ports():
"""
Uses the Win32 registry to return a iterator of serial
(COM) ports existing on this computer.
"""
path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM'
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path)
except OSError:
raise Exception
for i in itertools.count():
try:
val = winreg.EnumValue(key, i)
yield (str(val[1])) # , str(val[0]))
except EnvironmentError:
break
def build_cmd_str(cmd, args=None):
"""
Build a command string that can be sent to the arduino.
Input:
cmd (str): the command to send to the arduino, must not
contain a % character
args (iterable): the arguments to send to the command
@TODO: a strategy is needed to escape % characters in the args
"""
if args:
args = '%'.join(map(str, args))
else:
args = ''
return "@{cmd}%{args}$!".format(cmd=cmd, args=args)
def find_port(baud, timeout):
"""
Find the first port that is connected to an arduino with a compatible
sketch installed.
"""
if platform.system() == 'Windows':
ports = enumerate_serial_ports()
elif platform.system() == 'Darwin':
ports = [i[0] for i in list_ports.comports()]
ports = ports[::-1]
else:
ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
for p in ports:
log.debug('Found {0}, testing...'.format(p))
try:
sr = serial.Serial(p, baud, timeout=timeout)
except (serial.serialutil.SerialException, OSError) as e:
log.debug(str(e))
continue
sr.readline() # wait for board to start up again
version = get_version(sr)
if version != libraryVersion:
try:
ver = version[0]
except Exception:
ver = ''
if ver == 'V' or version == "version":
print("You need to update the version of the Arduino-Python3",
"library running on your Arduino.")
print("The Arduino sketch is", version)
print("The Python installation is", libraryVersion)
print("Flash the prototype sketch again.")
return sr
# established to be the wrong board
log.debug('Bad version {0}. This is not a Shrimp/Arduino!'.format(
version))
sr.close()
continue
log.info('Using port {0}.'.format(p))
if sr:
return sr
return None
def get_version(sr):
cmd_str = build_cmd_str("version")
try:
sr.write(str.encode(cmd_str))
sr.flush()
except Exception:
return None
return sr.readline().decode("utf-8").replace("\r\n", "")
class Arduino(object):
def __init__(self, baud=115200, port=None, timeout=2, sr=None):
"""
Initializes serial communication with Arduino if no connection is
given. Attempts to self-select COM port, if not specified.
"""
if not sr:
if not port:
sr = find_port(baud, timeout)
if not sr:
raise ValueError("Could not find port.")
else:
sr = serial.Serial(port, baud, timeout=timeout)
sr.readline() # wait til board has rebooted and is connected
version = get_version(sr)
if version != libraryVersion:
# check version
try:
ver = version[0]
except Exception:
ver = ''
if ver == 'V' or version == "version":
print("You need to update the version of the Arduino-Python3",
"library running on your Arduino.")
print("The Arduino sketch is", version)
print("The Python installation is", libraryVersion)
print("Flash the prototype sketch again.")
sr.flush()
self.sr = sr
self.SoftwareSerial = SoftwareSerial(self)
self.Servos = Servos(self)
self.EEPROM = EEPROM(self)
def version(self):
return get_version(self.sr)
def digitalWrite(self, pin, val):
"""
Sends digitalWrite command
to digital pin on Arduino
-------------
inputs:
pin : digital pin number
val : either "HIGH" or "LOW"
"""
if val.upper() == "LOW":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("dw", (pin_,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def analogWrite(self, pin, val):
"""
Sends analogWrite pwm command
to pin on Arduino
-------------
inputs:
pin : pin number
val : integer 0 (off) to 255 (always on)
"""
if val > 255:
val = 255
elif val < 0:
val = 0
cmd_str = build_cmd_str("aw", (pin, val))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def analogRead(self, pin):
"""
Returns the value of a specified
analog pin.
inputs:
pin : analog pin number for measurement
returns:
value: integer from 1 to 1023
"""
cmd_str = build_cmd_str("ar", (pin,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
return int(rd)
except:
return 0
def pinMode(self, pin, val):
"""
Sets I/O mode of pin
inputs:
pin: pin number to toggle
val: "INPUT" or "OUTPUT"
"""
if val == "INPUT":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("pm", (pin_,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def pulseIn(self, pin, val):
"""
Reads a pulse from a pin
inputs:
pin: pin number for pulse measurement
returns:
duration : pulse length measurement
"""
if val.upper() == "LOW":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("pi", (pin_,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
return float(rd)
except:
return -1
def pulseIn_set(self, pin, val, numTrials=5):
"""
Sets a digital pin value, then reads the response
as a pulse width.
Useful for some ultrasonic rangefinders, etc.
inputs:
pin: pin number for pulse measurement
val: "HIGH" or "LOW". Pulse is measured
when this state is detected
numTrials: number of trials (for an average)
returns:
duration : an average of pulse length measurements
This method will automatically toggle
I/O modes on the pin and precondition the
measurment with a clean LOW/HIGH pulse.
Arduino.pulseIn_set(pin,"HIGH") is
equivalent to the Arduino sketch code:
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
delayMicroseconds(2);
digitalWrite(pin, HIGH);
delayMicroseconds(5);
digitalWrite(pin, LOW);
pinMode(pin, INPUT);
long duration = pulseIn(pin, HIGH);
"""
if val.upper() == "LOW":
pin_ = -pin
else:
pin_ = pin
cmd_str = build_cmd_str("ps", (pin_,))
durations = []
for s in range(numTrials):
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd.isdigit():
if (int(rd) > 1):
durations.append(int(rd))
if len(durations) > 0:
duration = int(sum(durations)) / int(len(durations))
else:
duration = None
try:
return float(duration)
except:
return -1
def close(self):
if self.sr.isOpen():
self.sr.flush()
self.sr.close()
def digitalRead(self, pin):
"""
Returns the value of a specified
digital pin.
inputs:
pin : digital pin number for measurement
returns:
value: 0 for "LOW", 1 for "HIGH"
"""
cmd_str = build_cmd_str("dr", (pin,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
return int(rd)
except:
return 0
def Melody(self, pin, melody, durations):
"""
Plays a melody.
inputs:
pin: digital pin number for playback
melody: list of tones
durations: list of duration (4=quarter note, 8=eighth note, etc.)
length of melody should be of same
length as length of duration
Melodies of the following length, can cause trouble
when playing it multiple times.
board.Melody(9,["C4","G3","G3","A3","G3",0,"B3","C4"],
[4,8,8,4,4,4,4,4])
Playing short melodies (1 or 2 tones) didn't cause
trouble during testing
"""
NOTES = dict(
B0=31, C1=33, CS1=35, D1=37, DS1=39, E1=41, F1=44, FS1=46, G1=49,
GS1=52, A1=55, AS1=58, B1=62, C2=65, CS2=69, D2=73, DS2=78, E2=82,
F2=87, FS2=93, G2=98, GS2=104, A2=110, AS2=117, B2=123, C3=131,
CS3=139, D3=147, DS3=156, E3=165, F3=175, FS3=185, G3=196, GS3=208,
A3=220, AS3=233, B3=247, C4=262, CS4=277, D4=294, DS4=311, E4=330,
F4=349, FS4=370, G4=392, GS4=415, A4=440,
AS4=466, B4=494, C5=523, CS5=554, D5=587, DS5=622, E5=659, F5=698,
FS5=740, G5=784, GS5=831, A5=880, AS5=932, B5=988, C6=1047,
CS6=1109, D6=1175, DS6=1245, E6=1319, F6=1397, FS6=1480, G6=1568,
GS6=1661, A6=1760, AS6=1865, B6=1976, C7=2093, CS7=2217, D7=2349,
DS7=2489, E7=2637, F7=2794, FS7=2960, G7=3136, GS7=3322, A7=3520,
AS7=3729, B7=3951, C8=4186, CS8=4435, D8=4699, DS8=4978)
if (isinstance(melody, list)) and (isinstance(durations, list)):
length = len(melody)
cmd_args = [length, pin]
if length == len(durations):
cmd_args.extend([NOTES.get(melody[note])
for note in range(length)])
cmd_args.extend([durations[duration]
for duration in range(len(durations))])
cmd_str = build_cmd_str("to", cmd_args)
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
cmd_str = build_cmd_str("nto", [pin])
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
else:
return -1
else:
return -1
def capacitivePin(self, pin):
'''
Input:
pin (int): pin to use as capacitive sensor
Use it in a loop!
DO NOT CONNECT ANY ACTIVE DRIVER TO THE USED PIN !
the pin is toggled to output mode to discharge the port,
and if connected to a voltage source,
will short circuit the pin, potentially damaging
the Arduino/Shrimp and any hardware attached to the pin.
'''
cmd_str = build_cmd_str("cap", (pin,))
self.sr.write(str.encode(cmd_str))
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd.isdigit():
return int(rd)
def shiftOut(self, dataPin, clockPin, pinOrder, value):
"""
Shift a byte out on the datapin using Arduino's shiftOut()
Input:
dataPin (int): pin for data
clockPin (int): pin for clock
pinOrder (String): either 'MSBFIRST' or 'LSBFIRST'
value (int): an integer from 0 and 255
"""
cmd_str = build_cmd_str("so",
(dataPin, clockPin, pinOrder, value))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
def shiftIn(self, dataPin, clockPin, pinOrder):
"""
Shift a byte in from the datapin using Arduino's shiftIn().
Input:
dataPin (int): pin for data
clockPin (int): pin for clock
pinOrder (String): either 'MSBFIRST' or 'LSBFIRST'
Output:
(int) an integer from 0 to 255
"""
cmd_str = build_cmd_str("si", (dataPin, clockPin, pinOrder))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd.isdigit():
return int(rd)
class Shrimp(Arduino):
def __init__(self):
Arduino.__init__(self)
class Wires(object):
"""
Class for Arduino wire (i2c) support
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
class Servos(object):
"""
Class for Arduino servo support
0.03 second delay noted
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
self.servo_pos = {}
def attach(self, pin, min=544, max=2400):
cmd_str = build_cmd_str("sva", (pin, min, max))
while True:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
if rd:
break
else:
log.debug("trying to attach servo to pin {0}".format(pin))
position = int(rd)
self.servo_pos[pin] = position
return 1
def detach(self, pin):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svd", (position,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
del self.servo_pos[pin]
def write(self, pin, angle):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svw", (position, angle))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
def writeMicroseconds(self, pin, uS):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svwm", (position, uS))
self.sr.write(str.encode(cmd_str))
self.sr.flush()
def read(self, pin):
if pin not in self.servo_pos.keys():
self.attach(pin)
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svr", (position,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
rd = self.sr.readline().decode("utf-8").replace("\r\n", "")
try:
angle = int(rd)
return angle
except:
return None
class SoftwareSerial(object):
"""
Class for Arduino software serial functionality
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
self.connected = False
def begin(self, p1, p2, baud):
"""
Create software serial instance on
specified tx,rx pins, at specified baud
"""
cmd_str = build_cmd_str("ss", (p1, p2, baud))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response == "ss OK":
self.connected = True
return True
else:
self.connected = False
return False
def write(self, data):
"""
sends data to existing software serial instance
using Arduino's 'write' function
"""
if self.connected:
cmd_str = build_cmd_str("sw", (data,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response == "ss OK":
return True
else:
return False
def read(self):
"""
returns first character read from
existing software serial instance
"""
if self.connected:
cmd_str = build_cmd_str("sr")
self.sr.write(str.encode(cmd_str))
self.sr.flush()
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response:
return response
else:
return False
class EEPROM(object):
"""
Class for reading and writing to EEPROM.
"""
def __init__(self, board):
self.board = board
self.sr = board.sr
def size(self):
"""
Returns size of EEPROM memory.
"""
cmd_str = build_cmd_str("sz")
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
return int(response)
except:
return 0
def write(self, address, value=0):
""" Write a byte to the EEPROM.
:address: the location to write to, starting from 0 (int)
:value: the value to write, from 0 to 255 (byte)
"""
if value > 255:
value = 255
elif value < 0:
value = 0
cmd_str = build_cmd_str("eewr", (address, value))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
except:
pass
def read(self, adrress):
""" Reads a byte from the EEPROM.
:address: the location to write to, starting from 0 (int)
"""
cmd_str = build_cmd_str("eer", (adrress,))
try:
self.sr.write(str.encode(cmd_str))
self.sr.flush()
response = self.sr.readline().decode("utf-8").replace("\r\n", "")
if response:
return int(response)
except:
return 0
+416
View File
@@ -0,0 +1,416 @@
#include <SoftwareSerial.h>
#include <Wire.h>
#include <Servo.h>
#include <EEPROM.h>
void Version(){
Serial.println(F("V0.6"));
}
SoftwareSerial *sserial = NULL;
Servo servos[8];
int servo_pins[] = {0, 0, 0, 0, 0, 0, 0, 0};
boolean connected = false;
int Str2int (String Str_value)
{
char buffer[10]; //max length is three units
Str_value.toCharArray(buffer, 10);
int int_value = atoi(buffer);
return int_value;
}
void split(String results[], int len, String input, char spChar) {
String temp = input;
for (int i=0; i<len; i++) {
int idx = temp.indexOf(spChar);
results[i] = temp.substring(0,idx);
temp = temp.substring(idx+1);
}
}
uint8_t readCapacitivePin(String data) {
int pinToMeasure = Str2int(data);
// readCapacitivePin
// Input: Arduino pin number
// Output: A number, from 0 to 17 expressing
// how much capacitance is on the pin
// When you touch the pin, or whatever you have
// attached to it, the number will get higher
// http://playground.arduino.cc/Code/CapacitiveSensor
//
// Variables used to translate from Arduino to AVR pin naming
volatile uint8_t* port;
volatile uint8_t* ddr;
volatile uint8_t* pin;
// Here we translate the input pin number from
// Arduino pin number to the AVR PORT, PIN, DDR,
// and which bit of those registers we care about.
byte bitmask;
port = portOutputRegister(digitalPinToPort(pinToMeasure));
ddr = portModeRegister(digitalPinToPort(pinToMeasure));
bitmask = digitalPinToBitMask(pinToMeasure);
pin = portInputRegister(digitalPinToPort(pinToMeasure));
// Discharge the pin first by setting it low and output
*port &= ~(bitmask);
*ddr |= bitmask;
delay(1);
// Make the pin an input with the internal pull-up on
*ddr &= ~(bitmask);
*port |= bitmask;
// Now see how long the pin to get pulled up. This manual unrolling of the loop
// decreases the number of hardware cycles between each read of the pin,
// thus increasing sensitivity.
uint8_t cycles = 17;
if (*pin & bitmask) { cycles = 0;}
else if (*pin & bitmask) { cycles = 1;}
else if (*pin & bitmask) { cycles = 2;}
else if (*pin & bitmask) { cycles = 3;}
else if (*pin & bitmask) { cycles = 4;}
else if (*pin & bitmask) { cycles = 5;}
else if (*pin & bitmask) { cycles = 6;}
else if (*pin & bitmask) { cycles = 7;}
else if (*pin & bitmask) { cycles = 8;}
else if (*pin & bitmask) { cycles = 9;}
else if (*pin & bitmask) { cycles = 10;}
else if (*pin & bitmask) { cycles = 11;}
else if (*pin & bitmask) { cycles = 12;}
else if (*pin & bitmask) { cycles = 13;}
else if (*pin & bitmask) { cycles = 14;}
else if (*pin & bitmask) { cycles = 15;}
else if (*pin & bitmask) { cycles = 16;}
// Discharge the pin again by setting it low and output
// It's important to leave the pins low if you want to
// be able to touch more than 1 sensor at a time - if
// the sensor is left pulled high, when you touch
// two sensors, your body will transfer the charge between
// sensors.
*port &= ~(bitmask);
*ddr |= bitmask;
//return cycles;
Serial.println(cycles);
}
void Tone(String data){
int idx = data.indexOf('%');
int len = Str2int(data.substring(0,idx));
String data2 = data.substring(idx+1);
int idx2 = data2.indexOf('%');
int pin = Str2int(data2.substring(0,idx2));
String data3 = data2.substring(idx2+1);
String melody[len*2];
split(melody,len*2,data3,'%');
for (int thisNote = 0; thisNote < len; thisNote++) {
int noteDuration = 1000/Str2int(melody[thisNote+len]);
int note = Str2int(melody[thisNote]);
tone(pin, note, noteDuration);
int pause = noteDuration * 1.30;
delay(pause);
noTone(pin);
}
}
void ToneNo(String data){
int pin = Str2int(data);
noTone(pin);
}
void DigitalHandler(int mode, String data){
int pin = Str2int(data);
if(mode<=0){ //read
Serial.println(digitalRead(pin));
}else{
if(pin <0){
digitalWrite(-pin,LOW);
}else{
digitalWrite(pin,HIGH);
}
//Serial.println('0');
}
}
void AnalogHandler(int mode, String data){
if(mode<=0){ //read
int pin = Str2int(data);
Serial.println(analogRead(pin));
}else{
String sdata[2];
split(sdata,2,data,'%');
int pin = Str2int(sdata[0]);
int pv = Str2int(sdata[1]);
analogWrite(pin,pv);
}
}
void ConfigurePinHandler(String data){
int pin = Str2int(data);
if(pin <=0){
pinMode(-pin,INPUT);
}else{
pinMode(pin,OUTPUT);
}
}
void shiftOutHandler(String data) {
String sdata[4];
split(sdata, 4, data, '%');
int dataPin = sdata[0].toInt();
int clockPin = sdata[1].toInt();
String bitOrderName = sdata[2];
byte value = (byte)(sdata[3].toInt());
if (bitOrderName == "MSBFIRST") {
shiftOut(dataPin, clockPin, MSBFIRST, value);
} else {
shiftOut(dataPin, clockPin, LSBFIRST, value);
}
}
void shiftInHandler(String data) {
String sdata[3];
split(sdata, 3, data, '%');
int dataPin = sdata[0].toInt();
int clockPin = sdata[1].toInt();
String bitOrderName = sdata[2];
int incoming;
if (bitOrderName == "MSBFIRST") {
incoming = (int)shiftIn(dataPin, clockPin, MSBFIRST);
} else {
incoming = (int)shiftIn(dataPin, clockPin, LSBFIRST);
}
Serial.println(incoming);
}
void SS_set(String data){
delete sserial;
String sdata[3];
split(sdata,3,data,'%');
int rx_ = Str2int(sdata[0]);
int tx_ = Str2int(sdata[1]);
int baud_ = Str2int(sdata[2]);
sserial = new SoftwareSerial(rx_, tx_);
sserial->begin(baud_);
Serial.println("ss OK");
}
void SS_write(String data) {
int len = data.length()+1;
char buffer[len];
data.toCharArray(buffer,len);
Serial.println("ss OK");
sserial->write(buffer);
}
void SS_read(String data) {
char c = sserial->read();
Serial.println(c);
}
void pulseInHandler(String data){
int pin = Str2int(data);
long duration;
if(pin <=0){
pinMode(-pin, INPUT);
duration = pulseIn(-pin, LOW);
}else{
pinMode(pin, INPUT);
duration = pulseIn(pin, HIGH);
}
Serial.println(duration);
}
void pulseInSHandler(String data){
int pin = Str2int(data);
long duration;
if(pin <=0){
pinMode(-pin, OUTPUT);
digitalWrite(-pin, HIGH);
delayMicroseconds(2);
digitalWrite(-pin, LOW);
delayMicroseconds(5);
digitalWrite(-pin, HIGH);
pinMode(-pin, INPUT);
duration = pulseIn(-pin, LOW);
}else{
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
delayMicroseconds(2);
digitalWrite(pin, HIGH);
delayMicroseconds(5);
digitalWrite(pin, LOW);
pinMode(pin, INPUT);
duration = pulseIn(pin, HIGH);
}
Serial.println(duration);
}
void SV_add(String data) {
String sdata[3];
split(sdata,3,data,'%');
int pin = Str2int(sdata[0]);
int min = Str2int(sdata[1]);
int max = Str2int(sdata[2]);
int pos = -1;
for (int i = 0; i<8;i++) {
if (servo_pins[i] == pin) { //reset in place
servos[pos].detach();
servos[pos].attach(pin, min, max);
servo_pins[pos] = pin;
Serial.println(pos);
return;
}
}
for (int i = 0; i<8;i++) {
if (servo_pins[i] == 0) {pos = i;break;} // find spot in servo array
}
if (pos == -1) {;} //no array position available!
else {
servos[pos].attach(pin, min, max);
servo_pins[pos] = pin;
Serial.println(pos);
}
}
void SV_remove(String data) {
int pos = Str2int(data);
servos[pos].detach();
servo_pins[pos] = 0;
}
void SV_read(String data) {
int pos = Str2int(data);
int angle;
angle = servos[pos].read();
Serial.println(angle);
}
void SV_write(String data) {
String sdata[2];
split(sdata,2,data,'%');
int pos = Str2int(sdata[0]);
int angle = Str2int(sdata[1]);
servos[pos].write(angle);
}
void SV_write_ms(String data) {
String sdata[2];
split(sdata,2,data,'%');
int pos = Str2int(sdata[0]);
int uS = Str2int(sdata[1]);
servos[pos].writeMicroseconds(uS);
}
void sizeEEPROM() {
Serial.println(E2END + 1);
}
void EEPROMHandler(int mode, String data) {
String sdata[2];
split(sdata, 2, data, '%');
if (mode == 0) {
EEPROM.write(Str2int(sdata[0]), Str2int(sdata[1]));
} else {
Serial.println(EEPROM.read(Str2int(sdata[0])));
}
}
void SerialParser(void) {
char readChar[64];
Serial.readBytesUntil(33,readChar,64);
String read_ = String(readChar);
//Serial.println(readChar);
int idx1 = read_.indexOf('%');
int idx2 = read_.indexOf('$');
// separate command from associated data
String cmd = read_.substring(1,idx1);
String data = read_.substring(idx1+1,idx2);
// determine command sent
if (cmd == "dw") {
DigitalHandler(1, data);
}
else if (cmd == "dr") {
DigitalHandler(0, data);
}
else if (cmd == "aw") {
AnalogHandler(1, data);
}
else if (cmd == "ar") {
AnalogHandler(0, data);
}
else if (cmd == "pm") {
ConfigurePinHandler(data);
}
else if (cmd == "ps") {
pulseInSHandler(data);
}
else if (cmd == "pi") {
pulseInHandler(data);
}
else if (cmd == "ss") {
SS_set(data);
}
else if (cmd == "sw") {
SS_write(data);
}
else if (cmd == "sr") {
SS_read(data);
}
else if (cmd == "sva") {
SV_add(data);
}
else if (cmd == "svr") {
SV_read(data);
}
else if (cmd == "svw") {
SV_write(data);
}
else if (cmd == "svwm") {
SV_write_ms(data);
}
else if (cmd == "svd") {
SV_remove(data);
}
else if (cmd == "version") {
Version();
}
else if (cmd == "to") {
Tone(data);
}
else if (cmd == "nto") {
ToneNo(data);
}
else if (cmd == "cap") {
readCapacitivePin(data);
}
else if (cmd == "so") {
shiftOutHandler(data);
}
else if (cmd == "si") {
shiftInHandler(data);
}
else if (cmd == "eewr") {
EEPROMHandler(0, data);
}
else if (cmd == "eer") {
EEPROMHandler(1, data);
}
else if (cmd == "sz") {
sizeEEPROM();
}
}
void setup() {
Serial.begin(115200);
while (!Serial) {
; // wait for serial port to connect. Needed for Leonardo only
}
Serial.println("connected");
}
void loop() {
SerialParser();
}
+431
View File
@@ -0,0 +1,431 @@
# This file contains all classes and functions directly related to the operation of the helmholtz test bench.
# The two main classes are Axis and ArduinoCtrl, see their definitions for details.
# import packages:
import numpy as np
import serial
import traceback
from tkinter import messagebox
# import other project files
from user_interface import ui_print
from ps2000b import PS2000B
from arduino import Arduino
import config_handling as config
import globals as g
class Axis:
# Main class representing an axis (x,y,z) of the test bench
# contains static and dynamic status information about this axis and methods to control it
def __init__(self, index, device, PSU_channel, arduino_pin):
# static information
self.index = index # index of this axis, 0->X, 1->Y, 2->Z
self.device = device # power supply object for this axis (PS2000B class object)
self.channel = PSU_channel # power supply unit channel (0 or 1)
self.ardPin = arduino_pin # output pin on the arduino for switching polarity on this axis
self.name = g.AXIS_NAMES[index] # get name of this axis from list in globals.py (e.g. "X-Axis"
self.port = g.PORTS[index] # get serial port of this axis PSU
# read static information from the configuration object (which has read it from the config file or settings):
self.resistance = float(config.read_from_config(self.name, "resistance", config.CONFIG_OBJECT))
self.max_amps = float(config.read_from_config(self.name, "max_amps", config.CONFIG_OBJECT))
self.max_volts = float(config.read_from_config(self.name, "max_volts", config.CONFIG_OBJECT))
self.coil_constant = float(config.read_from_config(self.name, "coil_const", config.CONFIG_OBJECT))
self.ambient_field = float(config.read_from_config(self.name, "ambient_field", config.CONFIG_OBJECT))
max_field = self.max_amps * self.coil_constant # calculate max field reachable in this axis
self.max_field = np.array([-max_field, max_field]) # make array with min/max reachable field (w/o compensation)
# calculate max and min field that can be reached after compensating for the ambient field
self.max_comp_field = np.array([self.ambient_field - max_field, self.ambient_field + max_field]) # [min, max]
# initialize dynamic information, this is updated by self.update_status_info() later
self.connected = "Not Connected"
self.output_active = "Unknown" # power output on the PSU enabled?
self.remote_ctrl_active = "Unknown" # remote control on the PSU enabled?
self.voltage_setpoint = 0 # target voltage on PSU [V]
self.voltage = 0 # actual voltage on PSU [V]
self.current_setpoint = 0 # target current on PSU [A]
self.current = 0 # actual current on PSU [A]
self.polarity_switched = "Unknown" # polarity switched on the Arduino?
self.target_field_comp = 0 # field to be created by coil pair (this is sent to the coils) [T]
self.target_field = 0 # field that should occur in measurement area (ambient still needs to be compensated) [T]
self.target_current = 0 # signed current that should pass through coil pair [A]
if self.device is not None:
self.update_status_info()
def update_status_info(self): # Read out the values of the dynamic parameters stored in this object and update them
try: # try to read out the data, this will fail on connection error to PSU
# ToDo: this takes a long time, try to improve performance
self.device.update_device_information(self.channel) # update the information in the device object
device_status = self.device.get_device_status_information(self.channel) # get object with new status info
if device_status.output_active: # is the power output active?
self.output_active = "Active"
else:
self.output_active = "Inactive"
# is remote control active, allowing the device to be controlled by this program?
if device_status.remote_control_active:
self.remote_ctrl_active = "Active"
else:
self.remote_ctrl_active = "Inactive"
# get currents and voltages:
self.voltage = self.device.get_voltage(self.channel)
self.voltage_setpoint = self.device.get_voltage_setpoint(self.channel)
self.current = self.device.get_current(self.channel)
self.current_setpoint = self.device.get_current_setpoint(self.channel)
except (serial.serialutil.SerialException, IndexError): # Connection error, usually the PSU is unplugged
if self.connected == "Connected": # only show error messages if the device was connected before this error
# Show error as print-out in console and as pop-up:
ui_print("Connection Error with %s PSU on %s" % (self.name, self.port))
messagebox.showerror("PSU Error", "Connection Error with %s PSU on %s" % (self.name, self.port))
# set status attributes to connection error status:
self.connected = "Connection Error"
self.output_active = "Unknown"
self.remote_ctrl_active = "Unknown"
else: # no communications error
self.connected = "Connected" # PSU is connected
def print_status(self): # print out the current status of the PSU channel (not used at the moment)
ui_print("%s, %0.2f V, %0.2f A"
% (self.device.get_device_status_information(self.channel),
self.device.get_voltage(self.channel), self.device.get_current(self.channel)))
def power_down(self): # temporary powerdown, set outputs to 0 but keep connections enabled
try:
# set class object attributes to 0 to reflect shutdown in status displays, log files etc.
self.target_current = 0
self.target_field = 0
self.target_field_comp = 0
if self.device is not None: # there is a PSU connected for this axis
self.device.set_voltage(0, self.channel) # set voltage on PSU channel to 0
self.device.set_current(0, self.channel) # set current on PSU channel to 0
self.device.disable_output(self.channel) # disable power output on PSU channel
g.ARDUINO.digitalWrite(self.ardPin, "LOW") # set arduino pin for polarity switch relay to unpowered state
except Exception as e: # some error was encountered
# show error message:
ui_print("Error while powering down %s: %s" % (self.name, e))
messagebox.showerror("PSU Error!", "Error while powering down %s: \n%s" % (self.name, e))
def set_signed_current(self, value):
# sets current with correct polarity on this axis, this is the primary way to control the test bench
# ui_print("Attempting to set current", value, "A")
self.target_current = value # show target value in object attribute for status display, logging etc.
if abs(value) > self.max_amps: # prevent excessive currents
self.power_down() # set output to 0 and deactivate
raise ValueError("Invalid current value on %s. Tried %0.2fA, max. %0.2fA allowed"
% (self.name, value, self.max_amps))
elif value >= 0: # switch the e-box relay to change polarity as needed
g.ARDUINO.digitalWrite(self.ardPin, "LOW") # command the output pin on the arduino in the electronics box
elif value < 0:
g.ARDUINO.digitalWrite(self.ardPin, "HIGH") # command the output pin on the arduino in the electronics box
# determine voltage limit to be set on PSU, must be high enough to not limit the current:
# min. 8V, max. max_volts, in-between as needed with current value (+margin to not limit current)
maxVoltage = min(max(1.3 * abs(value) * self.resistance, 8), self.max_volts) # limit voltage
if self.connected == "Connected": # only try to command the PSU if its actually connected
self.device.set_current(abs(value), self.channel) # set desired current
self.device.set_voltage(maxVoltage, self.channel) # set voltage limit
if self.output_active == "Inactive": # don't send unnecessary commands to PSU
# ToDo: without calling update_status_info() output_active may be wrong
self.device.enable_output(self.channel) # activate the power output
else: # the PSU is not connected
ui_print(self.name, "not connected, can't set current.")
def set_field_simple(self, value): # forms magnetic field as specified by value, w/o cancelling ambient field
self.target_field = value # update object attribute for display
self.target_field_comp = value # same as above, bc no compensation
current = value / self.coil_constant # calculate needed current
self.set_signed_current(current) # command the test bench
def set_field(self, value): # forms magnetic field as specified by value, corrected for ambient field
self.target_field = value # update object attribute for display
field = value - self.ambient_field # calculate needed field after compensation
self.target_field_comp = field # update object attribute for display
current = field / self.coil_constant # calculate needed current
self.set_signed_current(current) # command the test bench
class ArduinoCtrl(Arduino):
# main class to control the electronics box (which means commanding the arduino inside)
# inherits from the Arduino library
def __init__(self):
self.connected = "Unknown" # connection status attribute, nominal "Connected"
self.pins = [0, 0, 0] # initialize list with pins to switch relay of each axis
for i in range(3): # get correct pins from the config
self.pins[i] = int(config.read_from_config(g.AXIS_NAMES[i], "relay_pin", config.CONFIG_OBJECT))
ui_print("\nConnecting to Arduino...")
try: # try to set up the arduino
Arduino.__init__(self) # search for connected arduino and connect by initializing arduino library class
for pin in self.pins:
self.pinMode(pin, "Output")
self.digitalWrite(pin, "LOW")
except Exception as e: # some error occurred, usually the arduino is not connected
ui_print("Connection to Arduino failed.", e)
self.connected = "Not Connected"
else: # connection was successfully established
self.connected = "Connected"
ui_print("Arduino ready.")
def update_status_info(self): # update the attributes stored in this class object
if self.connected == "Connected": # only do this if arduino is connected (initialize new instance to reconnect)
try: # try to read the status of the pins from the arduino
for axis in g.AXES: # go through all three axes
if g.ARDUINO.digitalRead(axis.ardPin): # pin is HIGH --> relay is switched
axis.polarity_switched = "True" # set attribute in axis object accordingly
else: # pin is LOW --> relay is not switched
axis.polarity_switched = "False" # set attribute in axis object accordingly
except Exception as e: # some error occurred while trying to read status, usually arduino is disconnected
# show warning messages to alert user
ui_print("Error with arduino:", e)
messagebox.showerror("Error with arduino!", "Connection Error with arduino: \n%s" % e)
for axis in g.AXES: # set polarity switch attributes in axis objects to "Unknown"
axis.polarity_switched = "Unknown"
self.connected = "Connection Error" # update own connection status
else: # no error occurred --> data was read successfully
self.connected = "Connected" # update own connection status
def safe(self): # sets relay switching pins to low to depower most of the electronics box
for pin in self.pins:
self.digitalWrite(pin, "LOW")
def value_in_limits(axis, key, value): # Check if value is within safe limits (set in globals.py)
# axis is string with axis name, e.g. "X-Axis"
# key specifies which value to check, e.g. current
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(axis)] # get max value from dictionary in globals.py
min_value = g.default_arrays[key][2][g.AXIS_NAMES.index(axis)] # get min value from dictionary in globals.py
if float(value) > float(max_value): # value is too high
return 'HIGH'
elif float(value) < float(min_value): # value is too low
return 'LOW'
else: # value is within limits
return 'OK'
def setup_all(): # main test bench initialization function
# creates device objects for all PSUs and Arduino and sets their values
# initializes an object of class Axis for all three axes (x,y,z)
# Setup Arduino:
try: # broad error handling for unforeseen errors, handling in ArduinoCtrl should catch most errors
if g.ARDUINO is not None: # the arduino has been initialized before, so we need to first close its connection
try:
g.ARDUINO.close() # close serial link
except serial.serialutil.SerialException:
pass
# serial.flush() in Arduino.close() fails when reconnecting
# this ignores it and allows serial.close() to execute (I think)
except AttributeError:
pass
# when no Arduino is connected but g.ARDUINO has been initialized then there is nothing to close
# this throws an exception, which can be ignored
g.ARDUINO = ArduinoCtrl() # initialize the arduino object from the control class, connects and sets up
except Exception as e: # some unforeseen error occurred (not connected issue handled in ArduinoCtrl class)
# show error messages to alert user
ui_print("Arduino setup failed:", e)
ui_print(traceback.print_exc())
messagebox.showerror("Error!", "Arduino setup failed:\n%s \nCheck traceback in console." % e)
# Setup PSUs and axis objects:
g.AXES = [] # initialize global list containing the three axis objects
# get serial ports for the PSUs from config
g.XY_PORT = config.read_from_config("PORTS", "xy_port", config.CONFIG_OBJECT)
g.Z_PORT = config.read_from_config("PORTS", "z_port", config.CONFIG_OBJECT)
g.PORTS = [g.XY_PORT, g.XY_PORT, g.Z_PORT] # write list with PSU port for each axis (X/Y share PSU)
# setup PSU and axis objects for X and Y axes:
ui_print("Connecting to XY Device on %s..." % g.XY_PORT)
try: # try to connect to the PSU
if g.XY_DEVICE is not None: # if PSU has previously been connected we need to close the serial link first
ui_print("Closing serial connection on XY device")
g.XY_DEVICE.serial.close()
g.XY_DEVICE = None
g.XY_DEVICE = PS2000B.PS2000B(g.XY_PORT) # setup PSU
ui_print("Connection established.")
g.X_AXIS = Axis(0, g.XY_DEVICE, 0, g.ARDUINO.pins[0]) # create axis objects (index, PSU, channel, relay pin)
g.Y_AXIS = Axis(1, g.XY_DEVICE, 1, g.ARDUINO.pins[1])
except serial.serialutil.SerialException: # communications error, usually PSU is not connected or wrong port set
g.X_AXIS = Axis(0, None, 0, g.ARDUINO.pins[0]) # create axis objects without the PSU
g.Y_AXIS = Axis(1, None, 1, g.ARDUINO.pins[1])
ui_print("XY Device not connected or incorrect port set.")
# same for the Z axis
ui_print("Connecting to Z Device on %s..." % g.Z_PORT)
try:
if g.Z_DEVICE is not None:
ui_print("Closing serial connection on Z device")
g.Z_DEVICE.serial.close()
g.Z_DEVICE = None
g.Z_DEVICE = PS2000B.PS2000B(g.Z_PORT)
ui_print("Connection established.")
g.Z_AXIS = Axis(2, g.Z_DEVICE, 0, g.ARDUINO.pins[2])
except serial.serialutil.SerialException:
g.Z_AXIS = Axis(2, None, 0, g.ARDUINO.pins[2])
ui_print("Z Device not connected or incorrect port set.")
# put newly created axis objects into a list for access later
g.AXES.append(g.X_AXIS)
g.AXES.append(g.Y_AXIS)
g.AXES.append(g.Z_AXIS)
ui_print("") # print new line
def set_to_zero(device): # sets voltages and currents to 0 on all channels of a specific PSU
device.voltage1 = 0
device.current1 = 0
device.voltage2 = 0
device.current2 = 0
def power_down_all(): # on all PSUs set all outputs to 0 but keep connections enabled
for axis in g.AXES:
axis.power_down() # set outputs to 0 and pin to low on this axis
def shut_down_all(): # safe shutdown at program end or on error
# set outputs to 0 and disable connections on all devices
ui_print("\nAttempting to safely shut down all devices. Check equipment to confirm.")
# start writing string to later show how shutdown on all devices went in a single info pop-up:
message = "Tried to shut down all devices. Check equipment to confirm."
# Shut down PSUs:
if g.XY_DEVICE is not None: # the PSU has been setup before
try: # try to safe the PSU
set_to_zero(g.XY_DEVICE) # set currents and voltages to 0 for both channels
g.XY_DEVICE.disable_all() # disable power output on both channels
ui_print("Closing serial connection on XY PSU")
g.XY_DEVICE.serial.close()
g.XY_DEVICE = None
except BaseException as e: # some error occurred, usually device has been disconnected
ui_print("Error while deactivating XY PSU:", e) # print the problem in the console
message += "\nError while deactivating XY PSU: %s" % e # append status to the message to show later
else: # device was successfully deactivated
ui_print("XY PSU deactivated.")
message += "\nXY PSU deactivated." # append message to show later
else: # the device was not connected before
# tell user there was no need/no possibility to deactivate:
ui_print("XY PSU not connected, can't deactivate.")
message += "\nXY PSU not connected, can't deactivate."
# same as above
if g.Z_DEVICE is not None:
try:
set_to_zero(g.Z_DEVICE)
g.Z_DEVICE.disable_all()
ui_print("Closing serial connection on Z PSU")
g.Z_DEVICE.serial.close()
g.Z_DEVICE = None
except BaseException as e:
ui_print("Error while deactivating Z PSU:", e)
message += "\nError while deactivating Z PSU: %s" % e
else:
ui_print("Z PSU deactivated.")
message += "\nZ PSU deactivated."
else:
ui_print("Z PSU not connected, can't deactivate.")
message += "\nZ PSU not connected, can't deactivate."
# Shut down arduino:
try:
g.ARDUINO.safe() # call safe method in ArduinoCtrl class (all relay pins to LOW)
except BaseException as e: # some error occurred
ui_print("Arduino safing unsuccessful:", e)
message += "\nArduino safing unsuccessful: %s" % e # append to the message to show later
# this throws no exception, even when arduino is not connected
# ToDo (optional): figure out error handling for this
try:
g.ARDUINO.close() # close the serial link
except BaseException as e: # something went wrong there
if g.ARDUINO.connected == "Connected": # Arduino was connected, some error occurred
ui_print("Closing Arduino connection failed:", e)
message += "\nClosing Arduino connection failed: %s" % e
else: # Arduino was not connected, so error is expected
ui_print("Arduino not connected, can't close connection.")
message += "\nArduino not connected, can't close connection."
else: # no problems, connection was successfully closed
ui_print("Serial connection to Arduino closed.")
message += "\nSerial connection to Arduino closed."
messagebox.showinfo("Program ended", message) # Show a unified pop-up with how the shutdown on each device went
def set_field_simple(vector): # forms magnetic field as specified by vector, w/o cancelling ambient field
for i in [0, 1, 2]:
try:
g.AXES[i].set_field_simple(vector[i]) # try to set the field on each axis
except ValueError as e: # a limit was violated, usually the needed current was too high
ui_print(e) # let the user know
def set_field(vector): # forms magnetic field as specified by vector, corrected for ambient field
# same as set_field_simple(), but with compensation
for i in [0, 1, 2]:
try:
g.AXES[i].set_field(vector[i])
except ValueError as e:
ui_print(e)
def set_current_vec(vector): # sets currents on each axis according to given vector
i = 0
for axis in g.AXES:
try:
axis.target_field = 0 # set target field attribute to 0 to show that current, not field is controlled atm
axis.target_field_comp = 0 # as above
axis.set_signed_current(vector[i]) # command test bench to set the current
except ValueError as e: # current was too high
ui_print(e) # print out the error message
i += 1
def devices_ok(xy_off=False, z_off=False, arduino_off=False):
# check if all devices are connected, return True if yes
# checks for individual devices can be disabled by parameters above (default not disabled)
try: # handle errors while checking connections
if not xy_off: # if check for this device is not disabled
if g.XY_DEVICE is not None: # has the handle for this device been set?
g.X_AXIS.update_status_info() # update info --> this actually communicates with the device
if g.X_AXIS.connected != "Connected": # if not connected
return False # return and exit function
else: # if handle has not been set the device is inactive --> not ok
return False
if not z_off: # same as above
if g.Z_DEVICE is not None:
g.Z_AXIS.update_status_info()
if g.Z_AXIS.connected != "Connected":
return False
else:
return False
if not arduino_off: # check not disabled
g.ARDUINO.update_status_info() # update status info --> attempts communication
if g.ARDUINO.connected != "Connected":
return False
except Exception as e: # if an error is encountered while checking the devices
messagebox.showerror("Error!", "Error while checking devices: \n%s" % e) # show error pop-up
return False # clearly something is not ok
else: # if nothing has triggered so far all devices are ok --> return True
return True
+131
View File
@@ -0,0 +1,131 @@
# This file contains functions and variables related to reading and writing configuration files.
# The configparser module is used for processing. Config files are of type .ini
# import packages:
from configparser import ConfigParser
from tkinter import messagebox
# import other project files:
import globals as g
import cage_func as func
# noinspection PyPep8Naming
import user_interface as ui
global CONFIG_FILE # string storing the path of the used config file
global CONFIG_OBJECT # object of type ConfigParser(), storing all configuration information
# CONFIG_OBJECT is what is mostly read/written by the program
# CONFIG_FILE is only used to export/import to/from a file
def get_config_from_file(file): # read a config file to a config_object
config_object = ConfigParser() # initialize config parser
config_object.read(file) # open config file
return config_object # return config object, that contains all info from the file
def write_config_to_file(config_object): # write contents of config object to a config file
with open(CONFIG_FILE, 'w') as conf: # Write changes to config file
config_object.write(conf)
def read_from_config(section, key, config_object): # read a specific value from a config object
try:
section_obj = config_object[section] # get relevant section
value = section_obj[key] # get relevant value in the section
return value
except KeyError as e: # a section or key was used, that does not exist
ui.ui_print("Error while reading config file:", e)
raise KeyError("Could not find key", key, "in config file.")
def edit_config(section, key, value, override=False): # edit a specific value in the CONFIG_OBJECT
# section: Section of the config, e.g. "X-Axis" or "PORTS"
# key: name of the value in the section, e.g. max_amps
# value: new value to be written into the config
# override: Bool to allow user to force writing a value into the config, even if it exceeds the safe limit
global CONFIG_OBJECT # get the global config object to edit it
# ToDo (optional): add check for data types, e.g. int for arduino ports
# Check if value to write is within acceptable limits (set in dictionary in globals.py):
try:
value_ok = 'OK'
if section in g.AXIS_NAMES and not override: # only check values in axis sections and not if check overridden
value_ok = func.value_in_limits(section, key, value) # check if value is ok, too high or too low
if value_ok == 'HIGH': # value is too high
max_value = g.default_arrays[key][1][g.AXIS_NAMES.index(section)] # get max value for message printing
message = "Prevented writing too high value for {s} {k} to config file:\n" \
"{v}, max. {mv} allowed. Erroneous values may damage equipment!" \
.format(s=section, k=key, v=value, mv=max_value)
raise ValueError(message) # return an error with the message attached
elif value_ok == 'LOW': # value is too low
min_value = g.default_arrays[key][2][g.AXIS_NAMES.index(section)] # get min value for message printing
message = "Prevented writing too low value for {s} {k} to config file:\n" \
"{v}, max. {mv} allowed. Erroneous values may damage equipment!" \
.format(s=section, k=key, v=value, mv=min_value)
raise ValueError(message) # return an error with the message attached
if value_ok == 'OK' or override: # value is within limits or user has overridden the checks
try:
section_obj = CONFIG_OBJECT[section] # get relevant section in the config
except KeyError: # there is no such section
ui.ui_print("Could not find section", section, "in config file, creating new.")
CONFIG_OBJECT.add_section(section) # create the missing section
section_obj = CONFIG_OBJECT[section] # get the object of the section
try:
section_obj[key] = str(value) # set value for correct entry in the section
except KeyError: # there is no entry with this key
ui.ui_print("Could not find key", key, "in config file, creating new.")
CONFIG_OBJECT.set(section, key, str(value)) # create the entry and set the value
except KeyError as e: # key for section or specific value does not exist in the dictionary for max/min values
ui.ui_print("Error while editing config file:", e)
raise KeyError("Could not find key", key, "in config file.") # return an error
def check_config(config_object): # check all numeric values in the config and see if they are within safe limits
ui.ui_print("Checking config file for values exceeding limits:")
concerns = {} # initialize dictionary for found problems
problem_counter = 0 # count the number of values that exceed limits
i = 0
for axis in g.AXIS_NAMES: # go through all 3 axes
concerns[axis] = [] # create dictionary entry for this axis
for key in g.default_arrays.keys(): # go over entries in this axis
value = float(read_from_config(axis, key, config_object)) # read value to check from config file
max_value = g.default_arrays[key][1][i] # get max value
min_value = g.default_arrays[key][2][i] # get min value
if not min_value <= value <= max_value: # value is not in safe limits
concerns[axis].append(key) # add this entry to the problem dictionary
problem_counter += 1
if len(concerns[axis]) == 0: # no problems were found for this axis
concerns[axis].append("No problems detected.")
ui.ui_print(axis, ":", *concerns[axis]) # print out results for this axis
i += 1
if problem_counter > 0: # some values are not ok
# shop pup-up warning message:
messagebox.showwarning("Warning!", "Found %i value(s) exceeding limits in config file. Check values "
"to ensure correct operation and avoid equipment damage!" % problem_counter)
g.app.show_frame(ui.Configuration) # open configuration window so user can check values
def reset_config_to_default(): # reset values in config object to defaults (set in globals.py)
config = ConfigParser() # reinitialize empty config object
global CONFIG_OBJECT # get the global config object
CONFIG_OBJECT = config # reset it to the empty object
i = 0
for axis_name in g.AXIS_NAMES: # go through axes
config.add_section(axis_name) # add section for this axis
for key in g.default_arrays.keys(): # go through dictionary with default values
config.set(axis_name, key, str(g.default_arrays[key][0][i])) # set values
i += 1
config.add_section("PORTS") # add section for PSU serial ports
for key in g.default_ports.keys(): # go through dictionary of default serial ports
config.set("PORTS", key, str(g.default_ports[key])) # set the value for each axis
+101
View File
@@ -0,0 +1,101 @@
# This file contains functions related to logging data from the program to a CSV file.
# They are mainly but not only called by the ConfigureLogging class in user_interface.py.
# import packages
import pandas as pd
from datetime import datetime
import os
from tkinter import filedialog
from tkinter import messagebox
# import other project files
import globals as g
import user_interface as ui
log_data = pd.DataFrame() # pandas data frame containing the logged data, in-program representation of csv/excel data
unsaved_data = False # Bool to indicate if there is unsaved data, set to True each time a datapoint is logged
zero_time = datetime.now() # set reference for timestamps in log file, reset when log_data is cleared and restarted
# create dictionary with all value handles that could be logged
# Key: String that is displayed in UI and column headers. Also serves as handle to access dictionary elements.
# Keys are the same as the rows in the status display ToDo (optional): use this for the status display
# Content: name of the corresponding attribute in the Axis class (in cage_func.py).
# Important: attribute handle must match definition in Axis class exactly, used with getattr() to get values.
axis_data_dict = {
'PSU Status': 'connected',
'Voltage Setpoint': 'voltage_setpoint',
'Actual Voltage': 'voltage',
'Current Setpoint': 'current_setpoint',
'Actual Current': 'current',
'Target Field': 'target_field_comp',
'Trgt. Field Raw': 'target_field_comp',
'Target Current': 'target_current',
'Inverted': 'polarity_switched'
}
def triple_list(key_list): # creates list with each entry of key_list tripled with axis names before it
new_list = [] # initialize list
for key in key_list: # go through the given list
for axis_name in ['X', 'Y', 'Z']: # per given list entry create three, one for each axis
new_list.append(' '.join((axis_name, key))) # put axis_name before the given entry and append to new list
return new_list
def log_datapoint(key_list): # logs a single row of data into the log_data DataFrame
# key_list determines what data is logged
global log_data # get global dataframe with logged data
global unsaved_data # get global variable that indicates if there is unsaved data
date = datetime.now().date() # get current date
time = datetime.now().strftime("%H:%M:%S,%f") # get string with current time in correct format
t = (datetime.now() - zero_time).total_seconds() # calculate timestamp relative to the start of the logging
data = [[date, time, t]] # initialize new data row with timestamps
for key in key_list: # go through the list telling us what data to log
for axis in g.AXES: # log this data for each axis
# get relevant value from the correct AXIS object and append to new data row:
data[0].append(getattr(axis, axis_data_dict[key]))
column_names = ["Date", "Time", "t (s)", *triple_list(key_list)] # create list with the correct column headers
new_row = pd.DataFrame(data, columns=column_names) # create data frame containing the new row
log_data = log_data.append(new_row, ignore_index=True) # append the new data frame to the logged data
unsaved_data = True # tell other program parts that there is now unsaved data
def select_file(): # select a file to write logs to
directory = os.path.abspath(os.getcwd()) # get project directory
# open file selection dialogue and save path of selected file
filepath = filedialog.asksaveasfilename(initialdir=directory, title="Set log file",
filetypes=([("Comma Separated Values", "*.csv*")]),
defaultextension=[("Comma Separated Values", "*.csv*")])
if filepath == '': # this happens when file selection window is closed without selecting a file
ui.ui_print("No file selected, can't save logged data.")
return None
else: # a valid file name was entered
return filepath
def write_to_file(dataframe, filepath):
# get global variables for use in this function:
global unsaved_data
if filepath is not None: # user has selected a file and no errors occurred
ui.ui_print("Writing logged data to file", filepath)
try:
# write data collected in log_data DataFrame to csv file in german excel format:
dataframe.to_csv(filepath, index=False, sep=';', decimal=',')
except PermissionError:
message = "No permission to write to: \n%s. \nFile may be open in another program." % filepath
messagebox.showerror("Permission Error", message)
except BaseException as e:
message = "Error while trying to write to file \n%s.\n%s" % (filepath, e)
messagebox.showerror("Error!", message)
else: # no exceptions occurred
unsaved_data = False # tell everything that there is no unsaved data remaining
ui.ui_print("Log data saved to", filepath)
def clear_logged_data(): # clears all logged data from data frame
global log_data # get global variable
log_data = pd.DataFrame() # reset to an empty data frame, i.e. clearing all logged data
+180
View File
@@ -0,0 +1,180 @@
# tThis file contains code for executing a sequence of magnetic fields from a csv file.
# To do this without crashing the UI it has to run in a separate thread using the threading module.
# ToDo!: apparently max. 1 thread can access PSU --> continuous update + csv thread crashes program. Find solution
# import packages:
import time
import pandas
import numpy as np
from threading import *
from tkinter import messagebox
import matplotlib.pyplot as plt
# import other project files:
import user_interface as ui
import cage_func as func
import globals as g
class ExecCSVThread(Thread):
# main class for executing a CSV sequence
# it inherits the threading.Thread class, enabling sequence execution in a separate thread
def __init__(self, array, parent, controller):
Thread.__init__(self)
self.array = array # numpy array containing data from csv to be executed
self.parent = parent # object from which this class is called, here the ExecuteCSVMode object of the UI
self.controller = controller # object on which mainloop() is running, usually the main UI window
self.__stop_event = Event() # event which can be set to stop the thread execution if needed
def run(self): # called to start the execution of the thread
ui.ui_print("\nStarting Sequence Execution...")
self.execute_sequence(self.array, 0.1, self.parent, self.controller) # run sequence
# when the sequence has ended, reset buttons on the UI:
if not g.exitFlag: # main window is open
self.parent.select_file_button["state"] = "normal"
self.parent.execute_button["state"] = "normal"
self.parent.stop_button["state"] = "disabled"
self.parent.reinit_button["state"] = "normal"
# setup ability to interrupt thread (https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread)
def stop(self): # stop thread execution, can be called from another thread to kill this one
self.__stop_event.set()
def stopped(self): # returns true if the thread has been stopped, used to check if a run should continue
return self.__stop_event.is_set()
def execute_sequence(self, array, delay, parent, controller):
# main execution method of the class
# runs through array with times and desired fields and commands test bench accordingly
# array format: [time (s), xField (T), yField (T), zField (T)]
func.power_down_all() # sets outputs on PSUs to 0 and Arduino pins to LOW before starting
t_zero = time.time() # set reference time for start of run
# Check if everything is properly connected:
all_connected = func.devices_ok(parent.xy_override.get(), parent.z_override.get(),
parent.arduino_override.get())
# True or False depending on devices status, checks for some devices may be overridden by user
i = 0 # index of the current array row
while i < len(array) and all_connected and not self.stopped() and not g.exitFlag:
# while array is not finished, devices are connected, user has not cancelled and application is running
t = time.time() - t_zero # get time relative to start of run
if t >= array[i, 0]: # time for this row has come
g.threadLock.acquire() # execute all lines until threadLock.release() before going back to main thread
# check if everything is still connected before sending commands:
all_connected = func.devices_ok(parent.xy_override.get(), parent.z_override.get(),
parent.arduino_override.get())
if all_connected:
field_vec = array[i, 1:4] # extract desired field vector
ui.ui_print("%0.5f s: t = %0.2f s, target field vector ="
% (time.time() - t_zero, array[i, 0]), field_vec * 1e6, "\u03BCT")
func.set_field(field_vec) # send field vector to test bench
controller.StatusDisplay.update_labels() # update status display after change
# ToDo: display update takes a long time, remove for performance?
# log change to the log file if user has selected event logging in the Configure Logging window
logger = controller.pages[ui.ConfigureLogging] # get object of logging configurator
if logger.event_logging: # data should be logged when test bench is commanded
logger.log_datapoint() # log data
i = i + 1 # next row
g.threadLock.release() # allow going back to main thread now
elif t <= array[i, 0] - delay - 0.02: # is there enough time to sleep before the next row?
time.sleep(delay) # sleep to give other threads time to run
if not self.stopped() and not g.exitFlag and all_connected: # sequence ended without interruption
ui.ui_print("Sequence executed, powering down channels.")
elif all_connected: # interrupted by user
ui.ui_print("Sequence cancelled, powering down channels.")
elif not all_connected: # interrupted by device error
ui.ui_print("Error with at least one device, sequence aborted.")
messagebox.showwarning("Device Error!", "Error with at least one device, sequence aborted.")
else: # if this happens there is a mistake in the logic above, it really should not
# tell the user something weird happened:
ui.ui_print("Encountered unexpected sequence end state:"
"\nThread Stopped:", self.stopped(), ", Application Closed:", g.exitFlag,
", Devices connected:", all_connected)
messagebox.showwarning("Unexpected state",
"Encountered unexpected sequence end state, see console output for details.")
func.power_down_all() # set currents and voltages to 0, set arduino pins to low
def read_csv_to_array(filepath): # convert a given csv file to a numpy array
# csv format: time (s); xField (T); yField (T); zField (T) (german excel)
# decimal commas
file = pandas.read_csv(filepath, sep=';', decimal=',', header=0) # read csv file without column headers
array = file.to_numpy() # convert csv to array
return array
def check_array_ok(array):
# check if any magnetic fields in an array exceed the test bench limits and if so display a warning message
values_ok = True
for i in [0, 1, 2]: # go through axes/columns
max_val = g.AXES[i].max_comp_field[1] # get limits the test bench can do
min_val = g.AXES[i].max_comp_field[0]
data = array[:, i + 1] # extract data for this axis from array
# noinspection PyTypeChecker
if any(data > max_val) or any(data < min_val): # if any datapoint is out of bounds
values_ok = False
if not values_ok: # show warning pop-up if values are exceeding limits
messagebox.showwarning("Value Limits Warning!", "Found field values exceeding limits of test bench."
"\nSee plot and check values in csv.")
def plot_field_sequence(array, width, height): # create plot of fixed size (pixels) from array
# ToDo (optional): polar plots, plots of angle...
fig_dpi = 100 # set figure resolution (dots per inch)
px = 1/fig_dpi # get pixel to inch size conversion
figure = plt.Figure(figsize=(width*px, height*px), dpi=fig_dpi) # create figure with correct size
# noinspection PyTypeChecker,SpellCheckingInspection
axes = figure.subplots(3, sharex=True, sharey=True, gridspec_kw={'hspace': 0.4}) # create subplots with shared axes
figure.suptitle("Magnetic Field Sequence") # set figure title
# modify data to show instantaneous jumps in field to reflect test bench operation
new_array = np.array([[0, 0, 0, 0]], dtype=float) # initialize modified array, zeros to show start from no fields
last_vals = [0, 0, 0] # [x,y,z] field values from last data point (zero here), used to create step in data
for row in array[:, 0:4]: # go through each row in the original array
# create extra datapoint at current timestamp, with field values from last to create a "step" in the plot:
new_array = np.append(new_array, [[row[0], *last_vals]], axis=0)
new_array = np.append(new_array, [row], axis=0) # add actual datapoint for current timestamp
last_vals = row[1:4] # save values from current timestamp for next
new_array = np.append(new_array, [[new_array[-1, 0], 0, 0, 0]], axis=0) # append last datapoint with 0 fields
# extract data and plot:
t = new_array[:, 0] # extract time column
for i in [0, 1, 2]: # go through all three axes
data = new_array[:, i + 1] * 1e6 # extract field column of this axis and convert to microtesla
max_val = g.AXES[i].max_comp_field[1] * 1e6 # get limits of achievable field
min_val = g.AXES[i].max_comp_field[0] * 1e6
plot = axes[i] # get appropriate subplot
plot.plot(t, data, linestyle='solid', marker='.') # plot data
if any(data > max_val): # if any value is higher than the maximum
plot.axhline(y=max_val, linestyle='dashed', color='r') # plot horizontal line to show maximum
# add label to line:
plot.text(t[-1], max_val, "max", horizontalalignment='center', verticalalignment='top', color='r')
if any(data < min_val): # same as above
plot.axhline(y=min_val, linestyle='dashed', color='r')
plot.text(t[-1], min_val, "min", horizontalalignment='center', color='r')
plot.set_title(g.AXIS_NAMES[i], size=10) # set subplot title (e.g. "X-Axis")
# set shared axis labels:
axes[2].set_xlabel("Time (s)")
axes[1].set_ylabel("Magnetic Field (\u03BCT)")
return figure # return the created figure to be inserted somewhere
+53
View File
@@ -0,0 +1,53 @@
# This file is used to hold global variables that are used by more than one file of the program.
# Instead of always passing all variables to functions, this file can simply be imported to get them.
import numpy as np
XY_DEVICE = None # XY PSU object will be stored here (class PS2000B)
Z_DEVICE = None # Z PSU object will be stored here (class PS2000B)
ARDUINO = None # Arduino object will be stored here (class ArduinoCtrl)
# Axis objects will be stored here (class Axis)
X_AXIS = None
Y_AXIS = None
Z_AXIS = None
AXES = None # list containing [X_AXIS, Y_AXIS, Z_AXIS]
app = None # Main Tkinter application object will be stored here (class HelmholtzGUI)
AXIS_NAMES = ["X-Axis", "Y-Axis", "Z-Axis"] # list with the names of each axis, used mainly for printing functions
global XY_PORT # serial port for XY PSU will be stored here (string)
global Z_PORT # serial port for Z PSU will be stored here (string)
global PORTS # list containing [XY_PORT, XY_PORT, Z_PORT], used in loops where info on each axis is needed
global threadLock # thread locking object, used to force threads to perform actions in a certain order (threading.Lock)
exitFlag = True # False when main window is open, True otherwise
# Create dictionaries with default Constants and maximum/minimum values
# Used to create default configs and to check if user inputs are within safe limits
# ToDo: refine values after testing
# ToDo: put this into a config file
# Dictionary for numerical values:
# format: key: [default values], [maximum values], [minimum values]
default_arrays = {
"coil_const": np.array([[38.957, 40.408, 37.754], [50, 50, 50], [0, 0, 0]]) * 1e-6, # Coil constants [x,y,z] [T/A]
"ambient_field": np.array([[0, 0, 0], [200, 200, 200], [-200, -200, -200]]) * 1e-6, # ambient magnetic field [T]
"resistance": np.array([[3.131, 3.107, 3.129], [5, 5, 5], [1, 1, 1]], dtype=float), # resistance of circuits [Ohm]
"max_volts": np.array([[15, 15, 15], [16, 16, 16], [0, 0, 0]], dtype=float), # max. voltage, limited to 16V by used diodes! [V]
"max_amps": np.array([[5, 5, 5], [6, 6, 6], [0, 0, 0]], dtype=float), # max. allowed current (A)
"relay_pin": [[15, 16, 17], [15, 16, 17], [15, 16, 17]] # pins on the arduino for reversing [x,y,z] polarity
}
# Dictionary for PSU serial ports:
default_ports = {
"xy_port": "COM8", # Default serial port where PSU for X- and Y-Axes is connected
"z_port": "COM5", # Default serial port where PSU for Z-Axis is connected
}
# Configuration for socket interface
SOCKET_PORT = 6677
SOCKET_MAX_CONNECTIONS = 5
+85
View File
@@ -0,0 +1,85 @@
# Main file of the program. Run this file to start the application.
# ToDo: improve performance, communication to PSUs is way too slow for dynamic field changes
# import packages:
from os.path import exists
import traceback
from tkinter import messagebox
# import other project files:
import cage_func as func
from user_interface import HelmholtzGUI
from user_interface import ui_print
import user_interface as ui
import globals as g
import config_handling as config
import csv_logging as log
from socket_control import SocketInterfaceThread
def program_end(): # called on exception or when user closes application
# safely shuts everything down and saves any unsaved data
g.exitFlag = True # tell everything else the application has been closed
if g.app is not None: # the main Tkinter app object has been initialized before
if g.app.pages[ui.ExecuteCSVMode].csv_thread is not None: # check if a thread for executing CSVs exists
g.app.pages[ui.ExecuteCSVMode].csv_thread.stop() # stop the thread
func.shut_down_all() # shut down devices
if log.unsaved_data: # Check if there is logged data that has not been saved yet
# open pop-up to ask user if he wants to save the data:
save_log = messagebox.askquestion("Save log data?", "There seems to be unsaved logging data. "
"Do you wish to write it to a file now?")
if save_log == 'yes': # user has chosen yes
filepath = log.select_file() # let user select a file to write to
log.write_to_file(log.log_data, filepath) # write the data to the chosen file
if g.app is not None:
g.app.destroy() # close application
try: # start normal operations
config.CONFIG_FILE = 'config.ini' # set the config file path
# ToDo: remember what the last config file was
if not exists(config.CONFIG_FILE): # config file does not exist yet
print("Config file not found, creating new from defaults.")
config.reset_config_to_default() # create configuration object from defaults
config.write_config_to_file(config.CONFIG_OBJECT) # write the configuration object to a new file
config.CONFIG_OBJECT = config.get_config_from_file(config.CONFIG_FILE) # read configuration data from config file
print("Starting setup...")
func.setup_all() # initiate communication with devices and initialize all major program objects
print("\nOpening User Interface...")
g.app = HelmholtzGUI() # initialize user interface
g.exitFlag = False # tell all functions that the user interface is now running
# g.app.state('zoomed') # open UI in maximized window
# g.app.StatusDisplay.continuous_label_update(g.app, 1000) # initiate regular Status Display updates (ms)
# ToDo: label update is very slow, commented out to save performance but should be implemented
# ToDo!: csv thread + continuous label update seems to exceed capacity of PSU communication
ui_print("Program Initialized")
config.check_config(config.CONFIG_OBJECT) # check config for values exceeding limits
ui_print("\nStarting setup...") # do setup again, so it is printed in the UI console ToDo: do it only once
func.setup_all() # initiate communication with devices and initialize all major program objects
# Create TCP/Socket listener
socket_controller = SocketInterfaceThread()
socket_controller.start()
g.app.protocol("WM_DELETE_WINDOW", program_end) # call program_end function if user closes the application
g.app.mainloop() # start main program loop
except Exception as e: # An error has occurred somewhere in the program
print("\nAn error occurred, Shutting down.")
# shop pup-up error message:
message = "%s.\nSee python console traceback for more details. " \
"\nShutting down devices, check equipment to confirm." % e
messagebox.showerror("Error!", message)
print(traceback.print_exc()) # print error traceback in the python console
program_end() # safely close everything and shut down devices
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Sören Sprößig
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+409
View File
@@ -0,0 +1,409 @@
# This file enables communication with PS2000B Power Supply Units.
#!/usr/bin/env python3
# coding=utf-8
# Python access to Elektro Automatik PS 2000 B devices via USB/serial
#
# Supported features:
# - read static device information (manufacturer, serial, device type ...)
# - read dynamic device information (current, voltage)
# - read/write remote control
# - read/write output control
#
# - wrap error results in own telegram
#
# References
# [1] = "PS 2000B Programming Guide" from 2015-05-28
# [2] = "PS 2000B object list"
#
import serial
import struct
import sys
PY_3 = sys.version_info >= (3, 0)
# noinspection SpellCheckingInspection
__author__ = "Sören Sprößig <ssproessig@gmail.com>"
def as_string(raw_data):
"""Converts the given raw bytes to a string (removes NULL)"""
return bytearray(raw_data[:-1])
def as_float(raw_data):
"""Converts the given raw bytes to a float"""
f = struct.unpack_from(">f", bytearray(raw_data))[0]
return f
def as_word(raw_data):
"""Converts the given raw bytes to a word"""
w = struct.unpack_from(">H", bytearray(raw_data))[0]
return w
def _ord(x):
"""Wrap ord() call as we only need it in Python 2"""
return x if PY_3 else ord(x)
# noinspection PyClassHasNoInit
class Constants:
"""Communication constants"""
# communication parameters taken from [1], chapter 2.2
CONNECTION_BAUD_RATE = 115200
CONNECTION_PARITY = serial.PARITY_ODD
CONNECTION_STOP_BITS = 1
# timeout taken from [1], chapter 3.7
TIMEOUT_BETWEEN_COMMANDS = 0.05
# according to spec [1] 2.4:
# maximum length of a telegram is 21 bytes (Byte 0..20)
MAX_LEN_IN_BYTES = 21
# noinspection PyClassHasNoInit
class Objects:
"""Supported objects ids / commands"""
DEVICE_TYPE = 0
DEVICE_SERIAL_NO = 1
NOMINAL_VOLTAGE = 2
NOMINAL_CURRENT = 3
NOMINAL_POWER = 4
DEVICE_ARTICLE_NO = 6
MANUFACTURER = 8
SOFTWARE_VERSION = 9
SET_VALUE_VOLTAGE = 50
SET_VALUE_CURRENT = 51
POWER_SUPPLY_CONTROL = 54
STATUS_ACTUAL_VALUES = 71
# noinspection PyClassHasNoInit
class ControlParam:
"""Parameters for controlling the device"""
SWITCH_MODE_CMD = 0x10
SWITCH_MODE_REMOTE = 0x10
SWITCH_MODE_MANUAL = 0x00
SWITCH_POWER_OUTPUT_CMD = 0x1
SWITCH_POWER_OUTPUT_ON = 0x1
SWITCH_POWER_OUTPUT_OFF = 0x0
class Telegram:
"""Base class of a PS2000B telegram - basically allows accessing the raw bytes and does checksum calculation"""
def __init__(self):
self._bytes = []
self._checksum = []
self.checksum_ok = False
def _calc_checksum(self):
cs = 0
for b in self._bytes:
cs += b
cs_high = (cs & 0xff00) >> 8
cs_low = cs & 0xff
return [cs_high, cs_low]
@staticmethod
def _get_start_delimiter(transmission, expected_data_length):
result = 0b00000000
if expected_data_length > 16:
raise Exception("only 4 bits for expected length can be used")
result |= (expected_data_length - 1)
result |= 0b10000
result |= 0b100000
result |= transmission << 6
return result
def get_byte_array(self):
return bytearray(self._bytes + self._checksum)
class FromPowerSupply(Telegram):
"""Telegram received from the power supply"""
def __init__(self, raw_data):
Telegram.__init__(self)
data = [_ord(x) for x in raw_data]
self._bytes = data[0:-2]
self._checksum = data[len(data) - 2:len(data)]
self.checksum_ok = self._checksum == self._calc_checksum()
def get_sd(self):
return self._bytes[0]
def get_device_node(self):
return self._bytes[1]
def get_object(self):
return self._bytes[3]
def get_data(self):
return self._bytes[3:len(self._bytes)]
# noinspection PyMethodMayBeStatic
def get_error(self):
# ToDo: [1] chapter 3.6 add support for error codes here
return None
class ToPowerSupply(Telegram):
"""A telegram sent to the power supply"""
def __init__(self, transmission, data, expected_data_length):
Telegram.__init__(self)
self._bytes = []
self._bytes.append(self._get_start_delimiter(transmission, expected_data_length))
self._bytes.extend(data)
self._checksum = self._calc_checksum()
self.checksum_ok = True
class DeviceInformation:
"""A class carrying all static device information read from the device"""
def __init__(self):
self.device_type = ""
self.device_serial_no = ""
self.nominal_voltage = 0
self.nominal_current = 0
self.nominal_power = 0
self.manufacturer = ""
self.device_art_no = ""
self.software_version = ""
def __str__(self):
return "%s %s [%s], SW: %s, Art-Nr: %s, [%0.2f V, %0.2f A, %0.2f W]" % \
(self.manufacturer,
self.device_type, self.device_serial_no,
self.software_version, self.device_art_no,
self.nominal_voltage, self.nominal_current, self.nominal_power)
class DeviceStatusInformation:
"""A class carrying all dynamic device status information"""
def __init__(self, raw_data):
self.remote_control_active = raw_data[0] & 0b1
self.output_active = raw_data[1] & 0b1
self.actual_voltage_percent = float(as_word(raw_data[2:4])) / 256
self.actual_current_percent = float(as_word(raw_data[4:6])) / 256
def __str__(self):
if self.remote_control_active == 1:
remote = "active"
else:
remote = "inactive"
if self.output_active == 1:
output = "active"
else:
output = "inactive"
return "Remote control %s, Output %s" % (remote, output)
class PS2000B:
"""PS 2000 B main communication class"""
def __init__(self, serial_port):
self.__device_status_information1 = None
self.__device_status_information2 = None
self.serial = serial.Serial(serial_port,
baudrate=Constants.CONNECTION_BAUD_RATE,
timeout=Constants.TIMEOUT_BETWEEN_COMMANDS * 2,
parity=serial.PARITY_ODD,
stopbits=Constants.CONNECTION_STOP_BITS)
self.__device_information = self.__read_device_information()
def is_open(self):
return self.serial.is_open
def get_device_information(self):
return self.__device_information
def __read_device_information(self, channel=0): # reads static device information, usually not channel dependant
result = DeviceInformation()
# taken from [2]
result.device_type = as_string(self.__read_device_data(16, Objects.DEVICE_TYPE, channel).get_data())
result.device_serial_no = as_string(self.__read_device_data(16, Objects.DEVICE_SERIAL_NO, channel).get_data())
result.nominal_voltage = as_float(self.__read_device_data(4, Objects.NOMINAL_VOLTAGE, channel).get_data())
result.nominal_current = as_float(self.__read_device_data(4, Objects.NOMINAL_CURRENT, channel).get_data())
result.nominal_power = as_float(self.__read_device_data(4, Objects.NOMINAL_POWER, channel).get_data())
result.device_art_no = as_string(self.__read_device_data(16, Objects.DEVICE_ARTICLE_NO, channel).get_data())
result.manufacturer = as_string(self.__read_device_data(16, Objects.MANUFACTURER, channel).get_data())
result.software_version = as_string(self.__read_device_data(16, Objects.SOFTWARE_VERSION, channel).get_data())
return result
def __read_device_data(self, expected_length, object_id, channel): # reads data from device based on object_id
telegram = ToPowerSupply(0b01, [channel, object_id], expected_length)
result = self.__send_and_receive(telegram.get_byte_array())
return result
def __send_and_receive(self, raw_bytes): # sends request for info to device and reads reply
self.serial.write(raw_bytes)
result = FromPowerSupply(self.serial.read(Constants.MAX_LEN_IN_BYTES))
return result
def get_device_status_information(self, channel): # gets dynamic device information (e.g. current, voltage)
if channel == 0:
if self.__device_status_information1 is None:
self.update_device_information(0)
info = self.__device_status_information1
elif channel == 1:
if self.__device_status_information2 is None:
self.update_device_information(1)
info = self.__device_status_information2
else:
raise ValueError("Invalid Channel")
return info
def update_device_information(self, channel): # updates dynamic device info stored in __device_status_information
telegram = ToPowerSupply(0b01, [channel, Objects.STATUS_ACTUAL_VALUES], 6)
device_information = self.__send_and_receive(telegram.get_byte_array())
if channel == 0:
self.__device_status_information1 = DeviceStatusInformation(device_information.get_data())
elif channel == 1:
self.__device_status_information2 = DeviceStatusInformation(device_information.get_data())
else:
raise ValueError("Invalid Channel")
def __send_device_control(self, p1, p2, channel): # sends commands to PSU, commands given in p1, p2
telegram = ToPowerSupply(0b11, [channel, Objects.POWER_SUPPLY_CONTROL, p1, p2], 2)
_ = self.__send_and_receive(telegram.get_byte_array()) # send command to PSU
self.update_device_information(channel) # update info after change
def __send_device_data(self, obj, data, channel):
# Send integer data with obj-id to the PSU
telegram = ToPowerSupply(0b11, [channel, obj, data >> 8, data & 0xff], 4)
_ = self.__send_and_receive(telegram.get_byte_array())
self.update_device_information(channel)
def enable_all(self):
self.enable_remote_control(0)
self.enable_remote_control(1)
self.enable_output(0)
self.enable_output(1)
def disable_all(self):
self.disable_output(0)
self.disable_output(1)
self.disable_remote_control(0)
self.disable_remote_control(1)
def enable_remote_control(self, channel):
self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_REMOTE, channel)
def disable_remote_control(self, channel):
self.__send_device_control(ControlParam.SWITCH_MODE_CMD, ControlParam.SWITCH_MODE_MANUAL, channel)
def enable_output(self, channel):
self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_ON, channel)
def disable_output(self, channel):
self.__send_device_control(ControlParam.SWITCH_POWER_OUTPUT_CMD, ControlParam.SWITCH_POWER_OUTPUT_OFF, channel)
@property
def output1(self): # object controlling output 1 on/off
return self.get_device_status_information(0).output_active
@output1.setter
def output1(self, value):
if value:
self.enable_output(0)
else:
self.disable_output(0)
@property
def output2(self): # object controlling output 2 on/off
return self.get_device_status_information(1).output_active
@output2.setter
def output2(self, value):
if value:
self.enable_output(1)
else:
self.disable_output(1)
def get_voltage(self, channel):
self.update_device_information(channel)
if channel == 0:
v_perc = self.__device_status_information1.actual_voltage_percent
elif channel == 1:
v_perc = self.__device_status_information2.actual_voltage_percent
else:
raise ValueError("Invalid channel")
voltage = self.__device_information.nominal_voltage * v_perc
return voltage / 100
def get_voltage_setpoint(self, channel):
res = self.__read_device_data(2, Objects.SET_VALUE_VOLTAGE, channel).get_data()
return self.__device_information.nominal_voltage * ((res[0] << 8) + res[1]) / 25600.0
def set_voltage(self, value, channel):
self.update_device_information(channel)
self.enable_remote_control(channel)
volt = int(round((value * 25600.0) / self.__device_information.nominal_voltage))
self.__send_device_data(Objects.SET_VALUE_VOLTAGE, volt, channel)
@property
def voltage1(self): # object storing and controlling the voltage of channel 1
return self.get_voltage(0)
@voltage1.setter
def voltage1(self, value): # voltage of channel 1
self.set_voltage(value, 0)
@property
def voltage2(self): # object storing and controlling the voltage of channel 2
return self.get_voltage(1)
@voltage2.setter
def voltage2(self, value):
self.set_voltage(value, 1)
def get_current(self, channel):
self.update_device_information(channel)
if channel == 0:
c_perc = self.__device_status_information1.actual_current_percent
elif channel == 1:
c_perc = self.__device_status_information2.actual_current_percent
else:
raise ValueError("Invalid channel")
current = self.__device_information.nominal_current * c_perc
return current / 100
def get_current_setpoint(self, channel):
res = self.__read_device_data(2, Objects.SET_VALUE_CURRENT, channel).get_data()
return self.__device_information.nominal_current * ((res[0] << 8) + res[1]) / 25600.0
def set_current(self, value, channel):
self.update_device_information(channel)
self.enable_remote_control(channel)
curr = int(round((value * 25600.0) / self.__device_information.nominal_current))
self.__send_device_data(Objects.SET_VALUE_CURRENT, curr, channel)
@property
def current1(self):
return self.get_current(0)
@current1.setter
def current1(self, value): # current of channel 1
self.set_current(value, 0)
@property
def current2(self):
return self.get_current(1)
@current2.setter
def current2(self, value): # current of channel 2
self.set_current(value, 1)
+68
View File
@@ -0,0 +1,68 @@
# Python-PS2000B
Python library to work with Elektro-Automatik PS 2000 B power supplies via USB.
## Compatibility
Tested with:
+ Python 2.7
+ Python 3.5, 3.6
Tested on:
+ Windows 7 x64
+ Windows 10 x64 Version 1607 (OS Build 14393.2035)
+ Ubuntu 16.04.1 LTS
+ Ubuntu 20.04.1 LTS
## Features of Python-PS2000B
### Supported
- read static device information (manufacturer, serial, device type ...)
- read dynamic device information (current, voltage)
- read/write remote control
- read/write output control
### Still todo
- set voltage and current
- wrap error results in own telegram
## Prerequisites
### Python
The following third-party Python libraries are needed:
* `pyserial` - serial communication library for Python, see https://pypi.python.org/pypi/pyserial
### Windows
On Windows the USB driver (fetch it from http://www.elektroautomatik.de/files/eautomatik/treiber/usb/ea_device_driver.rar) must be installed. Afterwards you can find the serial port `COMxx` in the *device manager*.
### Linux
On Linux the device is detected as `/dev/ttyACMx` (abstract control model, see https://www.rfc1149.net/blog/2013/03/05/what-is-the-difference-between-devttyusbx-and-devttyacmx/). Use `dmesg` after connecting the device to find out `x`.
Most Linuxes will require users to elevate for accessing the device. If you want to access the device as your current user, just add it to the group `dialout` (`ls -lah /dev/ttyACM0` will show you the group to use, usually this is `dialout`) and login again.
## Usage
```python
import time
from pyps2000b import PS2000B
device = PS2000B.PS2000B("/dev/ttyACM0")
print("Device status: %s" % device.get_device_status_information())
device.enable_remote_control()
device.voltage1 = 5.1
device.current1 = 1
device.enable_output()
time.sleep(1)
print("Current output: %0.2f V , %0.2f A" % (device.get_voltage(), device.get_current()))
device.output1 = False
```
## Documentation
+ product website: http://www.elektroautomatik.de/en/ps2000b.html
+ programming guide PS 2000 B: http://www.elektroautomatik.de/files/eautomatik/treiber/ps2000b/programming_ps2000b.zip
+4
View File
@@ -0,0 +1,4 @@
#!/usr/bin/python
# coding=utf-8
# noinspection SpellCheckingInspection
__author__ = "Sören Sprößig <ssproessig@gmail.com>"
+146
View File
@@ -0,0 +1,146 @@
import globals as g
from user_interface import ui_print
import cage_func as cage_controls
from threading import Thread
import socket
import numpy as np
# --- Definition of TCP interface ---
#
# Clients should by default initialize a TCP connection to port 6677
# The commands shown must be terminated with a single \n (newline) char
# Commands may be split across multiple packets.
# Before useful commands can be sent, declare_api_version must be called.
#
# A description of the TCP api (safety limits are always enforced):
#
# set_raw_field [X comp.] [Y comp.] [Z comp.]
# Returns: 0 or 1 for success
# Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it.
# The field units are Tesla
# This causes an additional field of the given strength to be generated, without regard for the pre-existing
# geomagnetic/external fields.
#
# set_compensated_field [X comp.] [Y comp.] [Z comp.]
# Returns: 0 or 1 for success
# Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it.
# The field units are Tesla
# This causes a field of exactly the given magnitude to be generated by compensating external factors such as the
# geomagnetic field.
#
# set_coil_currents [X comp.] [Y comp.] [Z comp.]
# Returns: 0 or 1 for success
# Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it.
# The field units are Ampere
# This establishes the requested current in the individual coils.
#
# get_api_version
# Returns: a string uniquely identifying each API version.
# This function can be called before declare_api_version.
# Please dont put
#
# declare_api_version [version]
# Returns: 0 or 1 (terminated with newline)
# Declare the api version the client application was programmed for. It must be compatible with the current
# API version. This prevents unexpected behaviour by forcing programmers to specify which API they are expecting.
# This function must be called before sending HW commands.
SOCKET_INTERFACE_API_VERSION = "1"
class ClientConnectionThread(Thread):
def __init__(self, client_socket, address):
Thread.__init__(self)
self.client_socket = client_socket
self.client_address = address
self.api_compat = False # Indicates whether the client has a compatible API version
def run(self):
msg = ''
while True:
raw_msg = self.client_socket.recv(2048).decode()
for char in raw_msg:
if char == '\n':
msg = msg.rstrip() # Some systems will try to send \r characters... looking at you windows O_O
try:
response = self.handle_msg(msg)
except Exception as e:
ui_print("An error occurred while processing a client message")
ui_print("Msg: {}".format(msg))
ui_print(e)
response = "err"
self.client_socket.sendall((response + '\n').encode('utf-8'))
msg = ''
else:
msg += char
def handle_msg(self, message):
""" Executes command logic and returns string response (for client). """
tokens = message.split(" ")
if tokens[0] == "get_api_version":
return SOCKET_INTERFACE_API_VERSION
elif tokens[0] == "declare_api_version":
if tokens[1] == SOCKET_INTERFACE_API_VERSION:
self.api_compat = True
return "1"
else:
ui_print("Declared socket API version ({}) is incompatible with current version ({})!".format(tokens[1], SOCKET_INTERFACE_API_VERSION))
return "0"
else:
# api_compat indicates we have checked the api version and are ready to accept commands
if self.api_compat:
if tokens[0] == "set_raw_field":
x = float(tokens[1])
y = float(tokens[2])
z = float(tokens[3])
field_vec = np.array([x, y, z], dtype=np.float32)
# uncompensated
cage_controls.set_field_simple(field_vec)
return "1"
elif tokens[0] == "set_compensated_field":
x = float(tokens[1])
y = float(tokens[2])
z = float(tokens[3])
field_vec = np.array([x, y, z], dtype=np.float32)
# compensated
cage_controls.set_field(field_vec)
return "1"
elif tokens[0] == "set_coil_currents":
x = float(tokens[1])
y = float(tokens[2])
z = float(tokens[3])
current_vec = np.array([x, y, z], dtype=np.float32)
cage_controls.set_current_vec(current_vec)
return "1"
else:
# The message given is unknown. The programmer probably did not intend for this, so display an error
# even if is not inherently problematic.
raise Exception("The command '{}' is unknown".format(tokens[0]))
else:
raise Exception("The command '{}' may not be called before 'declare_api_version'".format(tokens[0]))
class SocketInterfaceThread(Thread):
def __init__(self):
Thread.__init__(self)
self.server_socket = None
# Can throw exception, which should be passed on to the instantiator of this class
self.configure_tcp_port()
def run(self):
while True:
(client_socket, address) = self.server_socket.accept()
new_thread = ClientConnectionThread(client_socket, address)
new_thread.start()
ui_print("Accepted connection from {}".format(address))
def configure_tcp_port(self):
# Creates and configures the listening port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind(('', g.SOCKET_PORT))
self.server_socket.listen(5) # Limit to max. 5 simultaneous connections
ui_print("Listening for TCP connections on port {}".format(g.SOCKET_PORT))
File diff suppressed because it is too large Load Diff