diff --git a/pytmtc/main.py b/pytmtc/main.py index 9fe494c..deb1fbe 100755 --- a/pytmtc/main.py +++ b/pytmtc/main.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Example client for the sat-rs example application""" +from __future__ import annotations import logging import sys import time @@ -17,7 +18,7 @@ from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import TcHandlerBase, ProcedureParamsWrapper from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper -from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase +from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase, TelemetryListT from tmtccmd.com import ComInterface from tmtccmd.config import ( CmdTreeNode, @@ -44,6 +45,8 @@ from tmtccmd.tmtc import ( ) from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT +from collections import deque +import socket import pus_tc diff --git a/pytmtc/tcp_server.py b/pytmtc/tcp_server.py new file mode 100644 index 0000000..14cd551 --- /dev/null +++ b/pytmtc/tcp_server.py @@ -0,0 +1,137 @@ +from typing import Any, Optional +import socket +from threading import Thread, Event, Lock +from collections import deque + +from tmtccmd.com import ComInterface +from tmtccmd.tmtc import TelemetryListT + + +class TcpServer(ComInterface): + def __init__(self, port: int): + self.port = port + self._max_num_packets_in_tc_queue = 500 + self._max_num_packets_in_tm_queue = 500 + self._server_addr = ("localhost", self.port) + self._tc_packet_queue = deque() + self._tm_packet_queue = deque() + self._tc_lock = Lock() + self._tm_lock = Lock() + self._kill_signal = Event() + self._server_socket: Optional[socket.socket] = None + self._server_thread = Thread(target=TcpServer._server_task, daemon=True) + self._connected = False + + @property + def connected(self) -> bool: + return self._connected + + @property + def id(self) -> str: + return "tcp_server" + + def initialize(self, args: Any = 0) -> Any: + """Perform initializations step which can not be done in constructor or which require + returnvalues. + """ + pass + + def open(self, args: Any = 0): + """Opens the communication interface to allow communication. + + :return: + """ + if self.connected: + return + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # We need to check the kill signal periodically to allow closing the server. + self.server_socket.settimeout(0.5) + self.server_socket.bind(self._server_addr) + self._connected = True + self._server_thread.start() + + def _server_task(self): + assert self._server_socket is not None + while True and not self._kill_signal.is_set(): + self._server_socket.listen() + (conn_socket, conn_addr) = self._server_socket.accept() + print("TCP client {} connected", conn_addr) + while True: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + print(f"Received bytes from TCP client: {bytes_recvd.decode()}") + with self._tm_lock: + self._tm_packet_queue.append(bytes_recvd) + elif len(bytes_recvd) == 0: + break + else: + print("error receiving data from TCP client") + + queue_len = 0 + with self._tc_lock: + queue_len = len(self._tc_packet_queue) + while queue_len > 0: + next_packet = bytes() + with self._tc_lock: + next_packet = self._tc_packet_queue.popleft() + conn_socket.sendall(next_packet) + queue_len -= 1 + + def is_open(self) -> bool: + """Can be used to check whether the communication interface is open. This is useful if + opening a COM interface takes a longer time and is non-blocking + """ + return self.connected + + def close(self, args: Any = 0): + """Closes the ComIF and releases any held resources (for example a Communication Port). + + :return: + """ + self._kill_signal.set() + self._server_thread.join() + self._connected = False + + def send(self, data: bytes): + """Send raw data. + + :raises SendError: Sending failed for some reason. + """ + with self._tc_lock: + # Deque is thread-safe according to the documentation.. so this should be fine. + if len(self._tc_packet_queue) >= self._max_num_packets_in_tc_queue: + # Remove oldest packet + self._tc_packet_queue.popleft() + self._tc_packet_queue.append(data) + + def receive(self, parameters: Any = 0) -> TelemetryListT: + """Returns a list of received packets. The child class can use a separate thread to poll for + the packets or use some other mechanism and container like a deque to store packets + to be returned here. + + :param parameters: + :raises ReceptionDecodeError: If the underlying COM interface uses encoding and + decoding and the decoding fails, this exception will be returned. + :return: + """ + + with self._tm_lock: + packet_list = [] + while self._tm_packet_queue: + packet_list.append(self._tm_packet_queue.popleft()) + return packet_list + + def data_available(self, timeout: float, parameters: Any = 0) -> int: + """Check whether TM packets are available. + + :param timeout: Can be used to block on available data if supported by the specific + communication interface. + :param parameters: Can be an arbitrary parameter. + :raises ReceptionDecodeError: If the underlying COM interface uses encoding and + decoding when determining the number of available packets, this exception can be + thrown on decoding errors. + :return: 0 if no data is available, number of packets otherwise. + """ + with self._tm_lock: + return len(self._tm_packet_queue)