from typing import Any, Optional import socket import logging from threading import Thread, Event, Lock from collections import deque from tmtccmd.com import ComInterface from tmtccmd.tmtc import TelemetryListT _LOGGER = logging.getLogger(__name__) class TcpServer(ComInterface): def __init__(self, port: int): self.port = port self._max_num_packets_in_tc_queue = 500 self._max_num_packets_in_tm_queue = 500 self._server_addr = ("localhost", self.port) self._tc_packet_queue = deque() self._tm_packet_queue = deque() self._tc_lock = Lock() self._tm_lock = Lock() self._kill_signal = Event() self._server_socket: Optional[socket.socket] = None self._server_thread = Thread(target=TcpServer._server_task, daemon=True) self._connected = False @property def connected(self) -> bool: return self._connected @property def id(self) -> str: return "tcp_server" def initialize(self, args: Any = 0) -> Any: """Perform initializations step which can not be done in constructor or which require returnvalues. """ pass def open(self, args: Any = 0): """Opens the communication interface to allow communication. :return: """ if self.connected: return self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # We need to check the kill signal periodically to allow closing the server. self.server_socket.settimeout(0.5) self.server_socket.bind(self._server_addr) self._connected = True self._server_thread.start() def _server_task(self): assert self._server_socket is not None while True and not self._kill_signal.is_set(): self._server_socket.listen() (conn_socket, conn_addr) = self._server_socket.accept() _LOGGER.info("TCP client {} connected", conn_addr) while True: bytes_recvd = conn_socket.recv(4096) if len(bytes_recvd) > 0: print(f"Received bytes from TCP client: {bytes_recvd.decode()}") with self._tm_lock: self._tm_packet_queue.append(bytes_recvd) elif len(bytes_recvd) == 0: break else: print("error receiving data from TCP client") queue_len = 0 with self._tc_lock: queue_len = len(self._tc_packet_queue) while queue_len > 0: next_packet = bytes() with self._tc_lock: next_packet = self._tc_packet_queue.popleft() conn_socket.sendall(next_packet) queue_len -= 1 def is_open(self) -> bool: """Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a longer time and is non-blocking """ return self.connected def close(self, args: Any = 0): """Closes the ComIF and releases any held resources (for example a Communication Port). :return: """ self._kill_signal.set() self._server_thread.join() self._connected = False def send(self, data: bytes): """Send raw data. :raises SendError: Sending failed for some reason. """ with self._tc_lock: # Deque is thread-safe according to the documentation.. so this should be fine. if len(self._tc_packet_queue) >= self._max_num_packets_in_tc_queue: # Remove oldest packet self._tc_packet_queue.popleft() self._tc_packet_queue.append(data) def receive(self, parameters: Any = 0) -> TelemetryListT: """Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here. :param parameters: :raises ReceptionDecodeError: If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be returned. :return: """ with self._tm_lock: packet_list = [] while self._tm_packet_queue: packet_list.append(self._tm_packet_queue.popleft()) return packet_list def data_available(self, timeout: float, parameters: Any = 0) -> int: """Check whether TM packets are available. :param timeout: Can be used to block on available data if supported by the specific communication interface. :param parameters: Can be an arbitrary parameter. :raises ReceptionDecodeError: If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors. :return: 0 if no data is available, number of packets otherwise. """ with self._tm_lock: return len(self._tm_packet_queue)