TCP server improvements #169
@ -256,6 +256,7 @@ def main():
|
|||||||
while True:
|
while True:
|
||||||
state = tmtc_backend.periodic_op(None)
|
state = tmtc_backend.periodic_op(None)
|
||||||
if state.request == BackendRequest.TERMINATION_NO_ERROR:
|
if state.request == BackendRequest.TERMINATION_NO_ERROR:
|
||||||
|
tmtc_backend.close_com_if()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
elif state.request == BackendRequest.DELAY_IDLE:
|
elif state.request == BackendRequest.DELAY_IDLE:
|
||||||
_LOGGER.info("TMTC Client in IDLE mode")
|
_LOGGER.info("TMTC Client in IDLE mode")
|
||||||
@ -270,6 +271,7 @@ def main():
|
|||||||
elif state.request == BackendRequest.CALL_NEXT:
|
elif state.request == BackendRequest.CALL_NEXT:
|
||||||
pass
|
pass
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
tmtc_backend.close_com_if()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use std::time::Duration;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashSet, VecDeque},
|
collections::{HashSet, VecDeque},
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
@ -139,7 +140,9 @@ impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
|
|||||||
|
|
||||||
pub fn periodic_operation(&mut self) {
|
pub fn periodic_operation(&mut self) {
|
||||||
loop {
|
loop {
|
||||||
let result = self.0.handle_all_connections(None);
|
let result = self
|
||||||
|
.0
|
||||||
|
.handle_all_connections(Some(Duration::from_millis(400)));
|
||||||
match result {
|
match result {
|
||||||
Ok(_conn_result) => (),
|
Ok(_conn_result) => (),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -114,6 +114,7 @@ impl<
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::net::Ipv4Addr;
|
||||||
use std::{
|
use std::{
|
||||||
cell::RefCell,
|
cell::RefCell,
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
@ -182,7 +183,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_transactions() {
|
fn test_transactions() {
|
||||||
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
|
let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
|
||||||
let test_receiver = TestSender::default();
|
let test_receiver = TestSender::default();
|
||||||
// let tc_queue = test_receiver.tc_vec.clone();
|
// let tc_queue = test_receiver.tc_vec.clone();
|
||||||
let udp_tc_server =
|
let udp_tc_server =
|
||||||
@ -200,8 +201,8 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
|
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
|
||||||
let client_addr = client.local_addr().unwrap();
|
let client_addr = client.local_addr().unwrap();
|
||||||
client.connect(server_addr).unwrap();
|
println!("{}", server_addr);
|
||||||
client.send(&ping_tc).unwrap();
|
client.send_to(&ping_tc, server_addr).unwrap();
|
||||||
udp_dyn_server.periodic_operation();
|
udp_dyn_server.periodic_operation();
|
||||||
{
|
{
|
||||||
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();
|
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();
|
||||||
|
@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
|
|||||||
|
|
||||||
- `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the
|
- `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the
|
||||||
smallest possible size of 7 bytes.
|
smallest possible size of 7 bytes.
|
||||||
|
- TCP server component now re-registers the internal `mio::Poll` object if the client reset
|
||||||
|
the connection unexpectedly. Not doing so prevented the server from functioning properly
|
||||||
|
after a re-connect.
|
||||||
|
|
||||||
# [v0.2.0-rc.3] 2024-04-17
|
# [v0.2.0-rc.3] 2024-04-17
|
||||||
|
|
||||||
|
@ -9,8 +9,6 @@ use mio::{Events, Interest, Poll, Token};
|
|||||||
use socket2::{Domain, Socket, Type};
|
use socket2::{Domain, Socket, Type};
|
||||||
use std::io::{self, Read};
|
use std::io::{self, Read};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
// use std::net::TcpListener;
|
|
||||||
// use std::net::{SocketAddr, TcpStream};
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use crate::tmtc::{PacketSenderRaw, PacketSource};
|
use crate::tmtc::{PacketSenderRaw, PacketSource};
|
||||||
@ -244,13 +242,16 @@ impl<
|
|||||||
// Create a poll instance.
|
// Create a poll instance.
|
||||||
let poll = Poll::new()?;
|
let poll = Poll::new()?;
|
||||||
// Create storage for events.
|
// Create storage for events.
|
||||||
let events = Events::with_capacity(10);
|
let events = Events::with_capacity(32);
|
||||||
let listener: std::net::TcpListener = socket.into();
|
let listener: std::net::TcpListener = socket.into();
|
||||||
let mut mio_listener = TcpListener::from_std(listener);
|
let mut mio_listener = TcpListener::from_std(listener);
|
||||||
|
|
||||||
// Start listening for incoming connections.
|
// Start listening for incoming connections.
|
||||||
poll.registry()
|
poll.registry().register(
|
||||||
.register(&mut mio_listener, Token(0), Interest::READABLE)?;
|
&mut mio_listener,
|
||||||
|
Token(0),
|
||||||
|
Interest::READABLE | Interest::WRITABLE,
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
id: cfg.id,
|
id: cfg.id,
|
||||||
@ -280,11 +281,11 @@ impl<
|
|||||||
self.listener.local_addr()
|
self.listener.local_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This call is used to handle the next connection to a client. Right now, it performs
|
/// This call is used to handle all connection from clients. Right now, it performs
|
||||||
/// the following steps:
|
/// the following steps:
|
||||||
///
|
///
|
||||||
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
|
/// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional
|
||||||
/// until a client connects.
|
/// timeout can be specified for non-blocking acceptance.
|
||||||
/// 2. It reads all the telecommands from the client and parses all received data using the
|
/// 2. It reads all the telecommands from the client and parses all received data using the
|
||||||
/// user specified [TcpTcParser].
|
/// user specified [TcpTcParser].
|
||||||
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
|
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
|
||||||
@ -317,11 +318,17 @@ impl<
|
|||||||
loop {
|
loop {
|
||||||
match self.listener.accept() {
|
match self.listener.accept() {
|
||||||
Ok((stream, addr)) => {
|
Ok((stream, addr)) => {
|
||||||
self.handle_accepted_connection(stream, addr)?;
|
if let Err(e) = self.handle_accepted_connection(stream, addr) {
|
||||||
|
self.reregister_poll_interest()?;
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
handled_connections += 1;
|
handled_connections += 1;
|
||||||
}
|
}
|
||||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
|
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
|
||||||
Err(err) => return Err(TcpTmtcError::Io(err)),
|
Err(err) => {
|
||||||
|
self.reregister_poll_interest()?;
|
||||||
|
return Err(TcpTmtcError::Io(err));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -331,6 +338,14 @@ impl<
|
|||||||
Ok(ConnectionResult::AcceptTimeout)
|
Ok(ConnectionResult::AcceptTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reregister_poll_interest(&mut self) -> io::Result<()> {
|
||||||
|
self.poll.registry().reregister(
|
||||||
|
&mut self.listener,
|
||||||
|
Token(0),
|
||||||
|
Interest::READABLE | Interest::WRITABLE,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_accepted_connection(
|
fn handle_accepted_connection(
|
||||||
&mut self,
|
&mut self,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
|
Loading…
Reference in New Issue
Block a user