diff --git a/Cargo.lock b/Cargo.lock index 232bf65..f6c288d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,7 +485,6 @@ dependencies = [ "humantime", "lazy_static", "log", - "mio", "num_enum", "satrs", "satrs-mib", diff --git a/Cargo.toml b/Cargo.toml index 00db36a..2fcceb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,9 @@ strum = { version = "0.26", features = ["derive"] } thiserror = "1" derive-new = "0.6" num_enum = "0.7" -mio = "0.8" + +# socket2 = "0.5" +# mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.3" diff --git a/pytmtc/server-test.py b/pytmtc/server-test.py new file mode 100755 index 0000000..5761687 --- /dev/null +++ b/pytmtc/server-test.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +import socket +from spacepackets.ecss.tc import PusTelecommand + + +EXP_ID = 278 +APID = 1024 + EXP_ID +SEND_PING_ONCE = True + + +def main(): + global SEND_PING_ONCE + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_addr = ("localhost", 4096) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(server_addr) + while True: + server_socket.listen() + (conn_socket, conn_addr) = server_socket.accept() + print("TCP client {} connected", conn_addr) + while True: + bytes_recvd = conn_socket.recv(4096) + if len(bytes_recvd) > 0: + print(f"Received bytes from TCP client: {bytes_recvd.decode()}") + elif len(bytes_recvd) == 0: + break + else: + print("error receiving data from TCP client") + if SEND_PING_ONCE: + print("sending back ping") + ping_tc = PusTelecommand(service=17, subservice=1, seq_count=0, apid=APID) + conn_socket.sendall(ping_tc.pack()) + SEND_PING_ONCE = False + conn_socket.close() + continue + # server_socket.close() + + +if __name__ == "__main__": + main() diff --git a/src/config.rs b/src/config.rs index 2b1a0cf..b33b8f3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -215,11 +215,14 @@ pub mod components { } pub mod tasks { + use std::time::Duration; + pub const FREQ_MS_UDP_TMTC: u64 = 200; pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_CTRL: u64 = 400; - pub const STOP_CHECK_FREQUENCY: u64 = 400; + pub const STOP_CHECK_FREQUENCY_MS: u64 = 400; + pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS); } diff --git a/src/interface/tcp_server.rs b/src/interface/tcp_server.rs index c3275c6..23c773d 100644 --- a/src/interface/tcp_server.rs +++ b/src/interface/tcp_server.rs @@ -1,7 +1,6 @@ use std::{ collections::VecDeque, sync::{atomic::AtomicBool, mpsc, Arc, Mutex}, - time::Duration, }; use log::{info, warn}; @@ -113,9 +112,7 @@ impl TcpTask { } pub fn periodic_operation(&mut self) { - let result = self - .0 - .handle_all_connections(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + let result = self.0.handle_all_connections(Some(STOP_CHECK_FREQUENCY)); match result { Ok(_conn_result) => (), Err(e) => { diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index b926a05..3b55cbb 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -3,21 +3,25 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; use std::time::Duration; -use mio::net::TcpStream; -use mio::{Events, Interest, Poll, Token}; -use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use ops_sat_rs::config::SPP_CLIENT_WIRETAPPING_RX; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; use satrs::queue::GenericSendError; use satrs::spacepackets::PacketId; use satrs::tmtc::PacketAsVec; use satrs::ComponentId; +use std::net::TcpStream; use thiserror::Error; use super::{SimpleSpValidator, TcpComponent}; +#[derive(Debug)] +pub enum ConnectionResult { + Connected(TcpStream), + Timeout, +} + #[derive(Debug, Error)] -pub enum PacketForwardingError { +pub enum ClientError { #[error("send error: {0}")] Send(#[from] GenericSendError), #[error("io error: {0}")] @@ -26,10 +30,6 @@ pub enum PacketForwardingError { pub struct TcpSppClient { id: ComponentId, - poll: Poll, - events: Events, - // Optional to allow periodic reconnection attempts on the TCP server. - client: Option, read_buf: [u8; 4096], tm_tcp_client_rx: mpsc::Receiver, server_addr: SocketAddr, @@ -45,33 +45,9 @@ impl TcpSppClient { valid_ids: &'static [PacketId], port: u16, ) -> io::Result { - let mut poll = Poll::new()?; - let events = Events::with_capacity(128); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let client = Self::attempt_connection(&mut poll, &server_addr); - if client.is_err() { - log::warn!( - "connection to TCP server {} failed: {}", - server_addr, - client.unwrap_err() - ); - return Ok(Self { - id, - poll, - events, - client: None, - read_buf: [0; 4096], - server_addr, - tm_tcp_client_rx, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), - }); - } Ok(Self { id, - poll, - events, - client: Some(client.unwrap()), read_buf: [0; 4096], server_addr, tm_tcp_client_rx, @@ -80,62 +56,45 @@ impl TcpSppClient { }) } - pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result { - let mut client = TcpStream::connect(*server_addr)?; - poll.registry().register( - &mut client, - Token(0), - Interest::READABLE | Interest::WRITABLE, - )?; - Ok(client) - } - - pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { - if self.client.is_some() { - return self.perform_regular_operation(); - } else { - let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr); - match client_result { - Ok(client) => { - self.client = Some(client); - self.perform_regular_operation()?; - } - Err(ref e) => { - log::warn!( - "connection to TCP server {} failed: {}", - self.server_addr, - e - ); + pub fn attempt_connection( + &mut self, + timeout: Duration, + ) -> Result { + match TcpStream::connect_timeout(&self.server_addr, timeout) { + Ok(client) => Ok(ConnectionResult::Connected(client)), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut { + return Ok(ConnectionResult::Timeout); } + Err(e.into()) } } + } + + pub fn operation(&mut self, timeout: Duration) { + match self.attempt_connection(timeout) { + Ok(result) => { + if let ConnectionResult::Connected(mut client) = result { + if let Err(e) = self.handle_client_operation(&mut client) { + log::error!("error handling TCP client operation: {}", e) + } + drop(client); + std::thread::sleep(timeout); + } + } + Err(e) => { + log::error!("TCP client error: {}", e); + } + } + } + + pub fn handle_client_operation(&mut self, client: &mut TcpStream) -> Result<(), ClientError> { + self.read_from_server(client)?; + self.write_to_server(client)?; Ok(()) } - pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> { - self.poll.poll( - &mut self.events, - Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), - )?; - let events: Vec = self.events.iter().cloned().collect(); - for event in events { - if event.token() == Token(0) { - if event.is_readable() { - self.read_from_server()?; - } - if event.is_writable() { - self.write_to_server()?; - } - } - } - Ok(()) - } - - pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { - let client = self - .client - .as_mut() - .expect("TCP stream invalid when it should not be"); + pub fn read_from_server(&mut self, client: &mut TcpStream) -> Result<(), ClientError> { match client.read(&mut self.read_buf) { Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, @@ -144,11 +103,7 @@ impl TcpSppClient { Ok(()) } - pub fn write_to_server(&mut self) -> io::Result<()> { - let client = self - .client - .as_mut() - .expect("TCP stream invalid when it should not be"); + pub fn write_to_server(&mut self, client: &mut TcpStream) -> io::Result<()> { loop { match self.tm_tcp_client_rx.try_recv() { Ok(tm) => { @@ -166,10 +121,7 @@ impl TcpSppClient { Ok(()) } - pub fn handle_read_bytstream( - &mut self, - read_bytes: usize, - ) -> Result<(), PacketForwardingError> { + pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { let mut dummy = 0; if SPP_CLIENT_WIRETAPPING_RX { log::debug!( diff --git a/src/main.rs b/src/main.rs index 69b6e12..d6a65c1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, - tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; @@ -232,7 +232,7 @@ fn main() { .spawn(move || { info!("Running TCP SPP client"); loop { - let _result = tcp_spp_client.periodic_operation(); + tcp_spp_client.operation(STOP_CHECK_FREQUENCY); if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index fdb8043..e2e0127 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -1,6 +1,6 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::{collections::HashMap, sync::mpsc, time::Duration}; +use std::{collections::HashMap, sync::mpsc}; use log::info; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; @@ -101,10 +101,7 @@ impl TmFunnelDynamic { pub fn operation(&mut self) { loop { - match self - .tm_funnel_rx - .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) - { + match self.tm_funnel_rx.recv_timeout(STOP_CHECK_FREQUENCY) { Ok(mut tm) => { // Read the TM, set sequence counter and message counter, and finally update // the CRC.