TCP server improvements #169

Merged
muellerr merged 2 commits from tcp-ip-improvements into main 2024-04-23 13:21:42 +02:00
5 changed files with 38 additions and 14 deletions

View File

@ -256,6 +256,7 @@ def main():
while True: while True:
state = tmtc_backend.periodic_op(None) state = tmtc_backend.periodic_op(None)
if state.request == BackendRequest.TERMINATION_NO_ERROR: if state.request == BackendRequest.TERMINATION_NO_ERROR:
tmtc_backend.close_com_if()
sys.exit(0) sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE: elif state.request == BackendRequest.DELAY_IDLE:
_LOGGER.info("TMTC Client in IDLE mode") _LOGGER.info("TMTC Client in IDLE mode")
@ -270,6 +271,7 @@ def main():
elif state.request == BackendRequest.CALL_NEXT: elif state.request == BackendRequest.CALL_NEXT:
pass pass
except KeyboardInterrupt: except KeyboardInterrupt:
tmtc_backend.close_com_if()
sys.exit(0) sys.exit(0)

View File

@ -1,3 +1,4 @@
use std::time::Duration;
use std::{ use std::{
collections::{HashSet, VecDeque}, collections::{HashSet, VecDeque},
fmt::Debug, fmt::Debug,
@ -139,7 +140,9 @@ impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
pub fn periodic_operation(&mut self) { pub fn periodic_operation(&mut self) {
loop { loop {
let result = self.0.handle_all_connections(None); let result = self
.0
.handle_all_connections(Some(Duration::from_millis(400)));
match result { match result {
Ok(_conn_result) => (), Ok(_conn_result) => (),
Err(e) => { Err(e) => {

View File

@ -114,6 +114,7 @@ impl<
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::Ipv4Addr;
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::VecDeque, collections::VecDeque,
@ -182,7 +183,7 @@ mod tests {
#[test] #[test]
fn test_transactions() { fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let test_receiver = TestSender::default(); let test_receiver = TestSender::default();
// let tc_queue = test_receiver.tc_vec.clone(); // let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server = let udp_tc_server =
@ -200,8 +201,8 @@ mod tests {
.unwrap(); .unwrap();
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed"); let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
let client_addr = client.local_addr().unwrap(); let client_addr = client.local_addr().unwrap();
client.connect(server_addr).unwrap(); println!("{}", server_addr);
client.send(&ping_tc).unwrap(); client.send_to(&ping_tc, server_addr).unwrap();
udp_dyn_server.periodic_operation(); udp_dyn_server.periodic_operation();
{ {
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut(); let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();

View File

@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the - `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the
smallest possible size of 7 bytes. smallest possible size of 7 bytes.
- TCP server component now re-registers the internal `mio::Poll` object if the client reset
the connection unexpectedly. Not doing so prevented the server from functioning properly
after a re-connect.
# [v0.2.0-rc.3] 2024-04-17 # [v0.2.0-rc.3] 2024-04-17

View File

@ -9,8 +9,6 @@ use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Socket, Type}; use socket2::{Domain, Socket, Type};
use std::io::{self, Read}; use std::io::{self, Read};
use std::net::SocketAddr; use std::net::SocketAddr;
// use std::net::TcpListener;
// use std::net::{SocketAddr, TcpStream};
use std::thread; use std::thread;
use crate::tmtc::{PacketSenderRaw, PacketSource}; use crate::tmtc::{PacketSenderRaw, PacketSource};
@ -244,13 +242,16 @@ impl<
// Create a poll instance. // Create a poll instance.
let poll = Poll::new()?; let poll = Poll::new()?;
// Create storage for events. // Create storage for events.
let events = Events::with_capacity(10); let events = Events::with_capacity(32);
let listener: std::net::TcpListener = socket.into(); let listener: std::net::TcpListener = socket.into();
let mut mio_listener = TcpListener::from_std(listener); let mut mio_listener = TcpListener::from_std(listener);
// Start listening for incoming connections. // Start listening for incoming connections.
poll.registry() poll.registry().register(
.register(&mut mio_listener, Token(0), Interest::READABLE)?; &mut mio_listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
Ok(Self { Ok(Self {
id: cfg.id, id: cfg.id,
@ -280,11 +281,11 @@ impl<
self.listener.local_addr() self.listener.local_addr()
} }
/// This call is used to handle the next connection to a client. Right now, it performs /// This call is used to handle all connection from clients. Right now, it performs
/// the following steps: /// the following steps:
/// ///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API /// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional
/// until a client connects. /// timeout can be specified for non-blocking acceptance.
/// 2. It reads all the telecommands from the client and parses all received data using the /// 2. It reads all the telecommands from the client and parses all received data using the
/// user specified [TcpTcParser]. /// user specified [TcpTcParser].
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the /// 3. After reading and parsing all telecommands, it sends back all telemetry using the
@ -317,11 +318,17 @@ impl<
loop { loop {
match self.listener.accept() { match self.listener.accept() {
Ok((stream, addr)) => { Ok((stream, addr)) => {
self.handle_accepted_connection(stream, addr)?; if let Err(e) = self.handle_accepted_connection(stream, addr) {
self.reregister_poll_interest()?;
return Err(e);
}
handled_connections += 1; handled_connections += 1;
} }
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break, Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
Err(err) => return Err(TcpTmtcError::Io(err)), Err(err) => {
self.reregister_poll_interest()?;
return Err(TcpTmtcError::Io(err));
}
} }
} }
} }
@ -331,6 +338,14 @@ impl<
Ok(ConnectionResult::AcceptTimeout) Ok(ConnectionResult::AcceptTimeout)
} }
fn reregister_poll_interest(&mut self) -> io::Result<()> {
self.poll.registry().reregister(
&mut self.listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)
}
fn handle_accepted_connection( fn handle_accepted_connection(
&mut self, &mut self,
mut stream: TcpStream, mut stream: TcpStream,