forked from zietzm/Helmholtz_Test_Bench
Fixed and extended csv_logging.py
This commit is contained in:
+129
-41
@@ -1,65 +1,145 @@
|
||||
# This file contains functions related to logging data from the program to a CSV file.
|
||||
# They are mainly but not only called by the ConfigureLogging class in user_interface.py.
|
||||
|
||||
# import packages
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
import os
|
||||
import csv
|
||||
|
||||
from tkinter import filedialog
|
||||
from tkinter import messagebox
|
||||
|
||||
# import other project files
|
||||
import src.globals as g
|
||||
from src.utility import ui_print
|
||||
|
||||
log_data = pd.DataFrame() # pandas data frame containing the logged data, in-program representation of csv/excel data
|
||||
|
||||
log_data = [] # List containing the logged data, in-program representation of csv numerical data
|
||||
unsaved_data = False # Bool to indicate if there is unsaved data, set to True each time a datapoint is logged
|
||||
zero_time = datetime.now() # set reference for timestamps in log file, reset when log_data is cleared and restarted
|
||||
|
||||
# create dictionary with all value handles that could be logged
|
||||
# Key: String that is displayed in UI and column headers. Also serves as handle to access dictionary elements.
|
||||
# Keys are the same as the rows in the status display ToDo (optional): use this for the status display
|
||||
# Content: name of the corresponding attribute in the Axis class (in cage_func.py).
|
||||
# Important: attribute handle must match definition in Axis class exactly, used with getattr() to get values.
|
||||
axis_data_dict = {
|
||||
'PSU Status': 'connected',
|
||||
'Voltage Setpoint': 'voltage_setpoint',
|
||||
'Actual Voltage': 'voltage',
|
||||
'Current Setpoint': 'current_setpoint',
|
||||
'Actual Current': 'current',
|
||||
'Target Field': 'target_field_comp',
|
||||
'Trgt. Field Raw': 'target_field_comp',
|
||||
'Target Current': 'target_current',
|
||||
'Inverted': 'polarity_switched'
|
||||
# Dictionary listing the options that control what is present is the log output
|
||||
# Key: The option that this control sets
|
||||
# Value: displayed in the UI
|
||||
logging_selection_options = {
|
||||
'timestamp': 'Timestamp',
|
||||
'device_stat': 'PSU + Arduino Status',
|
||||
'x_axis': 'Log X-Axis Data',
|
||||
'y_axis': 'Log Y-Axis Data',
|
||||
'z_axis': 'Log Z-Axis Data',
|
||||
'v': 'Voltage Setpoint',
|
||||
'v_actual': 'Actual Voltage',
|
||||
'i': 'Current Setpoint',
|
||||
'i_actual': 'Actual Current',
|
||||
'tgt_field': 'Target Field',
|
||||
'tgt_field_raw': 'Target Field Raw',
|
||||
'tgt_field_i': 'Target Current',
|
||||
'inv': 'Inverted',
|
||||
'mag_stat': 'Magnetometer Status',
|
||||
'mag_field': 'Magnetometer Reading',
|
||||
}
|
||||
|
||||
|
||||
def triple_list(key_list): # creates list with each entry of key_list tripled with axis names before it
|
||||
new_list = [] # initialize list
|
||||
for key in key_list: # go through the given list
|
||||
for axis_name in ['X', 'Y', 'Z']: # per given list entry create three, one for each axis
|
||||
new_list.append(' '.join((axis_name, key))) # put axis_name before the given entry and append to new list
|
||||
return new_list
|
||||
# This returns a dictionary defining the human readable column headers (Value) corresponding to the short forms (key)
|
||||
# It has many similarities to the options above, but there is not a perfect correspondence between columns and logging
|
||||
# options. This is both in terms of the user friendly name, which can differ, and the options.
|
||||
def get_long_column_header():
|
||||
header = {
|
||||
'date': 'Date',
|
||||
'time': 'Timestamp',
|
||||
't': 'Time',
|
||||
'arduino_stat': 'Arduino Status',
|
||||
'xy_psu_stat': 'XY PSU Status',
|
||||
'z_psu_stat': 'Z PSU Status',
|
||||
'mag_stat': 'Magnetometer Status',
|
||||
}
|
||||
|
||||
for i in ['x', 'y', 'z']:
|
||||
i_caps = i.upper()
|
||||
header[f'{i}_v'] = f'{i_caps}-Axis Voltage Setpoint'
|
||||
header[f'{i}_v_actual'] = f'{i_caps}-Axis Actual Voltage'
|
||||
header[f'{i}_i'] = f'{i_caps}-Axis Current Setpoint'
|
||||
header[f'{i}_i_actual'] = f'{i_caps}-Axis Actual Current'
|
||||
header[f'{i}_tgt_field'] = f'{i_caps}-Axis Target Field'
|
||||
header[f'{i}_tgt_field_raw'] = f'{i_caps}-Axis Target Field Raw'
|
||||
header[f'{i}_tgt_field_i'] = f'{i_caps}-Axis Target Current'
|
||||
header[f'{i}_inv'] = f'{i_caps}-Axis Inverted'
|
||||
header[f'{i}_mag_field'] = f'Magnetometer Reading {i_caps}'
|
||||
|
||||
return header
|
||||
|
||||
|
||||
def log_datapoint(key_list): # logs a single row of data into the log_data DataFrame
|
||||
def log_datapoint(settings):
|
||||
"""Logs a single row of data into the log_data list"""
|
||||
# key_list determines what data is logged
|
||||
global log_data # get global dataframe with logged data
|
||||
global unsaved_data # get global variable that indicates if there is unsaved data
|
||||
|
||||
date = datetime.now().date() # get current date
|
||||
time = datetime.now().strftime("%H:%M:%S,%f") # get string with current time in correct format
|
||||
t = (datetime.now() - zero_time).total_seconds() # calculate timestamp relative to the start of the logging
|
||||
# --- PRODUCE NEW DATA ROW ---
|
||||
# d for 'data'
|
||||
d = {}
|
||||
|
||||
data = [[date, time, t]] # initialize new data row with timestamps
|
||||
for key in key_list: # go through the list telling us what data to log
|
||||
for axis in g.AXES: # log this data for each axis
|
||||
# get relevant value from the correct AXIS object and append to new data row:
|
||||
data[0].append(getattr(axis, axis_data_dict[key]))
|
||||
# Timestamp
|
||||
if settings['timestamp']:
|
||||
d['date'] = datetime.now().date() # get current date
|
||||
d['time'] = datetime.now().strftime("%H:%M:%S.%f") # get string with current time in correct format
|
||||
d['t'] = (datetime.now() - zero_time).total_seconds() # calculate timestamp relative to the start of the logging
|
||||
|
||||
column_names = ["Date", "Time", "t (s)", *triple_list(key_list)] # create list with the correct column headers
|
||||
new_row = pd.DataFrame(data, columns=column_names) # create data frame containing the new row
|
||||
log_data = log_data.append(new_row, ignore_index=True) # append the new data frame to the logged data
|
||||
# PSU + Arduino status
|
||||
if settings['device_stat']:
|
||||
d['arduino_stat'] = g.CAGE_DEVICE.arduino is not None
|
||||
d['xy_psu_stat'] = g.CAGE_DEVICE.psu1 is not None
|
||||
d['z_psu_stat'] = g.CAGE_DEVICE.psu2 is not None
|
||||
|
||||
# Axis data
|
||||
axes = []
|
||||
if settings['x_axis']:
|
||||
axes.append(0)
|
||||
if settings['y_axis']:
|
||||
axes.append(1)
|
||||
if settings['z_axis']:
|
||||
axes.append(2)
|
||||
for i in axes:
|
||||
pre = ['x_', 'y_', 'z_'][i]
|
||||
axis = g.CAGE_DEVICE.axes[i]
|
||||
|
||||
# Cached PSU state data
|
||||
if axis.psu:
|
||||
psu_state = axis.psu.cached_channel_state(axis.channel)
|
||||
else:
|
||||
psu_state = {'limit_voltage': None,
|
||||
'actual_voltage': None,
|
||||
'limit_current': None,
|
||||
'actual_current': None}
|
||||
|
||||
if settings['v']:
|
||||
d[pre + 'v'] = psu_state['limit_voltage']
|
||||
if settings['v_actual']:
|
||||
d[pre + 'v_actual'] = psu_state['actual_voltage']
|
||||
if settings['i']:
|
||||
d[pre + 'i'] = psu_state['limit_current']
|
||||
if settings['i_actual']:
|
||||
d[pre + 'i_actual'] = psu_state['actual_current']
|
||||
# All the rest
|
||||
if settings['tgt_field']:
|
||||
d[pre + 'tgt_field'] = axis.target_field
|
||||
if settings['tgt_field_raw']:
|
||||
d[pre + 'tgt_field_raw'] = axis.target_field_raw
|
||||
if settings['tgt_field_i']:
|
||||
d[pre + 'tgt_field_i'] = axis.target_current
|
||||
if settings['inv']:
|
||||
d[pre + 'inv'] = axis.polarity
|
||||
|
||||
# Mangetometer field
|
||||
if settings['mag_field']:
|
||||
for i in [0, 1, 2]:
|
||||
pre = ['x_', 'y_', 'z_'][i]
|
||||
d[pre + 'mag_field'] = g.MAGNETOMETER.field[i]
|
||||
|
||||
# Magnetometer status
|
||||
if settings['mag_stat']:
|
||||
d['mag_stat'] = g.MAGNETOMETER.connected
|
||||
|
||||
# Save the datapoint
|
||||
log_data.append(d) # append the new data to the current data
|
||||
unsaved_data = True # tell other program parts that there is now unsaved data
|
||||
|
||||
|
||||
@@ -77,14 +157,22 @@ def select_file(): # select a file to write logs to
|
||||
return filepath
|
||||
|
||||
|
||||
def write_to_file(dataframe, filepath):
|
||||
def write_to_file(filepath):
|
||||
# get global variables for use in this function:
|
||||
global unsaved_data
|
||||
global log_data
|
||||
if filepath is not None: # user has selected a file and no errors occurred
|
||||
ui_print("Writing logged data to file", filepath)
|
||||
try:
|
||||
# write data collected in log_data DataFrame to csv file in german excel format:
|
||||
dataframe.to_csv(filepath, index=False, sep=';', decimal=',')
|
||||
# write data collected in log_data list to csv file in excel format:
|
||||
with open(filepath, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
headers = log_data[0].keys()
|
||||
long_headers = [get_long_column_header()[header] for header in headers]
|
||||
writer.writerow(long_headers)
|
||||
writer.writerow(headers)
|
||||
for row in log_data:
|
||||
writer.writerow(row.values())
|
||||
except PermissionError:
|
||||
message = "No permission to write to: \n%s. \nFile may be open in another program." % filepath
|
||||
messagebox.showerror("Permission Error", message)
|
||||
@@ -98,4 +186,4 @@ def write_to_file(dataframe, filepath):
|
||||
|
||||
def clear_logged_data(): # clears all logged data from data frame
|
||||
global log_data # get global variable
|
||||
log_data = pd.DataFrame() # reset to an empty data frame, i.e. clearing all logged data
|
||||
log_data = [] # reset to an empty list, i.e. clearing all logged data
|
||||
|
||||
@@ -401,6 +401,7 @@ class Axis:
|
||||
|
||||
# State variables
|
||||
self.target_current = 0
|
||||
self.polarity = False
|
||||
|
||||
def set_field_raw(self, field):
|
||||
self.set_signed_current(field / self.coil_const)
|
||||
@@ -434,9 +435,11 @@ class Axis:
|
||||
# Set polarity on Arduino
|
||||
if safe_current < 0:
|
||||
# Reverse polarity
|
||||
self.polarity = True # Track the state
|
||||
self.arduino.set_axis_polarity(self.idx, True)
|
||||
else:
|
||||
# Positive polarity (default case)
|
||||
self.polarity = False # Track the state
|
||||
self.arduino.set_axis_polarity(self.idx, False)
|
||||
|
||||
# determine voltage limit to be set on PSU, must be high enough to not limit the current:
|
||||
@@ -478,10 +481,6 @@ class Axis:
|
||||
def connected(self):
|
||||
return self.psu is not None and self.arduino is not None
|
||||
|
||||
@property
|
||||
def polarity(self):
|
||||
return self.arduino.get_axis_polarity(self.idx)
|
||||
|
||||
@property
|
||||
def max_field(self):
|
||||
max_field_magnitude = self.max_amps * self.coil_const
|
||||
@@ -492,6 +491,14 @@ class Axis:
|
||||
max_field_magnitude = self.max_amps * self.coil_const
|
||||
return np.array([self.ambient_field - max_field_magnitude, self.ambient_field + max_field_magnitude])
|
||||
|
||||
@property
|
||||
def arduino_connected(self):
|
||||
return self.arduino is not None
|
||||
|
||||
@property
|
||||
def psu_connected(self):
|
||||
return self.psu is not None
|
||||
|
||||
def get_status_dict(self):
|
||||
"""Dict containing all data from this model to pass to the front-end. Some data is only available through
|
||||
this interface, since it also polls the hardware for current set-points"""
|
||||
|
||||
+25
-6
@@ -15,6 +15,7 @@ class PSUDevice(ABC):
|
||||
"""PSUDevice assumes a serial connection"""
|
||||
ui_print("\nConnecting to power supply...")
|
||||
self.com_port = com_port
|
||||
self.cached_state = dict.fromkeys(self.valid_channels()) # Used for components that require state on demand
|
||||
|
||||
@abstractmethod
|
||||
def enable_channel(self, channel_nr):
|
||||
@@ -53,10 +54,24 @@ class PSUDevice(ABC):
|
||||
def poll_channel_state(self, channel_nr):
|
||||
"""Return a dictionary with the entries below. WARNING: this call is blocking and potentially slow!
|
||||
Can throw exceptions"""
|
||||
# Should also set self.cached_state
|
||||
# return {'active': False, 'remote_active': False,
|
||||
# 'actual_voltage':0, 'limit_voltage':0, 'actual_current':0, 'limit_current':0}
|
||||
pass
|
||||
|
||||
def cached_channel_state(self, channel_nr):
|
||||
"""Return a dictionary with the entries below. Uses the values obtained during last poll.
|
||||
May contain None-entries"""
|
||||
|
||||
if self.cached_state[channel_nr]:
|
||||
return self.cached_state[channel_nr]
|
||||
else:
|
||||
return {'active': None, 'remote_active': None,
|
||||
'actual_voltage': None,
|
||||
'limit_voltage': None,
|
||||
'actual_current': None,
|
||||
'limit_current': None}
|
||||
|
||||
def idle(self):
|
||||
"""Zero all outputs but activate channels so commands can be sent."""
|
||||
for ch in self.valid_channels():
|
||||
@@ -130,9 +145,11 @@ class PSUDevicePS2000B(PSUDevice):
|
||||
time.sleep(self.MIN_DELAY)
|
||||
|
||||
# Format should match the provided template in abstract PSUDevice class.
|
||||
return {'active': dev_status.output_active, 'remote_active': dev_status.remote_control_active,
|
||||
'voltage': voltage, 'voltage_setpoint': voltage_setp,
|
||||
'current': current, 'current_setpoint': current_setp}
|
||||
self.cached_state[channel_nr] = {'active': dev_status.output_active,
|
||||
'remote_active': dev_status.remote_control_active,
|
||||
'voltage': voltage, 'voltage_setpoint': voltage_setp,
|
||||
'current': current, 'current_setpoint': current_setp}
|
||||
return self.cached_state[channel_nr]
|
||||
|
||||
def shutdown(self):
|
||||
for ch in self.valid_channels():
|
||||
@@ -161,6 +178,7 @@ class PSUDeviceQL355TP(PSUDevice):
|
||||
self.set_output_range(0, 0) # Put the PSU into the 15V/5A range
|
||||
self.set_output_range(1, 0)
|
||||
self.reset_breaker() # Reset the breaker in case we are coming from an unclean state
|
||||
self.cached_state = dict.fromkeys(self.valid_channels()) # Used for components that require state on demand
|
||||
|
||||
@staticmethod
|
||||
def valid_channels():
|
||||
@@ -216,9 +234,10 @@ class PSUDeviceQL355TP(PSUDevice):
|
||||
# Format should match the provided template in abstract PSUDevice class.
|
||||
# The remote_active property is assumed to always be True since it cant be read
|
||||
# (it should be since we are talking to it)
|
||||
return {'active': output_active, 'remote_active': True,
|
||||
'voltage': voltage, 'voltage_setpoint': voltage_setp,
|
||||
'current': current, 'current_setpoint': current_setp}
|
||||
self.cached_state[channel_nr] = {'active': output_active, 'remote_active': True,
|
||||
'voltage': voltage, 'voltage_setpoint': voltage_setp,
|
||||
'current': current, 'current_setpoint': current_setp}
|
||||
return self.cached_state[channel_nr]
|
||||
|
||||
def set_output_range(self, channel_nr, value_range):
|
||||
"""The QL355TP supports various output ranges. We require the 15V/5A mode to achieve the greatest range."""
|
||||
|
||||
+13
-10
@@ -1314,7 +1314,7 @@ class HardwareConfiguration(Frame):
|
||||
# open file selection dialogue at current path and save path of selected file
|
||||
filename = filedialog.askopenfilename(initialdir=directory, title="Select Config File",
|
||||
filetypes=(("Config File", "*.ini*"), ("All Files", "*.*")))
|
||||
if exists(filename): # does the file exist?
|
||||
if os.path.exists(filename): # does the file exist?
|
||||
config.CONFIG_FILE = filename # set global config file to the new file
|
||||
config.CONFIG_OBJECT = config.get_config_from_file(filename) # load from config file to config object
|
||||
config.check_config(config.CONFIG_OBJECT) # check and display warnings if values are out of bounds
|
||||
@@ -1467,16 +1467,20 @@ class ConfigureLogging(Frame):
|
||||
|
||||
# add general description headline:
|
||||
checkbox_label = Label(self.checkbox_frame, text="Select which data to log:")
|
||||
checkbox_label.grid(row=0, column=0, columnspan=2)
|
||||
checkbox_label.grid(row=0, column=0, columnspan=2, sticky='w')
|
||||
# generate and place all checkboxes:
|
||||
row = 1
|
||||
for key in log.axis_data_dict.keys(): # go through all loggable values
|
||||
row = 0
|
||||
column = 0
|
||||
for key, name in log.logging_selection_options.items(): # go through all loggable values
|
||||
self.checkbox_vars[key] = BooleanVar(value=True) # create variable for checkbox and put it in dictionary
|
||||
checkbox = Checkbutton(self.checkbox_frame, text=key, # generate checkbox
|
||||
checkbox = Checkbutton(self.checkbox_frame, text=name, # generate checkbox
|
||||
variable=self.checkbox_vars[key], onvalue=True, offvalue=False)
|
||||
checkbox.grid(row=row, column=0, sticky=W) # place checkbox in UI
|
||||
checkbox.grid(row=row+1, column=column, padx=(0, 20), sticky=W) # place checkbox in UI
|
||||
self.checkboxes.append(checkbox) # add created checkbox to list of all checkboxes
|
||||
row += 1
|
||||
if row > 8:
|
||||
row = 0
|
||||
column += 1
|
||||
|
||||
# self.controller.bind('<Escape>', self.escape_press) ToDo: implement escape button press event to power down
|
||||
|
||||
@@ -1536,7 +1540,7 @@ class ConfigureLogging(Frame):
|
||||
if try_again == 'yes': # user wants to try again
|
||||
self.write_to_file() # call same function again so user can retry
|
||||
else: # a valid filename was selected
|
||||
log.write_to_file(log.log_data, filepath) # write logged data to the file
|
||||
log.write_to_file(filepath) # write logged data to the file
|
||||
|
||||
def clear_data(self): # called on button press, asks user if he want to save logged data and then deletes it
|
||||
if log.unsaved_data: # there is logged data that has not been written to a file yet
|
||||
@@ -1560,10 +1564,9 @@ class ConfigureLogging(Frame):
|
||||
def update_choices(self): # updates the list storing which checkboxes are currently ticked
|
||||
# (this is passed to logging functions and determines which data is logged)
|
||||
|
||||
self.active_keys = [] # initialize the list
|
||||
self.active_keys = {} # initialize the dict
|
||||
for key in self.checkbox_vars.keys(): # go through all checkboxes
|
||||
if self.checkbox_vars[key].get(): # box is ticked
|
||||
self.active_keys.append(key) # add corresponding item to the list
|
||||
self.active_keys[key] = self.checkbox_vars[key].get() # add corresponding item to the list
|
||||
|
||||
def lock_checkboxes(self): # lock all checkboxes, so they can not be modified while logging
|
||||
# lock all checkboxes:
|
||||
|
||||
Reference in New Issue
Block a user