diff --git a/pytmtc/pyserver.py b/pytmtc/pyserver.py index a1d8056..8893579 100755 --- a/pytmtc/pyserver.py +++ b/pytmtc/pyserver.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import socket import abc +import time import select import logging from threading import Thread @@ -15,6 +16,7 @@ EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID) EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID) OPSSAT_SERVER_PORT = 4096 TMTC_SERVER_PORT = 4097 +LOG_LEVEL = logging.DEBUG TC_QUEUE = Queue() @@ -27,7 +29,7 @@ _LOGGER = logging.getLogger(__name__) def main(): logging.basicConfig( format="[%(asctime)s] [%(levelname)-5s] %(message)s", - level=logging.INFO, + level=LOG_LEVEL, datefmt="%Y-%m-%d %H:%M:%S", ) print("Starting OPS-SAT ground TMTC server") @@ -49,15 +51,17 @@ class BaseServer(Thread): super().__init__() def run(self) -> None: + self.run_sync_version() + + def run_sync_version(self) -> None: while True: self.server_socket.listen() (conn_socket, conn_addr) = self.server_socket.accept() - # conn_socket.setblocking(True) - # conn_socket.settimeout(0.2) + conn_socket.setblocking(False) print(f"{self.log_prefix} TCP client {conn_addr} connected") analysis_deque = deque() while True: - (readable, writable, _) = select([conn_socket], [], [], 0.2) + conn_socket.settimeout(0.2) try: bytes_recvd = conn_socket.recv(4096) if len(bytes_recvd) > 0: @@ -68,7 +72,41 @@ class BaseServer(Thread): break else: print("error receiving data from TCP client") + except BlockingIOError: + self.handle_timeout(conn_socket, analysis_deque) + time.sleep(0.2) except TimeoutError: + self.handle_timeout(conn_socket, analysis_deque) + + def handle_timeout(self, conn_socket: socket.socket, analysis_deque: deque): + if len(analysis_deque) > 0: + self.handle_read_bytestream(analysis_deque) + self.send_data_to_client(conn_socket) + + def run_select_version(self) -> None: + while True: + self.server_socket.listen() + (conn_socket, conn_addr) = self.server_socket.accept() + print(f"{self.log_prefix} TCP client {conn_addr} connected") + analysis_deque = deque() + while True: + outputs = [] + if self.send_data_available(): + outputs.append(conn_socket) + (readable, writable, _) = select.select([conn_socket], outputs, [], 0.2) + if readable and readable[0]: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + _LOGGER.debug("received data from TCP client: {}", bytes_recvd) + analysis_deque.append(bytes_recvd) + elif len(bytes_recvd) == 0: + self.handle_read_bytestream(analysis_deque) + break + else: + print("error receiving data from TCP client") + if writable and writable[0]: + self.send_data_to_client(conn_socket) + if not writable and not readable: if len(analysis_deque) > 0: self.handle_read_bytestream(analysis_deque) self.send_data_to_client(conn_socket) @@ -81,6 +119,10 @@ class BaseServer(Thread): def send_data_to_client(self, conn_socket: socket.socket): pass + @abc.abstractmethod + def send_data_available(self) -> bool: + pass + class OpsSatServer(BaseServer): def __init__(self): @@ -98,6 +140,9 @@ class OpsSatServer(BaseServer): _LOGGER.info(f"{self.log_prefix} TX TC [{next_packet.hex(sep=',')}]") conn_socket.sendall(next_packet) + def send_data_available(self) -> bool: + return not TC_QUEUE.empty() + class TmtcServer(BaseServer): def __init__(self): @@ -106,7 +151,7 @@ class TmtcServer(BaseServer): def handle_read_bytestream(self, analysis_deque: deque): parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TC]) for packet in parsed_packets: - _LOGGER.info(f"{self.log_prefix} RX TC: [{packet.hex(sep=',')}]") + _LOGGER.info(f"{self.log_prefix} RX TM: [{packet.hex(sep=',')}]") TC_QUEUE.put(packet) def send_data_to_client(self, conn_socket: socket.socket): @@ -115,6 +160,9 @@ class TmtcServer(BaseServer): _LOGGER.info(f"{self.log_prefix} TX TM [{next_packet.hex(sep=',')}]") conn_socket.sendall(next_packet) + def send_data_available(self) -> bool: + return not TM_QUEUE.empty() + if __name__ == "__main__": main() diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 479f285..94ea272 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -1,8 +1,10 @@ -use std::io::{self, Read, Write}; +use socket2::{Domain, Socket, Type}; +use std::io::{self, Read}; +use std::net::TcpStream as StdTcpStream; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; -use mio::net::TcpStream; +use mio::net::TcpStream as MioTcpStream; use mio::{Events, Interest, Poll, Token}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, SPP_CLIENT_WIRETAPPING_TX}; @@ -23,6 +25,64 @@ pub enum ClientError { Io(#[from] io::Error), } +#[allow(dead_code)] +pub struct TcpSppClientCommon { + id: ComponentId, + read_buf: [u8; 4096], + tm_tcp_client_rx: mpsc::Receiver, + server_addr: SocketAddr, + tc_source_tx: mpsc::Sender, + validator: SimpleSpValidator, +} + +#[allow(dead_code)] +impl TcpSppClientCommon { + pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { + let mut dummy = 0; + if SPP_CLIENT_WIRETAPPING_RX { + log::debug!( + "SPP TCP RX {} bytes: {:x?}", + read_bytes, + &self.read_buf[..read_bytes] + ); + } + // This parser is able to deal with broken tail packets, but we ignore those for now.. + parse_buffer_for_ccsds_space_packets( + &mut self.read_buf[..read_bytes], + &self.validator, + self.id, + &self.tc_source_tx, + &mut dummy, + )?; + Ok(()) + } + + pub fn write_to_server(&mut self, client: &mut impl io::Write) -> io::Result<()> { + loop { + match self.tm_tcp_client_rx.try_recv() { + Ok(tm) => { + if SPP_CLIENT_WIRETAPPING_TX { + log::debug!( + "SPP TCP TX {}: {:x?}", + tm.packet.len(), + tm.packet.as_slice() + ); + } + client.write_all(&tm.packet)?; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::error!("TM sender to TCP client has disconnected"); + break; + } + }, + } + } + Ok(()) + } +} + #[derive(Debug, PartialEq, Eq)] pub enum ConnectionStatus { Unknown, @@ -31,21 +91,18 @@ pub enum ConnectionStatus { TryingReconnect, } -pub struct TcpSppClient { - id: ComponentId, +#[allow(dead_code)] +pub struct TcpSppClientMio { + common: TcpSppClientCommon, poll: Poll, events: Events, // Optional to allow periodic reconnection attempts on the TCP server. - client: Option, - read_buf: [u8; 4096], - tm_tcp_client_rx: mpsc::Receiver, - server_addr: SocketAddr, + client: Option, connection: ConnectionStatus, - tc_source_tx: mpsc::Sender, - validator: SimpleSpValidator, } -impl TcpSppClient { +#[allow(dead_code)] +impl TcpSppClientMio { pub fn new( id: ComponentId, tc_source_tx: mpsc::Sender, @@ -56,23 +113,25 @@ impl TcpSppClient { let poll = Poll::new()?; let events = Events::with_capacity(128); let mut client = Self { - id, + common: TcpSppClientCommon { + id, + read_buf: [0; 4096], + tm_tcp_client_rx, + server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }, poll, events, client: None, connection: ConnectionStatus::Unknown, - read_buf: [0; 4096], - server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), - tm_tcp_client_rx, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }; client.connect()?; Ok(client) } pub fn connect(&mut self) -> io::Result<()> { - let mut client = TcpStream::connect(self.server_addr)?; + let mut client = MioTcpStream::connect(self.common.server_addr)?; self.poll.registry().register( &mut client, @@ -102,7 +161,7 @@ impl TcpSppClient { } if event.is_writable() { log::warn!("TCP client is writable"); - self.write_to_server()?; + self.common.write_to_server(self.client.as_mut().unwrap())?; } } } @@ -115,39 +174,19 @@ impl TcpSppClient { } pub fn read_from_server(&mut self) -> Result<(), ClientError> { - match self.client.as_mut().unwrap().read(&mut self.read_buf) { + match self + .client + .as_mut() + .unwrap() + .read(&mut self.common.read_buf) + { Ok(0) => (), - Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, + Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, Err(e) => return Err(e.into()), } Ok(()) } - pub fn write_to_server(&mut self) -> io::Result<()> { - loop { - match self.tm_tcp_client_rx.try_recv() { - Ok(tm) => { - if SPP_CLIENT_WIRETAPPING_TX { - log::debug!( - "SPP TCP TX {}: {:x?}", - tm.packet.len(), - tm.packet.as_slice() - ); - } - self.client.as_mut().unwrap().write_all(&tm.packet)?; - } - Err(e) => match e { - mpsc::TryRecvError::Empty => break, - mpsc::TryRecvError::Disconnected => { - log::error!("TM sender to TCP client has disconnected"); - break; - } - }, - } - } - Ok(()) - } - pub fn check_conn_status(&mut self) -> io::Result<()> { match self.client.as_mut().unwrap().peer_addr() { Ok(_) => { @@ -168,24 +207,74 @@ impl TcpSppClient { } } } +} - pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { - let mut dummy = 0; - if SPP_CLIENT_WIRETAPPING_RX { - log::debug!( - "SPP TCP RX {} bytes: {:x?}", - read_bytes, - &self.read_buf[..read_bytes] - ); +pub struct TcpSppClientStd { + common: TcpSppClientCommon, + // Optional to allow periodic reconnection attempts on the TCP server. + client: Option, +} + +impl TcpSppClientStd { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, + valid_ids: &'static [PacketId], + port: u16, + ) -> io::Result { + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + // Create a TCP listener bound to two addresses. + let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + socket.set_reuse_address(true)?; + socket.set_nonblocking(true)?; + socket.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; + let client = match socket.connect(&server_addr.into()) { + Ok(_) => { + let client: StdTcpStream = socket.into(); + Some(client) + } + Err(e) => { + log::error!("error connecting to server: {}", e); + None + } + }; + + // let tcp_stream = StdTcpStream::connect_timeout(addr, timeout) + Ok(Self { + common: TcpSppClientCommon { + id, + read_buf: [0; 4096], + tm_tcp_client_rx, + server_addr, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }, + client, + }) + } + + pub fn operation(&mut self) -> Result<(), ClientError> { + if let Some(client) = &mut self.client { + log::info!("client is valid"); + match client.read(&mut self.common.read_buf) { + Ok(0) => println!("read 0 bytes"), + Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut + { + log::info!("timeout shit"); + self.common.write_to_server(client)?; + return Ok(()); + } + return Err(e.into()); + } + } + } else { + // TODO: Reconnect attempt occasionally. + std::thread::sleep(STOP_CHECK_FREQUENCY); } - // This parser is able to deal with broken tail packets, but we ignore those for now.. - parse_buffer_for_ccsds_space_packets( - &mut self.read_buf[..read_bytes], - &self.validator, - self.id, - &self.tc_source_tx, - &mut dummy, - )?; + Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 85870d0..370a5df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,6 @@ use ops_sat_rs::config::{ use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; -use crate::tmtc::tc_source::TcSourceTaskDynamic; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ @@ -24,9 +23,10 @@ use crate::{ logger::setup_logger, }; use crate::{ - interface::tcp_spp_client::TcpSppClient, + interface::tcp_spp_client::TcpSppClientMio, pus::{PusTcDistributor, PusTcMpscRouter}, }; +use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic}; use crate::{ pus::{action::create_action_service, stack::PusStack}, requests::GenericRequestRouter, @@ -168,7 +168,7 @@ fn main() { stop_signal.clone(), ); - let mut tcp_spp_client = TcpSppClient::new( + let mut tcp_spp_client = TcpSppClientStd::new( TCP_SPP_CLIENT.id(), tc_source_tx, tm_tcp_client_rx,