ops-sat-rs/pytmtc/pyserver.py

186 lines
6.4 KiB
Python
Raw Permalink Normal View History

2024-04-19 17:40:38 +02:00
#!/usr/bin/env python3
import socket
import abc
import time
import select
import logging
from typing import Any
from threading import Event, Thread
from collections import deque
from multiprocessing import Queue
from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId
from spacepackets.ecss.tc import PacketType
EXP_ID = 278
EXP_APID = 1024 + EXP_ID
EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID)
EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID)
OPSSAT_SERVER_PORT = 4096
TMTC_SERVER_PORT = 4097
2024-04-24 20:40:51 +02:00
LOG_LEVEL = logging.INFO
2024-04-19 17:40:38 +02:00
TC_QUEUE = Queue()
TM_QUEUE = Queue()
KILL_SIGNAL = Event()
_LOGGER = logging.getLogger(__name__)
def main():
logging.basicConfig(
format="[%(asctime)s] [%(levelname)-5s] %(message)s",
level=LOG_LEVEL,
datefmt="%Y-%m-%d %H:%M:%S",
)
print("Starting OPS-SAT ground TMTC server")
KILL_SIGNAL.clear()
ops_sat_thread = OpsSatServer()
ops_sat_thread.start()
tmtc_thread = TmtcServer()
tmtc_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
_LOGGER.info("Shutting down server gracefully")
KILL_SIGNAL.set()
ops_sat_thread.join()
tmtc_thread.join()
class BaseServer(Thread):
def __init__(self, log_prefix: str, port: int):
self.log_prefix = log_prefix
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_addr = ("0.0.0.0", port)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.setblocking(False)
self.server_socket.settimeout(0.4)
self.server_socket.bind(server_addr)
super().__init__()
def run(self) -> None:
self.run_sync_version()
def run_sync_version(self) -> None:
self.server_socket.listen()
while True and not KILL_SIGNAL.is_set():
try:
(conn_socket, conn_addr) = self.server_socket.accept()
self.handle_connection(conn_socket, conn_addr)
except TimeoutError:
continue
def handle_connection(self, conn_socket: socket.socket, conn_addr: Any):
conn_socket.setblocking(False)
print(f"{self.log_prefix} TCP client {conn_addr} connected")
analysis_deque = deque()
while True and not KILL_SIGNAL.is_set():
conn_socket.settimeout(0.2)
try:
bytes_recvd = conn_socket.recv(4096)
if len(bytes_recvd) > 0:
_LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}")
analysis_deque.append(bytes_recvd)
elif len(bytes_recvd) == 0:
self.handle_read_bytestream(analysis_deque)
break
else:
print("error receiving data from TCP client")
except BlockingIOError:
self.handle_timeout(conn_socket, analysis_deque)
time.sleep(0.2)
except TimeoutError:
self.handle_timeout(conn_socket, analysis_deque)
def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque):
if len(analysis_deque) > 0:
self.handle_read_bytestream(analysis_deque)
self.send_data_to_client(conn_socket)
def run_select_version(self) -> None:
while True:
self.server_socket.listen()
(conn_socket, conn_addr) = self.server_socket.accept()
print(f"{self.log_prefix} TCP client {conn_addr} connected")
analysis_deque = deque()
while True:
outputs = []
if self.send_data_available():
outputs.append(conn_socket)
(readable, writable, _) = select.select([conn_socket], outputs, [], 0.2)
if readable and readable[0]:
bytes_recvd = conn_socket.recv(4096)
if len(bytes_recvd) > 0:
_LOGGER.debug("received data from TCP client: {}", bytes_recvd)
analysis_deque.append(bytes_recvd)
elif len(bytes_recvd) == 0:
self.handle_read_bytestream(analysis_deque)
break
else:
print("error receiving data from TCP client")
if writable and writable[0]:
self.send_data_to_client(conn_socket)
if not writable and not readable:
if len(analysis_deque) > 0:
self.handle_read_bytestream(analysis_deque)
self.send_data_to_client(conn_socket)
@abc.abstractmethod
def handle_read_bytestream(self, analysis_deque: deque):
pass
@abc.abstractmethod
def send_data_to_client(self, conn_socket: socket.socket):
pass
@abc.abstractmethod
def send_data_available(self) -> bool:
pass
class OpsSatServer(BaseServer):
def __init__(self):
super().__init__("[OPS-SAT]", OPSSAT_SERVER_PORT)
def handle_read_bytestream(self, analysis_deque: deque):
parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM])
for packet in parsed_packets:
_LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]")
TM_QUEUE.put(packet)
def send_data_to_client(self, conn_socket: socket.socket):
while not TC_QUEUE.empty():
next_packet = TC_QUEUE.get()
_LOGGER.info(f"{self.log_prefix} TX TC [{next_packet.hex(sep=',')}]")
conn_socket.sendall(next_packet)
def send_data_available(self) -> bool:
return not TC_QUEUE.empty()
class TmtcServer(BaseServer):
def __init__(self):
super().__init__("[TMTC]", TMTC_SERVER_PORT)
def handle_read_bytestream(self, analysis_deque: deque):
parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC])
for packet in parsed_packets:
_LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]")
TC_QUEUE.put(packet)
def send_data_to_client(self, conn_socket: socket.socket):
while not TM_QUEUE.empty():
next_packet = TM_QUEUE.get()
_LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]")
conn_socket.sendall(next_packet)
def send_data_available(self) -> bool:
return not TM_QUEUE.empty()
if __name__ == "__main__":
main()