#!/usr/bin/env python3
"""Ground-side TCP relay between an OPS-SAT link and a TMTC client.

Two TCP servers run in their own threads:

* ``OpsSatServer`` (port ``OPSSAT_SERVER_PORT``) parses received bytes for
  packets matching ``EXP_PACKET_ID_TM``, puts them on ``TM_QUEUE`` and sends
  everything found on ``TC_QUEUE`` to its client.
* ``TmtcServer`` (port ``TMTC_SERVER_PORT``) parses received bytes for
  packets matching ``EXP_PACKET_ID_TC``, puts them on ``TC_QUEUE`` and sends
  everything found on ``TM_QUEUE`` to its client.
"""
import socket
import abc
import time
import select
import logging
from threading import Thread
from collections import deque
from multiprocessing import Queue

from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId
from spacepackets.ecss.tc import PacketType

EXP_ID = 278
EXP_APID = 1024 + EXP_ID
EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID)
EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID)
OPSSAT_SERVER_PORT = 4096
TMTC_SERVER_PORT = 4097
LOG_LEVEL = logging.DEBUG

# Packets received by TmtcServer, waiting to be sent out by OpsSatServer.
TC_QUEUE = Queue()
# Packets received by OpsSatServer, waiting to be sent out by TmtcServer.
TM_QUEUE = Queue()

_LOGGER = logging.getLogger(__name__)

# Receive timeout / select timeout used while serving a client, in seconds.
_POLL_INTERVAL_S = 0.2
# Maximum number of bytes requested per recv() call.
_RECV_SIZE = 4096


def main():
    """Configure logging, start both server threads and wait for them."""
    logging.basicConfig(
        format="[%(asctime)s] [%(levelname)-5s] %(message)s",
        level=LOG_LEVEL,
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    print("Starting OPS-SAT ground TMTC server")
    ops_sat_thread = OpsSatServer()
    ops_sat_thread.start()
    tmtc_thread = TmtcServer()
    tmtc_thread.start()
    ops_sat_thread.join()
    tmtc_thread.join()


class BaseServer(Thread):
    """Thread serving one TCP client at a time on a fixed port.

    Subclasses supply the three hook methods ``handle_read_bytestream``,
    ``send_data_to_client`` and ``send_data_available``.

    NOTE: the hooks are decorated with ``abc.abstractmethod``, but this class
    does not use ``abc.ABCMeta``, so Python does not enforce that subclasses
    override them.
    """

    def __init__(self, log_prefix: str, port: int):
        """Create the listening socket and bind it to all interfaces.

        :param log_prefix: Text prepended to this server's log/print output.
        :param port: TCP port to bind to.
        :raises OSError: If the socket can not be created or bound.
        """
        super().__init__()
        self.log_prefix = log_prefix
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(("0.0.0.0", port))

    def run(self) -> None:
        """Thread entry point; runs the timeout-based server loop."""
        self.run_sync_version()

    def run_sync_version(self) -> None:
        """Accept clients forever and serve each using a receive timeout."""
        self.server_socket.listen()
        while True:
            conn_socket, conn_addr = self.server_socket.accept()
            print(f"{self.log_prefix} TCP client {conn_addr} connected")
            # The with-statement closes the connection socket once the client
            # is gone, so file descriptors are not leaked across reconnects.
            with conn_socket:
                try:
                    self._serve_sync_connection(conn_socket)
                except OSError as exc:
                    # A reset/broken connection must not terminate the server
                    # thread; drop this client and wait for the next one.
                    _LOGGER.warning(
                        "%s connection to %s failed: %s",
                        self.log_prefix,
                        conn_addr,
                        exc,
                    )

    def _serve_sync_connection(self, conn_socket: socket.socket) -> None:
        """Serve one client until it closes the connection.

        Received chunks are collected in a deque. Whenever no data arrives
        within ``_POLL_INTERVAL_S``, ``handle_timeout`` is called.
        """
        conn_socket.settimeout(_POLL_INTERVAL_S)
        analysis_deque = deque()
        while True:
            try:
                bytes_recvd = conn_socket.recv(_RECV_SIZE)
            except BlockingIOError:
                self.handle_timeout(conn_socket, analysis_deque)
                time.sleep(_POLL_INTERVAL_S)
                continue
            except socket.timeout:
                # socket.timeout only became an alias of TimeoutError in
                # Python 3.10; catching it by this name works on all versions.
                self.handle_timeout(conn_socket, analysis_deque)
                continue
            if not bytes_recvd:
                # An empty read means the peer closed the connection.
                self.handle_read_bytestream(analysis_deque)
                return
            _LOGGER.debug("%s RX RAW: %s", self.log_prefix, bytes_recvd)
            analysis_deque.append(bytes_recvd)

    def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque):
        """Process buffered data (if any), then send pending data to the client.

        :param conn_socket: Socket of the connected client.
        :param analysis_deque: Chunks received so far.
        """
        if analysis_deque:
            self.handle_read_bytestream(analysis_deque)
        self.send_data_to_client(conn_socket)

    def run_select_version(self) -> None:
        """Accept clients forever and serve each using ``select``."""
        self.server_socket.listen()
        while True:
            conn_socket, conn_addr = self.server_socket.accept()
            print(f"{self.log_prefix} TCP client {conn_addr} connected")
            with conn_socket:
                try:
                    self._serve_select_connection(conn_socket)
                except OSError as exc:
                    _LOGGER.warning(
                        "%s connection to %s failed: %s",
                        self.log_prefix,
                        conn_addr,
                        exc,
                    )

    def _serve_select_connection(self, conn_socket: socket.socket) -> None:
        """Serve one client with ``select`` until it closes the connection."""
        analysis_deque = deque()
        while True:
            # Only ask for writability when there is something to send.
            outputs = [conn_socket] if self.send_data_available() else []
            readable, writable, _ = select.select(
                [conn_socket], outputs, [], _POLL_INTERVAL_S
            )
            if readable:
                bytes_recvd = conn_socket.recv(_RECV_SIZE)
                if not bytes_recvd:
                    # An empty read means the peer closed the connection.
                    self.handle_read_bytestream(analysis_deque)
                    return
                # logging uses %-style placeholders, not str.format braces.
                _LOGGER.debug("received data from TCP client: %s", bytes_recvd)
                analysis_deque.append(bytes_recvd)
            if writable:
                self.send_data_to_client(conn_socket)
            if not writable and not readable:
                # Idle period: process what was buffered, flush pending data.
                if analysis_deque:
                    self.handle_read_bytestream(analysis_deque)
                self.send_data_to_client(conn_socket)

    @abc.abstractmethod
    def handle_read_bytestream(self, analysis_deque: deque):
        """Process the chunks collected in *analysis_deque*."""
        pass

    @abc.abstractmethod
    def send_data_to_client(self, conn_socket: socket.socket):
        """Send any pending outgoing data over *conn_socket*."""
        pass

    @abc.abstractmethod
    def send_data_available(self) -> bool:
        """Return whether there is outgoing data waiting to be sent."""
        pass


class OpsSatServer(BaseServer):
    """Server on ``OPSSAT_SERVER_PORT``: receives TM, sends queued TC."""

    def __init__(self):
        super().__init__("[OPS-SAT]", OPSSAT_SERVER_PORT)

    def handle_read_bytestream(self, analysis_deque: deque):
        """Parse TM packets from the received data and put them on ``TM_QUEUE``."""
        parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM])
        for packet in parsed_packets:
            _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]")
            TM_QUEUE.put(packet)

    def send_data_to_client(self, conn_socket: socket.socket):
        """Send every packet currently on ``TC_QUEUE`` to the client."""
        while not TC_QUEUE.empty():
            next_packet = TC_QUEUE.get()
            _LOGGER.info(f"{self.log_prefix} TX TC [{next_packet.hex(sep=',')}]")
            conn_socket.sendall(next_packet)

    def send_data_available(self) -> bool:
        """Return ``True`` if ``TC_QUEUE`` is not empty."""
        return not TC_QUEUE.empty()


class TmtcServer(BaseServer):
    """Server on ``TMTC_SERVER_PORT``: receives TC, sends queued TM."""

    def __init__(self):
        super().__init__("[TMTC]", TMTC_SERVER_PORT)

    def handle_read_bytestream(self, analysis_deque: deque):
        """Parse TC packets from the received data and put them on ``TC_QUEUE``."""
        parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC])
        for packet in parsed_packets:
            # These are telecommands (matched against EXP_PACKET_ID_TC and put
            # on TC_QUEUE), so they are labelled TC in the log.
            _LOGGER.info(f"{self.log_prefix} RX TC: [{packet.hex(sep=',')}]")
            TC_QUEUE.put(packet)

    def send_data_to_client(self, conn_socket: socket.socket):
        """Send every packet currently on ``TM_QUEUE`` to the client."""
        while not TM_QUEUE.empty():
            next_packet = TM_QUEUE.get()
            _LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]")
            conn_socket.sendall(next_packet)

    def send_data_available(self) -> bool:
        """Return ``True`` if ``TM_QUEUE`` is not empty."""
        return not TM_QUEUE.empty()


if __name__ == "__main__":
    main()