implement basic TCP server for TMTC handling

This commit is contained in:
Robin Müller 2024-04-18 17:28:51 +02:00
parent 7a86078223
commit 1792bab7fe
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 141 additions and 1 deletions

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""Example client for the sat-rs example application"""
from __future__ import annotations
import logging
import sys
import time
@ -17,7 +18,7 @@ from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd import TcHandlerBase, ProcedureParamsWrapper
from tmtccmd.core.base import BackendRequest
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase
from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase, TelemetryListT
from tmtccmd.com import ComInterface
from tmtccmd.config import (
CmdTreeNode,
@ -44,6 +45,8 @@ from tmtccmd.tmtc import (
)
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
from collections import deque
import socket
import pus_tc

137
pytmtc/tcp_server.py Normal file
View File

@ -0,0 +1,137 @@
from typing import Any, Optional
import socket
from threading import Thread, Event, Lock
from collections import deque
from tmtccmd.com import ComInterface
from tmtccmd.tmtc import TelemetryListT
class TcpServer(ComInterface):
def __init__(self, port: int):
self.port = port
self._max_num_packets_in_tc_queue = 500
self._max_num_packets_in_tm_queue = 500
self._server_addr = ("localhost", self.port)
self._tc_packet_queue = deque()
self._tm_packet_queue = deque()
self._tc_lock = Lock()
self._tm_lock = Lock()
self._kill_signal = Event()
self._server_socket: Optional[socket.socket] = None
self._server_thread = Thread(target=TcpServer._server_task, daemon=True)
self._connected = False
@property
def connected(self) -> bool:
return self._connected
@property
def id(self) -> str:
return "tcp_server"
def initialize(self, args: Any = 0) -> Any:
"""Perform initializations step which can not be done in constructor or which require
returnvalues.
"""
pass
def open(self, args: Any = 0):
"""Opens the communication interface to allow communication.
:return:
"""
if self.connected:
return
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# We need to check the kill signal periodically to allow closing the server.
self.server_socket.settimeout(0.5)
self.server_socket.bind(self._server_addr)
self._connected = True
self._server_thread.start()
def _server_task(self):
assert self._server_socket is not None
while True and not self._kill_signal.is_set():
self._server_socket.listen()
(conn_socket, conn_addr) = self._server_socket.accept()
print("TCP client {} connected", conn_addr)
while True:
bytes_recvd = conn_socket.recv(4096)
if len(bytes_recvd) > 0:
print(f"Received bytes from TCP client: {bytes_recvd.decode()}")
with self._tm_lock:
self._tm_packet_queue.append(bytes_recvd)
elif len(bytes_recvd) == 0:
break
else:
print("error receiving data from TCP client")
queue_len = 0
with self._tc_lock:
queue_len = len(self._tc_packet_queue)
while queue_len > 0:
next_packet = bytes()
with self._tc_lock:
next_packet = self._tc_packet_queue.popleft()
conn_socket.sendall(next_packet)
queue_len -= 1
def is_open(self) -> bool:
"""Can be used to check whether the communication interface is open. This is useful if
opening a COM interface takes a longer time and is non-blocking
"""
return self.connected
def close(self, args: Any = 0):
"""Closes the ComIF and releases any held resources (for example a Communication Port).
:return:
"""
self._kill_signal.set()
self._server_thread.join()
self._connected = False
def send(self, data: bytes):
"""Send raw data.
:raises SendError: Sending failed for some reason.
"""
with self._tc_lock:
# Deque is thread-safe according to the documentation.. so this should be fine.
if len(self._tc_packet_queue) >= self._max_num_packets_in_tc_queue:
# Remove oldest packet
self._tc_packet_queue.popleft()
self._tc_packet_queue.append(data)
def receive(self, parameters: Any = 0) -> TelemetryListT:
"""Returns a list of received packets. The child class can use a separate thread to poll for
the packets or use some other mechanism and container like a deque to store packets
to be returned here.
:param parameters:
:raises ReceptionDecodeError: If the underlying COM interface uses encoding and
decoding and the decoding fails, this exception will be returned.
:return:
"""
with self._tm_lock:
packet_list = []
while self._tm_packet_queue:
packet_list.append(self._tm_packet_queue.popleft())
return packet_list
def data_available(self, timeout: float, parameters: Any = 0) -> int:
"""Check whether TM packets are available.
:param timeout: Can be used to block on available data if supported by the specific
communication interface.
:param parameters: Can be an arbitrary parameter.
:raises ReceptionDecodeError: If the underlying COM interface uses encoding and
decoding when determining the number of available packets, this exception can be
thrown on decoding errors.
:return: 0 if no data is available, number of packets otherwise.
"""
with self._tm_lock:
return len(self._tm_packet_queue)