#!/usr/bin/env python3 import socket import abc import time import select import logging from typing import Any from threading import Event, Thread from collections import deque from multiprocessing import Queue from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId from spacepackets.ecss.tc import PacketType EXP_ID = 278 EXP_APID = 1024 + EXP_ID EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID) EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID) OPSSAT_SERVER_PORT = 4096 TMTC_SERVER_PORT = 4097 LOG_LEVEL = logging.DEBUG TC_QUEUE = Queue() TM_QUEUE = Queue() KILL_SIGNAL = Event() _LOGGER = logging.getLogger(__name__) def main(): logging.basicConfig( format="[%(asctime)s] [%(levelname)-5s] %(message)s", level=LOG_LEVEL, datefmt="%Y-%m-%d %H:%M:%S", ) print("Starting OPS-SAT ground TMTC server") KILL_SIGNAL.clear() ops_sat_thread = OpsSatServer() ops_sat_thread.start() tmtc_thread = TmtcServer() tmtc_thread.start() try: while True: time.sleep(1) except KeyboardInterrupt: _LOGGER.info("Shutting down server gracefully") KILL_SIGNAL.set() ops_sat_thread.join() tmtc_thread.join() class BaseServer(Thread): def __init__(self, log_prefix: str, port: int): self.log_prefix = log_prefix self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_addr = ("0.0.0.0", port) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.setblocking(False) self.server_socket.settimeout(0.4) self.server_socket.bind(server_addr) super().__init__() def run(self) -> None: self.run_sync_version() def run_sync_version(self) -> None: self.server_socket.listen() while True and not KILL_SIGNAL.is_set(): try: (conn_socket, conn_addr) = self.server_socket.accept() self.handle_connection(conn_socket, conn_addr) except TimeoutError: continue def handle_connection(self, conn_socket: socket.socket, conn_addr: Any): conn_socket.setblocking(False) print(f"{self.log_prefix} TCP client {conn_addr} connected") analysis_deque = deque() while True and not KILL_SIGNAL.is_set(): conn_socket.settimeout(0.2) try: bytes_recvd = conn_socket.recv(4096) if len(bytes_recvd) > 0: _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}") analysis_deque.append(bytes_recvd) elif len(bytes_recvd) == 0: self.handle_read_bytestream(analysis_deque) break else: print("error receiving data from TCP client") except BlockingIOError: self.handle_timeout(conn_socket, analysis_deque) time.sleep(0.2) except TimeoutError: self.handle_timeout(conn_socket, analysis_deque) def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque): if len(analysis_deque) > 0: self.handle_read_bytestream(analysis_deque) self.send_data_to_client(conn_socket) def run_select_version(self) -> None: while True: self.server_socket.listen() (conn_socket, conn_addr) = self.server_socket.accept() print(f"{self.log_prefix} TCP client {conn_addr} connected") analysis_deque = deque() while True: outputs = [] if self.send_data_available(): outputs.append(conn_socket) (readable, writable, _) = select.select([conn_socket], outputs, [], 0.2) if readable and readable[0]: bytes_recvd = conn_socket.recv(4096) if len(bytes_recvd) > 0: _LOGGER.debug("received data from TCP client: {}", bytes_recvd) analysis_deque.append(bytes_recvd) elif len(bytes_recvd) == 0: self.handle_read_bytestream(analysis_deque) break else: print("error receiving data from TCP client") if writable and writable[0]: self.send_data_to_client(conn_socket) if not writable and not readable: if len(analysis_deque) > 0: self.handle_read_bytestream(analysis_deque) self.send_data_to_client(conn_socket) @abc.abstractmethod def handle_read_bytestream(self, analysis_deque: deque): pass @abc.abstractmethod def send_data_to_client(self, conn_socket: socket.socket): pass @abc.abstractmethod def send_data_available(self) -> bool: pass class OpsSatServer(BaseServer): def __init__(self): super().__init__("[OPS-SAT]", OPSSAT_SERVER_PORT) def handle_read_bytestream(self, analysis_deque: deque): parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM]) for packet in parsed_packets: _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]") TM_QUEUE.put(packet) def send_data_to_client(self, conn_socket: socket.socket): while not TC_QUEUE.empty(): next_packet = TC_QUEUE.get() _LOGGER.info(f"{self.log_prefix} TX TC [{next_packet.hex(sep=',')}]") conn_socket.sendall(next_packet) def send_data_available(self) -> bool: return not TC_QUEUE.empty() class TmtcServer(BaseServer): def __init__(self): super().__init__("[TMTC]", TMTC_SERVER_PORT) def handle_read_bytestream(self, analysis_deque: deque): parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC]) for packet in parsed_packets: _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]") TC_QUEUE.put(packet) def send_data_to_client(self, conn_socket: socket.socket): while not TM_QUEUE.empty(): next_packet = TM_QUEUE.get() _LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]") conn_socket.sendall(next_packet) def send_data_available(self) -> bool: return not TM_QUEUE.empty() if __name__ == "__main__": main()