From 536051e05bd0632b423eec71f666b965efd5809b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 22 Apr 2024 20:29:14 +0200 Subject: [PATCH] improvements and fixes --- satrs-example/satrs-tmtc/main.py | 2 ++ satrs-example/src/interface/tcp.rs | 5 ++++- satrs-example/src/interface/udp.rs | 7 +++--- satrs/src/hal/std/tcp_server.rs | 35 +++++++++++++++++++++--------- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/satrs-example/satrs-tmtc/main.py b/satrs-example/satrs-tmtc/main.py index a3e0caf..c44d891 100755 --- a/satrs-example/satrs-tmtc/main.py +++ b/satrs-example/satrs-tmtc/main.py @@ -256,6 +256,7 @@ def main(): while True: state = tmtc_backend.periodic_op(None) if state.request == BackendRequest.TERMINATION_NO_ERROR: + tmtc_backend.close_com_if() sys.exit(0) elif state.request == BackendRequest.DELAY_IDLE: _LOGGER.info("TMTC Client in IDLE mode") @@ -270,6 +271,7 @@ def main(): elif state.request == BackendRequest.CALL_NEXT: pass except KeyboardInterrupt: + tmtc_backend.close_com_if() sys.exit(0) diff --git a/satrs-example/src/interface/tcp.rs b/satrs-example/src/interface/tcp.rs index cc3f669..021ad31 100644 --- a/satrs-example/src/interface/tcp.rs +++ b/satrs-example/src/interface/tcp.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::{ collections::{HashSet, VecDeque}, fmt::Debug, @@ -139,7 +140,9 @@ impl, SendError: Debug + 'static> pub fn periodic_operation(&mut self) { loop { - let result = self.0.handle_all_connections(None); + let result = self + .0 + .handle_all_connections(Some(Duration::from_millis(400))); match result { Ok(_conn_result) => (), Err(e) => { diff --git a/satrs-example/src/interface/udp.rs b/satrs-example/src/interface/udp.rs index cae1c8c..d7816e2 100644 --- a/satrs-example/src/interface/udp.rs +++ b/satrs-example/src/interface/udp.rs @@ -114,6 +114,7 @@ impl< #[cfg(test)] mod tests { + use std::net::Ipv4Addr; use std::{ cell::RefCell, collections::VecDeque, @@ -182,7 +183,7 @@ mod tests { #[test] fn test_transactions() { - let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); + let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0); let test_receiver = TestSender::default(); // let tc_queue = test_receiver.tc_vec.clone(); let udp_tc_server = @@ -200,8 +201,8 @@ mod tests { .unwrap(); let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed"); let client_addr = client.local_addr().unwrap(); - client.connect(server_addr).unwrap(); - client.send(&ping_tc).unwrap(); + println!("{}", server_addr); + client.send_to(&ping_tc, server_addr).unwrap(); udp_dyn_server.periodic_operation(); { let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut(); diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index 6d7120c..983702a 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -9,8 +9,6 @@ use mio::{Events, Interest, Poll, Token}; use socket2::{Domain, Socket, Type}; use std::io::{self, Read}; use std::net::SocketAddr; -// use std::net::TcpListener; -// use std::net::{SocketAddr, TcpStream}; use std::thread; use crate::tmtc::{PacketSenderRaw, PacketSource}; @@ -244,13 +242,16 @@ impl< // Create a poll instance. let poll = Poll::new()?; // Create storage for events. - let events = Events::with_capacity(10); + let events = Events::with_capacity(32); let listener: std::net::TcpListener = socket.into(); let mut mio_listener = TcpListener::from_std(listener); // Start listening for incoming connections. - poll.registry() - .register(&mut mio_listener, Token(0), Interest::READABLE)?; + poll.registry().register( + &mut mio_listener, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; Ok(Self { id: cfg.id, @@ -280,11 +281,11 @@ impl< self.listener.local_addr() } - /// This call is used to handle the next connection to a client. Right now, it performs + /// This call is used to handle all connection from clients. Right now, it performs /// the following steps: /// - /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API - /// until a client connects. + /// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional + /// timeout can be specified for non-blocking acceptance. /// 2. It reads all the telecommands from the client and parses all received data using the /// user specified [TcpTcParser]. /// 3. After reading and parsing all telecommands, it sends back all telemetry using the @@ -317,11 +318,17 @@ impl< loop { match self.listener.accept() { Ok((stream, addr)) => { - self.handle_accepted_connection(stream, addr)?; + if let Err(e) = self.handle_accepted_connection(stream, addr) { + self.reregister_poll_interest()?; + return Err(e); + } handled_connections += 1; } Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break, - Err(err) => return Err(TcpTmtcError::Io(err)), + Err(err) => { + self.reregister_poll_interest()?; + return Err(TcpTmtcError::Io(err)); + } } } } @@ -331,6 +338,14 @@ impl< Ok(ConnectionResult::AcceptTimeout) } + fn reregister_poll_interest(&mut self) -> io::Result<()> { + self.poll.registry().reregister( + &mut self.listener, + Token(0), + Interest::READABLE | Interest::WRITABLE, + ) + } + fn handle_accepted_connection( &mut self, mut stream: TcpStream,