use std::io::{self, Read, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; use mio::net::TcpStream; use mio::{Events, Interest, Poll, Token}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, SPP_CLIENT_WIRETAPPING_TX}; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; use satrs::queue::GenericSendError; use satrs::spacepackets::PacketId; use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use thiserror::Error; use super::{SimpleSpValidator, TcpComponent}; #[derive(Debug, Error)] pub enum ClientError { #[error("send error: {0}")] Send(#[from] GenericSendError), #[error("io error: {0}")] Io(#[from] io::Error), } #[derive(Debug, PartialEq, Eq)] pub enum ConnectionStatus { Unknown, Connected, LostConnection, TryingReconnect, } pub struct TcpSppClient { id: ComponentId, poll: Poll, events: Events, // Optional to allow periodic reconnection attempts on the TCP server. client: Option<TcpStream>, read_buf: [u8; 4096], tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>, server_addr: SocketAddr, connection: ConnectionStatus, tc_source_tx: mpsc::Sender<PacketAsVec>, validator: SimpleSpValidator, } impl TcpSppClient { pub fn new( id: ComponentId, tc_source_tx: mpsc::Sender<PacketAsVec>, tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>, valid_ids: &'static [PacketId], port: u16, ) -> io::Result<Self> { let poll = Poll::new()?; let events = Events::with_capacity(128); let mut client = Self { id, poll, events, client: None, connection: ConnectionStatus::Unknown, read_buf: [0; 4096], server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port), tm_tcp_client_rx, tc_source_tx, validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }; client.connect()?; Ok(client) } pub fn connect(&mut self) -> io::Result<()> { let mut client = TcpStream::connect(self.server_addr)?; self.poll.registry().register( &mut client, Token(0), Interest::READABLE | Interest::WRITABLE, )?; self.client = Some(client); self.connection = ConnectionStatus::TryingReconnect; Ok(()) } pub fn operation(&mut self) -> Result<(), ClientError> { match self.connection { ConnectionStatus::TryingReconnect | ConnectionStatus::Unknown => { self.check_conn_status()? } ConnectionStatus::Connected => { self.check_conn_status()?; self.poll .poll(&mut self.events, Some(STOP_CHECK_FREQUENCY))?; let events: Vec<mio::event::Event> = self.events.iter().cloned().collect(); for event in events { if event.token() == Token(0) { if event.is_readable() { log::warn!("TCP client is readable"); self.read_from_server()?; } if event.is_writable() { log::warn!("TCP client is writable"); self.write_to_server()?; } } } return Ok(()); } ConnectionStatus::LostConnection => self.connect()?, }; std::thread::sleep(STOP_CHECK_FREQUENCY); Ok(()) } pub fn read_from_server(&mut self) -> Result<(), ClientError> { match self.client.as_mut().unwrap().read(&mut self.read_buf) { Ok(0) => (), Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, Err(e) => return Err(e.into()), } Ok(()) } pub fn write_to_server(&mut self) -> io::Result<()> { loop { match self.tm_tcp_client_rx.try_recv() { Ok(tm) => { if SPP_CLIENT_WIRETAPPING_TX { log::debug!( "SPP TCP TX {}: {:x?}", tm.packet.len(), tm.packet.as_slice() ); } self.client.as_mut().unwrap().write_all(&tm.packet)?; } Err(e) => match e { mpsc::TryRecvError::Empty => break, mpsc::TryRecvError::Disconnected => { log::error!("TM sender to TCP client has disconnected"); break; } }, } } Ok(()) } pub fn check_conn_status(&mut self) -> io::Result<()> { match self.client.as_mut().unwrap().peer_addr() { Ok(_) => { if self.connection == ConnectionStatus::Unknown || self.connection == ConnectionStatus::TryingReconnect { self.connection = ConnectionStatus::Connected; } Ok(()) } Err(e) => { if e.kind() == io::ErrorKind::NotConnected { log::warn!("lost connection, or do not have one"); self.connection = ConnectionStatus::LostConnection; return Ok(()); } Err(e) } } } pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { let mut dummy = 0; if SPP_CLIENT_WIRETAPPING_RX { log::debug!( "SPP TCP RX {} bytes: {:x?}", read_bytes, &self.read_buf[..read_bytes] ); } // This parser is able to deal with broken tail packets, but we ignore those for now.. parse_buffer_for_ccsds_space_packets( &mut self.read_buf[..read_bytes], &self.validator, self.id, &self.tc_source_tx, &mut dummy, )?; Ok(()) } }