TCP server improvements #169

Merged
muellerr merged 2 commits from tcp-ip-improvements into main 2024-04-23 13:21:42 +02:00
5 changed files with 38 additions and 14 deletions

View File

@ -256,6 +256,7 @@ def main():
while True:
state = tmtc_backend.periodic_op(None)
if state.request == BackendRequest.TERMINATION_NO_ERROR:
tmtc_backend.close_com_if()
sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE:
_LOGGER.info("TMTC Client in IDLE mode")
@ -270,6 +271,7 @@ def main():
elif state.request == BackendRequest.CALL_NEXT:
pass
except KeyboardInterrupt:
tmtc_backend.close_com_if()
sys.exit(0)

View File

@ -1,3 +1,4 @@
use std::time::Duration;
use std::{
collections::{HashSet, VecDeque},
fmt::Debug,
@ -139,7 +140,9 @@ impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
pub fn periodic_operation(&mut self) {
loop {
let result = self.0.handle_all_connections(None);
let result = self
.0
.handle_all_connections(Some(Duration::from_millis(400)));
match result {
Ok(_conn_result) => (),
Err(e) => {

View File

@ -114,6 +114,7 @@ impl<
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
use std::{
cell::RefCell,
collections::VecDeque,
@ -182,7 +183,7 @@ mod tests {
#[test]
fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let test_receiver = TestSender::default();
// let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server =
@ -200,8 +201,8 @@ mod tests {
.unwrap();
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
let client_addr = client.local_addr().unwrap();
client.connect(server_addr).unwrap();
client.send(&ping_tc).unwrap();
println!("{}", server_addr);
client.send_to(&ping_tc, server_addr).unwrap();
udp_dyn_server.periodic_operation();
{
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();

View File

@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the
smallest possible size of 7 bytes.
- TCP server component now re-registers the internal `mio::Poll` object if the client reset
the connection unexpectedly. Not doing so prevented the server from functioning properly
after a re-connect.
# [v0.2.0-rc.3] 2024-04-17

View File

@ -9,8 +9,6 @@ use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Socket, Type};
use std::io::{self, Read};
use std::net::SocketAddr;
// use std::net::TcpListener;
// use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{PacketSenderRaw, PacketSource};
@ -244,13 +242,16 @@ impl<
// Create a poll instance.
let poll = Poll::new()?;
// Create storage for events.
let events = Events::with_capacity(10);
let events = Events::with_capacity(32);
let listener: std::net::TcpListener = socket.into();
let mut mio_listener = TcpListener::from_std(listener);
// Start listening for incoming connections.
poll.registry()
.register(&mut mio_listener, Token(0), Interest::READABLE)?;
poll.registry().register(
&mut mio_listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
Ok(Self {
id: cfg.id,
@ -280,11 +281,11 @@ impl<
self.listener.local_addr()
}
/// This call is used to handle the next connection to a client. Right now, it performs
/// This call is used to handle all connection from clients. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional
/// timeout can be specified for non-blocking acceptance.
/// 2. It reads all the telecommands from the client and parses all received data using the
/// user specified [TcpTcParser].
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
@ -317,11 +318,17 @@ impl<
loop {
match self.listener.accept() {
Ok((stream, addr)) => {
self.handle_accepted_connection(stream, addr)?;
if let Err(e) = self.handle_accepted_connection(stream, addr) {
self.reregister_poll_interest()?;
return Err(e);
}
handled_connections += 1;
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
Err(err) => return Err(TcpTmtcError::Io(err)),
Err(err) => {
self.reregister_poll_interest()?;
return Err(TcpTmtcError::Io(err));
}
}
}
}
@ -331,6 +338,14 @@ impl<
Ok(ConnectionResult::AcceptTimeout)
}
fn reregister_poll_interest(&mut self) -> io::Result<()> {
self.poll.registry().reregister(
&mut self.listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)
}
fn handle_accepted_connection(
&mut self,
mut stream: TcpStream,