diff --git a/pytmtc/pyserver.py b/pytmtc/pyserver.py index 8893579..19caf93 100755 --- a/pytmtc/pyserver.py +++ b/pytmtc/pyserver.py @@ -4,7 +4,8 @@ import abc import time import select import logging -from threading import Thread +from typing import Any +from threading import Event, Thread from collections import deque from multiprocessing import Queue from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId @@ -21,6 +22,7 @@ LOG_LEVEL = logging.DEBUG TC_QUEUE = Queue() TM_QUEUE = Queue() +KILL_SIGNAL = Event() _LOGGER = logging.getLogger(__name__) @@ -33,10 +35,17 @@ def main(): datefmt="%Y-%m-%d %H:%M:%S", ) print("Starting OPS-SAT ground TMTC server") + KILL_SIGNAL.clear() ops_sat_thread = OpsSatServer() ops_sat_thread.start() tmtc_thread = TmtcServer() tmtc_thread.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + _LOGGER.info("Shutting down server gracefully") + KILL_SIGNAL.set() ops_sat_thread.join() tmtc_thread.join() @@ -47,6 +56,8 @@ class BaseServer(Thread): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_addr = ("0.0.0.0", port) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.setblocking(False) + self.server_socket.settimeout(0.4) self.server_socket.bind(server_addr) super().__init__() @@ -54,29 +65,35 @@ class BaseServer(Thread): self.run_sync_version() def run_sync_version(self) -> None: - while True: - self.server_socket.listen() - (conn_socket, conn_addr) = self.server_socket.accept() - conn_socket.setblocking(False) - print(f"{self.log_prefix} TCP client {conn_addr} connected") - analysis_deque = deque() - while True: - conn_socket.settimeout(0.2) - try: - bytes_recvd = conn_socket.recv(4096) - if len(bytes_recvd) > 0: - _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}") - analysis_deque.append(bytes_recvd) - elif len(bytes_recvd) == 0: - self.handle_read_bytestream(analysis_deque) - break - else: - print("error receiving data from TCP client") - except BlockingIOError: - self.handle_timeout(conn_socket, analysis_deque) - time.sleep(0.2) - except TimeoutError: - self.handle_timeout(conn_socket, analysis_deque) + self.server_socket.listen() + while True and not KILL_SIGNAL.is_set(): + try: + (conn_socket, conn_addr) = self.server_socket.accept() + self.handle_connection(conn_socket, conn_addr) + except TimeoutError: + continue + + def handle_connection(self, conn_socket: socket.socket, conn_addr: Any): + conn_socket.setblocking(False) + print(f"{self.log_prefix} TCP client {conn_addr} connected") + analysis_deque = deque() + while True and not KILL_SIGNAL.is_set(): + conn_socket.settimeout(0.2) + try: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + _LOGGER.debug(f"{self.log_prefix} RX RAW: {bytes_recvd}") + analysis_deque.append(bytes_recvd) + elif len(bytes_recvd) == 0: + self.handle_read_bytestream(analysis_deque) + break + else: + print("error receiving data from TCP client") + except BlockingIOError: + self.handle_timeout(conn_socket, analysis_deque) + time.sleep(0.2) + except TimeoutError: + self.handle_timeout(conn_socket, analysis_deque) def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque): if len(analysis_deque) > 0: diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 94ea272..8e41c43 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -1,4 +1,3 @@ -use socket2::{Domain, Socket, Type}; use std::io::{self, Read}; use std::net::TcpStream as StdTcpStream; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -212,7 +211,7 @@ impl TcpSppClientMio { pub struct TcpSppClientStd { common: TcpSppClientCommon, // Optional to allow periodic reconnection attempts on the TCP server. - client: Option, + stream: Option, } impl TcpSppClientStd { @@ -224,23 +223,16 @@ impl TcpSppClientStd { port: u16, ) -> io::Result { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - // Create a TCP listener bound to two addresses. - let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; - socket.set_reuse_address(true)?; - socket.set_nonblocking(true)?; - socket.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; - let client = match socket.connect(&server_addr.into()) { - Ok(_) => { - let client: StdTcpStream = socket.into(); - Some(client) + let client = match StdTcpStream::connect(server_addr) { + Ok(stream) => { + stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; + Some(stream) } Err(e) => { - log::error!("error connecting to server: {}", e); + log::warn!("error connecting to server: {}", e); None } }; - - // let tcp_stream = StdTcpStream::connect_timeout(addr, timeout) Ok(Self { common: TcpSppClientCommon { id, @@ -250,28 +242,53 @@ impl TcpSppClientStd { tc_source_tx, validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }, - client, + stream: client, + }) + } + + pub fn attempt_connect(&mut self, log_error: bool) -> io::Result { + Ok(match StdTcpStream::connect(self.common.server_addr) { + Ok(stream) => { + stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; + self.stream = Some(stream); + true + } + Err(e) => { + if log_error { + log::warn!("error connecting to server: {}", e); + } + false + } }) } pub fn operation(&mut self) -> Result<(), ClientError> { - if let Some(client) = &mut self.client { - log::info!("client is valid"); + if let Some(client) = &mut self.stream { match client.read(&mut self.common.read_buf) { - Ok(0) => println!("read 0 bytes"), + // Not sure if this can happen or this is actually an error condition.. + Ok(0) => { + log::info!("server closed connection"); + self.stream = None; + } Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, Err(e) => { if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut { - log::info!("timeout shit"); self.common.write_to_server(client)?; return Ok(()); } + log::warn!("server error: {e:?}"); + if e.kind() == io::ErrorKind::ConnectionReset { + self.stream = None; + } return Err(e.into()); } } } else { - // TODO: Reconnect attempt occasionally. + if self.attempt_connect(false)? { + log::info!("reconnected to server succesfully"); + return self.operation(); + } std::thread::sleep(STOP_CHECK_FREQUENCY); } diff --git a/src/main.rs b/src/main.rs index 370a5df..45871e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,12 +9,13 @@ use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, - tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; +use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ @@ -22,10 +23,6 @@ use crate::{ interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, }; -use crate::{ - interface::tcp_spp_client::TcpSppClientMio, - pus::{PusTcDistributor, PusTcMpscRouter}, -}; use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic}; use crate::{ pus::{action::create_action_service, stack::PusStack},