from threading import Thread import socket import numpy as np import src.globals as g from src.utility import ui_print import src.helmholtz_cage_device as helmholtz_cage_device # --- Definition of TCP interface --- # # Clients should by default initialize a TCP connection to port 6677 # The commands shown must be terminated with a single \n (newline) char # Commands may be split across multiple packets. # Before useful commands can be sent, declare_api_version must be called. # # A description of the TCP api (safety limits are always enforced): # # set_raw_field [X comp.] [Y comp.] [Z comp.] # Returns: 0 or 1 for success # Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it. # The field units are Tesla # This causes an additional field of the given strength to be generated, without regard for the pre-existing # geomagnetic/external fields. # # set_compensated_field [X comp.] [Y comp.] [Z comp.] # Returns: 0 or 1 for success # Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it. # The field units are Tesla # This causes a field of exactly the given magnitude to be generated by compensating external factors such as the # geomagnetic field. # # set_coil_currents [X comp.] [Y comp.] [Z comp.] # Returns: 0 or 1 for success # Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it. # The field units are Ampere # This establishes the requested current in the individual coils. # # get_api_version # Returns: a string uniquely identifying each API version. # This function can be called before declare_api_version. # Please dont put # # magnetometer_field [X comp.] [Y comp.] [Z comp.] # Returns: 1 # Accepts decimal point formatted floats, with or without scientific notation. The float() cast must understand it. # The field units are Tesla # Sets the state of an a virtual magnetometer object which mirrors a physical sensor providing data by means of # this command. # # declare_api_version [version] # Returns: 0 or 1 (terminated with newline) # Declare the api version the client application was programmed for. It must be compatible with the current # API version. This prevents unexpected behaviour by forcing programmers to specify which API they are expecting. # This function must be called before sending HW commands. SOCKET_INTERFACE_API_VERSION = "2" class ClientConnectionThread(Thread): def __init__(self, client_socket, address): Thread.__init__(self) self.client_socket = client_socket self.client_address = address # Indicates whether this thread was providing magnetometer data. If yes, set the magnetometer proxy object as # disconnected when the socket is closed. self.magnetometer_connection = False # Holds proxy model to cage device if required. Is initialized lazily to prevent always blocking interface self._cage_dev = None self.api_compat = False # Indicates whether the client has a compatible API version def run(self): msg = '' while True: try: raw_msg = self.client_socket.recv(2048).decode() # Check for end of stream if raw_msg == "": self.client_socket.close() if self._cage_dev: self._cage_dev.close() g.MAGNETOMETER.connected = False return # Process message for char in raw_msg: if char == '\n': msg = msg.rstrip() # Some systems will try to send \r characters... looking at you windows O_O try: response = self.handle_msg(msg) except Exception as e: ui_print("An error occurred while processing a client message") ui_print("Msg: {}".format(msg)) ui_print(e) response = "err" self.client_socket.sendall((response + '\n').encode('utf-8')) msg = '' else: msg += char except ConnectionResetError as e: ui_print("A connection was closed by the client.") self.client_socket.close() if self._cage_dev: self._cage_dev.close() g.MAGNETOMETER.connected = False return def handle_msg(self, message): """ Executes command logic and returns string response (for client). """ tokens = message.split(" ") if tokens[0] == "get_api_version": return SOCKET_INTERFACE_API_VERSION elif tokens[0] == "declare_api_version": if tokens[1] == SOCKET_INTERFACE_API_VERSION: self.api_compat = True return "1" else: ui_print("Declared socket API version ({}) is incompatible with current version ({})!".format(tokens[1], SOCKET_INTERFACE_API_VERSION)) return "0" else: # api_compat indicates we have checked the api version and are ready to accept commands if self.api_compat: if tokens[0] == "set_raw_field": x = float(tokens[1]) y = float(tokens[2]) z = float(tokens[3]) field_vec = np.array([x, y, z], dtype=np.float32) # uncompensated self.cage_dev.set_field_raw(field_vec) return "1" elif tokens[0] == "set_compensated_field": x = float(tokens[1]) y = float(tokens[2]) z = float(tokens[3]) field_vec = np.array([x, y, z], dtype=np.float32) # compensated self.cage_dev.set_field_compensated(field_vec) return "1" elif tokens[0] == "set_coil_currents": x = float(tokens[1]) y = float(tokens[2]) z = float(tokens[3]) current_vec = np.array([x, y, z], dtype=np.float32) self.cage_dev.set_signed_currents(current_vec) return "1" elif tokens[0] == "magnetometer_field": """The client is sending us information about the magnetometer state. This is used for some calibration procedures for example.""" x = float(tokens[1]) y = float(tokens[2]) z = float(tokens[3]) field = np.array([x, y, z], dtype=np.float32) g.MAGNETOMETER.field = field self.magnetometer_connection = True return "1" else: # The message given is unknown. The programmer probably did not intend for this, so display an error # even if is not inherently problematic. raise Exception("The command '{}' is unknown".format(tokens[0])) else: raise Exception("The command '{}' may not be called before 'declare_api_version'".format(tokens[0])) @property def cage_dev(self): if self._cage_dev is None: try: self._cage_dev = g.CAGE_DEVICE.request_proxy() except helmholtz_cage_device.DeviceBusy: # Return none. This will cause an error and show up as "err" on the client # A more helpful error message is shown on application side ui_print("Socket client attempted to acquire busy device.") return self._cage_dev class SocketInterfaceThread(Thread): def __init__(self): Thread.__init__(self, daemon=True) self.server_socket = None # Can throw exception, which should be passed on to the instantiator of this class self.configure_tcp_port() def run(self): while True: (client_socket, address) = self.server_socket.accept() new_thread = ClientConnectionThread(client_socket, address) new_thread.start() ui_print("Accepted connection from {}".format(address)) def configure_tcp_port(self): # Creates and configures the listening port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind(('', g.SOCKET_PORT)) self.server_socket.listen(5) # Limit to max. 5 simultaneous connections ui_print("Listening for TCP connections on port {}".format(g.SOCKET_PORT))