from typing import Any, Optional import select import time import socket import logging from threading import Thread, Event, Lock from collections import deque from tmtccmd.com import ComInterface from tmtccmd.tmtc import TelemetryListT _LOGGER = logging.getLogger(__name__) class TcpServer(ComInterface): def __init__(self, port: int): self.port = port self._max_num_packets_in_tc_queue = 500 self._max_num_packets_in_tm_queue = 500 self._default_timeout_secs = 0.5 self._server_addr = ("localhost", self.port) self._tc_packet_queue = deque() self._tm_packet_queue = deque() self._tc_lock = Lock() self._tm_lock = Lock() self._kill_signal = Event() self._server_socket: Optional[socket.socket] = None self._server_thread = Thread(target=self._server_task, daemon=True) self._connected = False # self._conn_start = None # self._writing_done = False # self._reading_done = False @property def connected(self) -> bool: return self._connected @property def id(self) -> str: return "tcp_server" def initialize(self, args: Any = 0) -> Any: """Perform initializations step which can not be done in constructor or which require returnvalues. """ pass def open(self, args: Any = 0): """Opens the communication interface to allow communication. :return: """ if self.connected: return self._connected = True self._server_thread.start() def _server_task(self): self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # We need to check the kill signal periodically to allow closing the server. self._server_socket.settimeout(self._default_timeout_secs) self._server_socket.bind(self._server_addr) self._server_socket.listen() while True and not self._kill_signal.is_set(): try: (conn_socket, conn_addr) = self._server_socket.accept() self._handle_connection(conn_socket, conn_addr) # conn_socket.close() """ if ( self._reading_done and self._writing_done ) or time.time() - self.conn_start > 0.5: print("reading and writing done") break """ except TimeoutError: continue def _handle_connection(self, conn_socket: socket.socket, conn_addr: Any): _LOGGER.info(f"TCP client {conn_addr} connected") queue_len = 0 while True: with self._tc_lock: queue_len = len(self._tc_packet_queue) outputs = [] if queue_len > 0: outputs.append(conn_socket) (readable, writable, _) = select.select( [conn_socket], outputs, [], 0.2, ) if writable and writable[0]: print("writeable") while queue_len > 0: next_packet = bytes() with self._tc_lock: next_packet = self._tc_packet_queue.popleft() if len(next_packet) > 0: conn_socket.sendall(next_packet) queue_len -= 1 if readable and readable[0]: print("readable") while True: bytes_recvd = conn_socket.recv(4096) if len(bytes_recvd) > 0: print(f"Received bytes from TCP client: {bytes_recvd.decode()}") with self._tm_lock: self._tm_packet_queue.append(bytes_recvd) elif len(bytes_recvd) == 0: break else: print("error receiving data from TCP client") def is_open(self) -> bool: """Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a longer time and is non-blocking """ return self.connected def close(self, args: Any = 0): """Closes the ComIF and releases any held resources (for example a Communication Port). :return: """ self._kill_signal.set() self._server_thread.join() self._connected = False def send(self, data: bytes): """Send raw data. :raises SendError: Sending failed for some reason. """ with self._tc_lock: if len(self._tc_packet_queue) >= self._max_num_packets_in_tc_queue: # Remove oldest packet self._tc_packet_queue.popleft() self._tc_packet_queue.append(data) def receive(self, parameters: Any = 0) -> TelemetryListT: """Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here. :param parameters: :raises ReceptionDecodeError: If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be returned. :return: """ with self._tm_lock: packet_list = [] while self._tm_packet_queue: packet_list.append(self._tm_packet_queue.popleft()) return packet_list def data_available(self, timeout: float, parameters: Any = 0) -> int: """Check whether TM packets are available. :param timeout: Can be used to block on available data if supported by the specific communication interface. :param parameters: Can be an arbitrary parameter. :raises ReceptionDecodeError: If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors. :return: 0 if no data is available, number of packets otherwise. """ with self._tm_lock: return len(self._tm_packet_queue)