From 452a2c436075087833a1f375d68d55dc7146a0a9 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 19 Apr 2024 10:27:01 +0200 Subject: [PATCH] almost at the goal --- pytmtc/{main.py => pyclient.py} | 0 pytmtc/pyserver.py | 120 ++++++++++++++++++++++++++ pytmtc/tcp_server.py | 90 +++++++++---------- pytmtc/tests/server-test.py | 39 --------- src/config.rs | 3 +- src/interface/tcp_spp_client.rs | 147 ++++++++++++++++++-------------- src/main.rs | 2 +- src/pus/test.rs | 1 - 8 files changed, 252 insertions(+), 150 deletions(-) rename pytmtc/{main.py => pyclient.py} (100%) create mode 100755 pytmtc/pyserver.py delete mode 100755 pytmtc/tests/server-test.py diff --git a/pytmtc/main.py b/pytmtc/pyclient.py similarity index 100% rename from pytmtc/main.py rename to pytmtc/pyclient.py diff --git a/pytmtc/pyserver.py b/pytmtc/pyserver.py new file mode 100755 index 0000000..a1d8056 --- /dev/null +++ b/pytmtc/pyserver.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +import socket +import abc +import select +import logging +from threading import Thread +from collections import deque +from multiprocessing import Queue +from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId +from spacepackets.ecss.tc import PacketType + +EXP_ID = 278 +EXP_APID = 1024 + EXP_ID +EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID) +EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID) +OPSSAT_SERVER_PORT = 4096 +TMTC_SERVER_PORT = 4097 + + +TC_QUEUE = Queue() +TM_QUEUE = Queue() + + +_LOGGER = logging.getLogger(__name__) + + +def main(): + logging.basicConfig( + format="[%(asctime)s] [%(levelname)-5s] %(message)s", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", + ) + print("Starting OPS-SAT ground TMTC server") + ops_sat_thread = OpsSatServer() + ops_sat_thread.start() + tmtc_thread = TmtcServer() + tmtc_thread.start() + ops_sat_thread.join() + tmtc_thread.join() + + +class BaseServer(Thread): + def __init__(self, log_prefix: str, port: int): + self.log_prefix = log_prefix + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_addr = ("0.0.0.0", port) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind(server_addr) + super().__init__() + + def run(self) -> None: + while True: + self.server_socket.listen() + (conn_socket, conn_addr) = self.server_socket.accept() + # conn_socket.setblocking(True) + # conn_socket.settimeout(0.2) + print(f"{self.log_prefix} TCP client {conn_addr} connected") + analysis_deque = deque() + while True: + (readable, writable, _) = select([conn_socket], [], [], 0.2) + try: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}") + analysis_deque.append(bytes_recvd) + elif len(bytes_recvd) == 0: + self.handle_read_bytestream(analysis_deque) + break + else: + print("error receiving data from TCP client") + except TimeoutError: + if len(analysis_deque) > 0: + self.handle_read_bytestream(analysis_deque) + self.send_data_to_client(conn_socket) + + @abc.abstractmethod + def handle_read_bytestream(self, analysis_deque: deque): + pass + + @abc.abstractmethod + def send_data_to_client(self, conn_socket: socket.socket): + pass + + +class OpsSatServer(BaseServer): + def __init__(self): + super().__init__("[OPS-SAT]", OPSSAT_SERVER_PORT) + + def handle_read_bytestream(self, analysis_deque: deque): + parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM]) + for packet in parsed_packets: + _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]") + TM_QUEUE.put(packet) + + def send_data_to_client(self, conn_socket: socket.socket): + while not TC_QUEUE.empty(): + next_packet = TC_QUEUE.get() + _LOGGER.info(f"{self.log_prefix} TX TC [{next_packet.hex(sep=',')}]") + conn_socket.sendall(next_packet) + + +class TmtcServer(BaseServer): + def __init__(self): + super().__init__("[TMTC]", TMTC_SERVER_PORT) + + def handle_read_bytestream(self, analysis_deque: deque): + parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC]) + for packet in parsed_packets: + _LOGGER.info(f"{self.log_prefix} RX TC: [{packet.hex(sep=',')}]") + TC_QUEUE.put(packet) + + def send_data_to_client(self, conn_socket: socket.socket): + while not TM_QUEUE.empty(): + next_packet = TM_QUEUE.get() + _LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]") + conn_socket.sendall(next_packet) + + +if __name__ == "__main__": + main() diff --git a/pytmtc/tcp_server.py b/pytmtc/tcp_server.py index 0c2c6dc..a65cfb4 100644 --- a/pytmtc/tcp_server.py +++ b/pytmtc/tcp_server.py @@ -28,9 +28,9 @@ class TcpServer(ComInterface): self._server_socket: Optional[socket.socket] = None self._server_thread = Thread(target=self._server_task, daemon=True) self._connected = False - self._conn_start = None - self._writing_done = False - self._reading_done = False + # self._conn_start = None + # self._writing_done = False + # self._reading_done = False @property def connected(self) -> bool: @@ -66,55 +66,57 @@ class TcpServer(ComInterface): while True and not self._kill_signal.is_set(): try: (conn_socket, conn_addr) = self._server_socket.accept() - self.conn_start = time.time() - while True: - self._handle_connection(conn_socket, conn_addr) - if ( - self._reading_done and self._writing_done - ) or time.time() - self.conn_start > 0.5: - print("reading and writing done") - break + self._handle_connection(conn_socket, conn_addr) + # conn_socket.close() + """ + if ( + self._reading_done and self._writing_done + ) or time.time() - self.conn_start > 0.5: + print("reading and writing done") + break + + """ except TimeoutError: - print("timeout error") continue def _handle_connection(self, conn_socket: socket.socket, conn_addr: Any): _LOGGER.info(f"TCP client {conn_addr} connected") - (readable, writable, _) = select.select( - [conn_socket], - [conn_socket], - [], - 0.1, - ) + queue_len = 0 - # TODO: Why is the stupid conn socket never readable? - print(f"Writable: {writable}") - print(f"Readable: {readable}") - if writable and writable[0]: - queue_len = 0 + while True: with self._tc_lock: queue_len = len(self._tc_packet_queue) - while queue_len > 0: - next_packet = bytes() - with self._tc_lock: - next_packet = self._tc_packet_queue.popleft() - if len(next_packet) > 0: - conn_socket.sendall(next_packet) - queue_len -= 1 - self._writing_done = True - if readable and readable[0]: - print("reading shit") - while True: - bytes_recvd = conn_socket.recv(4096) - if len(bytes_recvd) > 0: - print(f"Received bytes from TCP client: {bytes_recvd.decode()}") - with self._tm_lock: - self._tm_packet_queue.append(bytes_recvd) - elif len(bytes_recvd) == 0: - self._reading_done = True - break - else: - print("error receiving data from TCP client") + outputs = [] + if queue_len > 0: + outputs.append(conn_socket) + (readable, writable, _) = select.select( + [conn_socket], + outputs, + [], + 0.2, + ) + + if writable and writable[0]: + print("writeable") + while queue_len > 0: + next_packet = bytes() + with self._tc_lock: + next_packet = self._tc_packet_queue.popleft() + if len(next_packet) > 0: + conn_socket.sendall(next_packet) + queue_len -= 1 + if readable and readable[0]: + print("readable") + while True: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + print(f"Received bytes from TCP client: {bytes_recvd.decode()}") + with self._tm_lock: + self._tm_packet_queue.append(bytes_recvd) + elif len(bytes_recvd) == 0: + break + else: + print("error receiving data from TCP client") def is_open(self) -> bool: """Can be used to check whether the communication interface is open. This is useful if diff --git a/pytmtc/tests/server-test.py b/pytmtc/tests/server-test.py deleted file mode 100755 index da8b67e..0000000 --- a/pytmtc/tests/server-test.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 -import socket -from spacepackets.ecss.tc import PusTelecommand - - -EXP_ID = 278 -APID = 1024 + EXP_ID -SEND_PING_ONCE = True - - -def main(): - global SEND_PING_ONCE - server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_addr = ("localhost", 4096) - server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(server_addr) - while True: - server_socket.listen() - (conn_socket, conn_addr) = server_socket.accept() - print(f"TCP client {conn_addr} connected") - while True: - bytes_recvd = conn_socket.recv(4096) - if len(bytes_recvd) > 0: - print(f"Received bytes from TCP client: {bytes_recvd}") - elif len(bytes_recvd) == 0: - break - else: - print("error receiving data from TCP client") - if SEND_PING_ONCE: - ping_tc = PusTelecommand(service=17, subservice=1, seq_count=0, apid=APID) - conn_socket.sendall(ping_tc.pack()) - SEND_PING_ONCE = False - conn_socket.close() - continue - # server_socket.close() - - -if __name__ == "__main__": - main() diff --git a/src/config.rs b/src/config.rs index b33b8f3..e48b764 100644 --- a/src/config.rs +++ b/src/config.rs @@ -22,7 +22,8 @@ pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPER // TODO: Would be nice if this can be commanded as well.. /// Can be enabled to print all SPP packets received from the SPP server on port 4096. -pub const SPP_CLIENT_WIRETAPPING_RX: bool = false; +pub const SPP_CLIENT_WIRETAPPING_RX: bool = true; +pub const SPP_CLIENT_WIRETAPPING_TX: bool = true; #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 6f208c5..479f285 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -5,7 +5,7 @@ use std::sync::mpsc; use mio::net::TcpStream; use mio::{Events, Interest, Poll, Token}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; -use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, TCP_SPP_SERVER_PORT}; +use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, SPP_CLIENT_WIRETAPPING_TX}; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; use satrs::queue::GenericSendError; use satrs::spacepackets::PacketId; @@ -23,15 +23,24 @@ pub enum ClientError { Io(#[from] io::Error), } +#[derive(Debug, PartialEq, Eq)] +pub enum ConnectionStatus { + Unknown, + Connected, + LostConnection, + TryingReconnect, +} + pub struct TcpSppClient { id: ComponentId, poll: Poll, events: Events, // Optional to allow periodic reconnection attempts on the TCP server. - client: TcpStream, + client: Option, read_buf: [u8; 4096], tm_tcp_client_rx: mpsc::Receiver, server_addr: SocketAddr, + connection: ConnectionStatus, tc_source_tx: mpsc::Sender, validator: SimpleSpValidator, } @@ -46,74 +55,68 @@ impl TcpSppClient { ) -> io::Result { let poll = Poll::new()?; let events = Events::with_capacity(128); - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let mut client = TcpStream::connect(server_addr)?; - poll.registry().register( + let mut client = Self { + id, + poll, + events, + client: None, + connection: ConnectionStatus::Unknown, + read_buf: [0; 4096], + server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), + tm_tcp_client_rx, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }; + client.connect()?; + Ok(client) + } + + pub fn connect(&mut self) -> io::Result<()> { + let mut client = TcpStream::connect(self.server_addr)?; + + self.poll.registry().register( &mut client, Token(0), Interest::READABLE | Interest::WRITABLE, )?; - Ok(Self { - id, - poll, - events, - client, - read_buf: [0; 4096], - server_addr, - tm_tcp_client_rx, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), - }) - } - - pub fn periodic_operation(&mut self) -> Result<(), ClientError> { - // if self.client.is_some() { - return self.perform_regular_operation(); - /* - } else { - log::info!("attempting reconnect"); - let client_result = self.attempt_connection(); - match client_result { - Ok(_) => { - self.perform_regular_operation()?; - } - Err(ref e) => { - log::warn!( - "connection to TCP server {} failed: {}", - self.server_addr, - e - ); - } - } - } - */ + self.client = Some(client); + self.connection = ConnectionStatus::TryingReconnect; Ok(()) } - pub fn perform_regular_operation(&mut self) -> Result<(), ClientError> { - self.poll - .poll(&mut self.events, Some(STOP_CHECK_FREQUENCY))?; - let events: Vec = self.events.iter().cloned().collect(); - for event in events { - if event.token() == Token(0) { - if event.is_readable() { - log::info!("readable event"); - self.check_conn_status()?; - self.read_from_server()?; - } - if event.is_writable() { - log::info!("writable event"); - self.check_conn_status()?; - self.write_to_server()?; - } + pub fn operation(&mut self) -> Result<(), ClientError> { + match self.connection { + ConnectionStatus::TryingReconnect | ConnectionStatus::Unknown => { + self.check_conn_status()? } - } + ConnectionStatus::Connected => { + self.check_conn_status()?; + self.poll + .poll(&mut self.events, Some(STOP_CHECK_FREQUENCY))?; + let events: Vec = self.events.iter().cloned().collect(); + for event in events { + if event.token() == Token(0) { + if event.is_readable() { + log::warn!("TCP client is readable"); + self.read_from_server()?; + } + if event.is_writable() { + log::warn!("TCP client is writable"); + self.write_to_server()?; + } + } + } + return Ok(()); + } + ConnectionStatus::LostConnection => self.connect()?, + }; + std::thread::sleep(STOP_CHECK_FREQUENCY); Ok(()) } pub fn read_from_server(&mut self) -> Result<(), ClientError> { - match self.client.read(&mut self.read_buf) { - Ok(0) => (), // return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), + match self.client.as_mut().unwrap().read(&mut self.read_buf) { + Ok(0) => (), Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, Err(e) => return Err(e.into()), } @@ -124,7 +127,14 @@ impl TcpSppClient { loop { match self.tm_tcp_client_rx.try_recv() { Ok(tm) => { - self.client.write_all(&tm.packet)?; + if SPP_CLIENT_WIRETAPPING_TX { + log::debug!( + "SPP TCP TX {}: {:x?}", + tm.packet.len(), + tm.packet.as_slice() + ); + } + self.client.as_mut().unwrap().write_all(&tm.packet)?; } Err(e) => match e { mpsc::TryRecvError::Empty => break, @@ -138,12 +148,21 @@ impl TcpSppClient { Ok(()) } - pub fn check_conn_status(&mut self) -> io::Result { - match self.client.peer_addr() { - Ok(_) => Ok(true), + pub fn check_conn_status(&mut self) -> io::Result<()> { + match self.client.as_mut().unwrap().peer_addr() { + Ok(_) => { + if self.connection == ConnectionStatus::Unknown + || self.connection == ConnectionStatus::TryingReconnect + { + self.connection = ConnectionStatus::Connected; + } + Ok(()) + } Err(e) => { if e.kind() == io::ErrorKind::NotConnected { - return Ok(false); + log::warn!("lost connection, or do not have one"); + self.connection = ConnectionStatus::LostConnection; + return Ok(()); } Err(e) } @@ -154,7 +173,7 @@ impl TcpSppClient { let mut dummy = 0; if SPP_CLIENT_WIRETAPPING_RX { log::debug!( - "received {} bytes on TCP client: {:x?}", + "SPP TCP RX {} bytes: {:x?}", read_bytes, &self.read_buf[..read_bytes] ); diff --git a/src/main.rs b/src/main.rs index 8f8f7b0..85870d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -232,7 +232,7 @@ fn main() { .spawn(move || { info!("Running TCP SPP client"); loop { - let result = tcp_spp_client.periodic_operation(); + let result = tcp_spp_client.operation(); if let Err(e) = result { log::error!("TCP SPP client error: {}", e); } diff --git a/src/pus/test.rs b/src/pus/test.rs index 8428be7..28a3103 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -42,7 +42,6 @@ pub struct TestCustomServiceWrapper { EcssTcInVecConverter, VerificationReporter, >, - // pub test_srv_event_sender: mpsc::Sender, } impl TestCustomServiceWrapper {