198 lines
6.9 KiB
Python
Executable File
198 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import socket
|
|
import json
|
|
import abc
|
|
import time
|
|
import select
|
|
import logging
|
|
from typing import Any
|
|
from threading import Event, Thread
|
|
from collections import deque
|
|
from multiprocessing import Queue
|
|
from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId
|
|
from spacepackets.ecss.tc import PacketType
|
|
|
|
# --- Experiment / packet identification -------------------------------------
# The experiment APID is the experiment ID offset by 1024.
EXP_ID: int = 278
EXP_APID: int = 1024 + EXP_ID
# Packet IDs handed to parse_space_packets() to pick the experiment's packets
# out of a raw byte stream (one for telemetry, one for telecommands).
# NOTE(review): the positional ``True`` is presumably the secondary-header
# flag of PacketId -- confirm against the spacepackets API.
EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID)
EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID)

# --- Network / logging configuration ----------------------------------------
# Port of the OPS-SAT facing server; main() replaces it with the
# "tcpip_tcp_server_port" value from tmtc_conf.json when that key is present.
OPSSAT_DEFAULT_SERVER_PORT: int = 4096
# Port of the TMTC facing server (fixed, see TmtcServer).
TMTC_SERVER_PORT: int = 4097
LOG_LEVEL: int = logging.INFO

# --- State shared between the two server threads ----------------------------
# Telecommands: filled by TmtcServer, drained by OpsSatServer.
TC_QUEUE = Queue()
# Telemetry: filled by OpsSatServer, drained by TmtcServer.
TM_QUEUE = Queue()
# Set by main() on shutdown; the server loops poll it to terminate.
KILL_SIGNAL = Event()

_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def main():
    """Start the OPS-SAT and TMTC servers and block until interrupted.

    Configures logging, reads ``tmtc_conf.json`` from the current working
    directory and, if it contains a ``tcpip_tcp_server_port`` key, uses that
    value instead of ``OPSSAT_DEFAULT_SERVER_PORT`` for the OPS-SAT server.
    Both server threads are then started and the main thread idles until a
    ``KeyboardInterrupt`` arrives.

    Raises:
        OSError: if ``tmtc_conf.json`` cannot be opened or a server socket
            cannot be bound.
        json.JSONDecodeError: if the configuration file is not valid JSON.
    """
    logging.basicConfig(
        format="[%(asctime)s] [%(levelname)-5s] %(message)s",
        level=LOG_LEVEL,
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    print("Starting OPS-SAT ground TMTC server")
    KILL_SIGNAL.clear()

    ops_sat_server_port = OPSSAT_DEFAULT_SERVER_PORT
    with open("tmtc_conf.json") as cfg_file:
        data = json.load(cfg_file)
    # The configured port is optional; keep the default when the key is absent.
    maybe_ops_sat_server_port = data.get("tcpip_tcp_server_port")
    if maybe_ops_sat_server_port is not None:
        ops_sat_server_port = maybe_ops_sat_server_port

    _LOGGER.info(f"creating OPS-SAT server on port {ops_sat_server_port}")
    # Construct (and thereby bind) BOTH servers before starting either one.
    # If the second bind fails, no non-daemon thread is left running that
    # would keep the process alive after the exception.
    ops_sat_thread = OpsSatServer(ops_sat_server_port)
    tmtc_thread = TmtcServer()

    started_threads = []
    try:
        for server_thread in (ops_sat_thread, tmtc_thread):
            server_thread.start()
            started_threads.append(server_thread)
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        _LOGGER.info("Shutting down server gracefully")
    finally:
        # Always release the worker threads, whatever ended the wait loop;
        # they only terminate once KILL_SIGNAL is set.
        KILL_SIGNAL.set()
        for server_thread in started_threads:
            server_thread.join()
|
|
|
|
|
|
class BaseServer(Thread):
    """Thread running a TCP server that serves one client at a time.

    The constructor binds a listening socket on all interfaces. ``run``
    accepts clients sequentially; for each client, received bytes are
    collected in a deque and handed to ``handle_read_bytestream`` whenever the
    connection goes idle or is closed by the peer, and ``send_data_to_client``
    is invoked on idle periods so queued data gets forwarded.

    Subclasses provide ``handle_read_bytestream``, ``send_data_to_client`` and
    ``send_data_available``. These are marked with ``abc.abstractmethod``, but
    since this class does not use ``abc.ABCMeta`` the marker is informational
    only and is not enforced at instantiation time.

    All loops poll the module-level ``KILL_SIGNAL`` event to terminate.
    """

    def __init__(self, log_prefix: str, port: int):
        """Create the listening socket and bind it to ``0.0.0.0:port``.

        Args:
            log_prefix: Text prepended to this server's log/print output.
            port: TCP port to bind.

        Raises:
            OSError: if the socket cannot be configured or bound; the socket
                is closed before the error is re-raised.
        """
        super().__init__()
        self.log_prefix = log_prefix
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # A timeout (instead of a fully blocking accept) lets the accept
            # loop re-check KILL_SIGNAL periodically. No separate
            # setblocking(False) is needed: settimeout() defines the mode.
            self.server_socket.settimeout(0.4)
            self.server_socket.bind(("0.0.0.0", port))
        except OSError:
            self.server_socket.close()
            raise

    def run(self) -> None:
        """Thread entry point; runs the timeout-based server loop."""
        self.run_sync_version()

    def run_sync_version(self) -> None:
        """Accept and serve clients one after another until KILL_SIGNAL is set.

        The listening socket is closed when the loop ends.
        """
        try:
            self.server_socket.listen()
            while not KILL_SIGNAL.is_set():
                try:
                    (conn_socket, conn_addr) = self.server_socket.accept()
                except socket.timeout:
                    # socket.timeout is an alias of TimeoutError on 3.10+ and
                    # a distinct OSError subclass before; catching it by this
                    # name works on both.
                    continue
                self.handle_connection(conn_socket, conn_addr)
        finally:
            self.server_socket.close()

    def handle_connection(self, conn_socket: socket.socket, conn_addr: Any):
        """Serve a single client until it disconnects or KILL_SIGNAL is set.

        Received chunks are appended to a per-connection deque. On every
        receive timeout ``handle_timeout`` processes the collected bytes and
        sends pending data. When the peer closes the connection the remaining
        bytes are processed once more. The client socket is always closed on
        return, and socket errors on this connection are logged instead of
        propagating, so a failing client cannot terminate the server thread.

        Args:
            conn_socket: Connected client socket returned by ``accept``.
            conn_addr: Peer address returned by ``accept`` (used for output).
        """
        print(f"{self.log_prefix} TCP client {conn_addr} connected")
        analysis_deque = deque()
        with conn_socket:
            try:
                conn_socket.settimeout(0.2)
                while not KILL_SIGNAL.is_set():
                    try:
                        bytes_recvd = conn_socket.recv(4096)
                    except BlockingIOError:
                        self.handle_timeout(conn_socket, analysis_deque)
                        time.sleep(0.2)
                        continue
                    except socket.timeout:
                        self.handle_timeout(conn_socket, analysis_deque)
                        continue
                    if not bytes_recvd:
                        # Empty read: the peer closed the connection.
                        self.handle_read_bytestream(analysis_deque)
                        break
                    _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}")
                    analysis_deque.append(bytes_recvd)
            except OSError as exc:
                _LOGGER.warning(
                    f"{self.log_prefix} connection to {conn_addr} dropped: {exc}"
                )

    def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque):
        """Process collected bytes (if any) and send pending data.

        Args:
            conn_socket: Connected client socket to send on.
            analysis_deque: Chunks received since the last processing step.
        """
        if analysis_deque:
            self.handle_read_bytestream(analysis_deque)
        self.send_data_to_client(conn_socket)

    def run_select_version(self) -> None:
        """Alternative server loop based on ``select``; not used by ``run``.

        Behaves like ``run_sync_version``: clients are served one at a time,
        KILL_SIGNAL ends the loops, client sockets are closed after use and
        the listening socket is closed when the loop ends.
        """
        try:
            self.server_socket.listen()
            while not KILL_SIGNAL.is_set():
                try:
                    (conn_socket, conn_addr) = self.server_socket.accept()
                except socket.timeout:
                    continue
                print(f"{self.log_prefix} TCP client {conn_addr} connected")
                with conn_socket:
                    try:
                        self._serve_with_select(conn_socket)
                    except OSError as exc:
                        _LOGGER.warning(
                            f"{self.log_prefix} connection to {conn_addr} dropped: {exc}"
                        )
        finally:
            self.server_socket.close()

    def _serve_with_select(self, conn_socket: socket.socket) -> None:
        """Serve one client with ``select`` until it disconnects or shutdown."""
        analysis_deque = deque()
        while not KILL_SIGNAL.is_set():
            # Only ask for writability when there is something to send.
            outputs = [conn_socket] if self.send_data_available() else []
            (readable, writable, _) = select.select([conn_socket], outputs, [], 0.2)
            if readable:
                bytes_recvd = conn_socket.recv(4096)
                if not bytes_recvd:
                    # Empty read: the peer closed the connection.
                    self.handle_read_bytestream(analysis_deque)
                    break
                # logging uses %-style placeholders for deferred arguments.
                _LOGGER.debug("received data from TCP client: %s", bytes_recvd)
                analysis_deque.append(bytes_recvd)
            if writable:
                self.send_data_to_client(conn_socket)
            if not writable and not readable:
                # Idle period: process what was collected, then flush.
                if analysis_deque:
                    self.handle_read_bytestream(analysis_deque)
                self.send_data_to_client(conn_socket)

    @abc.abstractmethod
    def handle_read_bytestream(self, analysis_deque: deque):
        """Process the byte chunks collected from the client."""

    @abc.abstractmethod
    def send_data_to_client(self, conn_socket: socket.socket):
        """Send all pending outgoing data on ``conn_socket``."""

    @abc.abstractmethod
    def send_data_available(self) -> bool:
        """Return whether there is outgoing data waiting to be sent."""
|
|
|
|
|
|
class OpsSatServer(BaseServer):
    """Server end for the OPS-SAT side of the relay.

    Packets matching ``EXP_PACKET_ID_TM`` found in the received byte stream
    are pushed onto ``TM_QUEUE``; everything waiting in ``TC_QUEUE`` is sent
    to the connected client.
    """

    def __init__(self, port: int):
        """Bind the server to ``port`` using the ``[OPS-SAT]`` log prefix."""
        self.port = port
        super().__init__("[OPS-SAT]", port)

    def handle_read_bytestream(self, analysis_deque: deque):
        """Extract telemetry packets from ``analysis_deque`` and queue them."""
        for tm_packet in parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM]):
            _LOGGER.info(f"{self.log_prefix} RX TM: [{tm_packet.hex(sep=',')}]")
            TM_QUEUE.put(tm_packet)

    def send_data_to_client(self, conn_socket: socket.socket):
        """Send every telecommand currently in ``TC_QUEUE`` to the client."""
        while True:
            if TC_QUEUE.empty():
                return
            tc_packet = TC_QUEUE.get()
            _LOGGER.info(f"{self.log_prefix} TX TC [{tc_packet.hex(sep=',')}]")
            conn_socket.sendall(tc_packet)

    def send_data_available(self) -> bool:
        """Return True when at least one telecommand is waiting to be sent."""
        nothing_pending = TC_QUEUE.empty()
        return not nothing_pending
|
|
|
|
|
|
class TmtcServer(BaseServer):
    """Server end for the TMTC client side of the relay.

    Packets matching ``EXP_PACKET_ID_TC`` found in the received byte stream
    are pushed onto ``TC_QUEUE``; everything waiting in ``TM_QUEUE`` is sent
    to the connected client.
    """

    def __init__(self):
        """Bind the server to ``TMTC_SERVER_PORT`` with the ``[TMTC]`` prefix."""
        super().__init__("[TMTC]", TMTC_SERVER_PORT)

    def handle_read_bytestream(self, analysis_deque: deque):
        """Extract telecommand packets from ``analysis_deque`` and queue them."""
        parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC])
        for packet in parsed_packets:
            # These are telecommands (TC packet ID, TC queue), so label them
            # as TC; the previous "RX TM" label was misleading in the logs.
            _LOGGER.info(f"{self.log_prefix} RX TC: [{packet.hex(sep=',')}]")
            TC_QUEUE.put(packet)

    def send_data_to_client(self, conn_socket: socket.socket):
        """Send every telemetry packet currently in ``TM_QUEUE`` to the client."""
        while not TM_QUEUE.empty():
            next_packet = TM_QUEUE.get()
            _LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]")
            conn_socket.sendall(next_packet)

    def send_data_available(self) -> bool:
        """Return True when at least one telemetry packet is waiting to be sent."""
        return not TM_QUEUE.empty()
|
|
|
|
|
|
# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|