TCP Server #77

Merged
muellerr merged 52 commits from tcp-server into main 2023-09-21 18:11:38 +02:00
2 changed files with 147 additions and 58 deletions
Showing only changes of commit 8d8e319aee - Show all commits

View File

@ -1,6 +1,7 @@
//! Generic TCP TMTC servers with different TMTC format flavours.
use alloc::vec;
use alloc::{boxed::Box, vec::Vec};
use core::time::Duration;
use socket2::{Domain, Socket, Type};
use std::net::SocketAddr;
use std::net::TcpListener;
@ -34,6 +35,7 @@ pub struct ConnectionResult {
pub(crate) struct TcpTmtcServerBase<TcError, TmError> {
pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
@ -43,6 +45,7 @@ pub(crate) struct TcpTmtcServerBase<TcError, TmError> {
impl<TcError, TmError> TcpTmtcServerBase<TcError, TmError> {
pub(crate) fn new(
addr: &SocketAddr,
inner_loop_delay: Duration,
reuse_addr: bool,
reuse_port: bool,
tm_buffer_size: usize,
@ -59,6 +62,7 @@ impl<TcError, TmError> TcpTmtcServerBase<TcError, TmError> {
socket.listen(128)?;
Ok(Self {
listener: socket.into(),
inner_loop_delay,
tm_source,
tm_buffer: vec![0; tm_buffer_size],
tc_receiver,

View File

@ -3,10 +3,14 @@ use alloc::vec;
use cobs::decode_in_place;
use cobs::encode;
use cobs::max_encoding_length;
use core::time::Duration;
use std::io::Read;
use std::io::Write;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::TcpStream;
use std::println;
use std::thread;
use std::vec::Vec;
use crate::hal::std::tcp_server::TcpTmtcServerBase;
@ -16,6 +20,58 @@ use crate::tmtc::TmPacketSource;
use super::tcp_server::ConnectionResult;
use super::tcp_server::TcpTmtcError;
/// TCP configuration struct.
///
/// ## Parameters
///
/// * `addr` - Address of the TCP server.
/// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or
/// no TM needs to be sent, the TCP server will delay for the specified amount of time
/// to reduce CPU load.
/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and
/// encoding of that data. This buffer should at large enough to hold the maximum expected
/// TM size in addition to the COBS encoding overhead. You can use
/// [cobs::max_encoding_length] to calculate this size.
/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
/// the client. It is recommended to make this buffer larger to allow reading multiple
/// consecutive packets as well, for example by using 4096 or 8192 byte. The buffer should
/// at the very least be large enough to hold the maximum expected telecommand size in
/// addition to its COBS encoding overhead. You can use [cobs::max_encoding_length] to
/// calculate this size.
/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is
/// especially useful if the address and port are static for the server. Set to false by
/// default.
/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is
/// especially useful if the address and port are static for the server. Set to false by
/// default.
#[derive(Debug, Copy, Clone)]
pub struct ServerConfig {
pub addr: SocketAddr,
pub inner_loop_delay: Duration,
pub tm_buffer_size: usize,
pub tc_buffer_size: usize,
pub reuse_addr: bool,
pub reuse_port: bool,
}
impl ServerConfig {
pub fn new(
addr: SocketAddr,
inner_loop_delay: Duration,
tm_buffer_size: usize,
tc_buffer_size: usize,
) -> Self {
Self {
addr,
inner_loop_delay,
tm_buffer_size,
tc_buffer_size,
reuse_addr: false,
reuse_port: false,
}
}
}
/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
@ -43,45 +99,27 @@ impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
///
/// ## Parameter
///
/// * `addr` - Address of the TCP server.
/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is
/// especially useful if the address and port are static for the server.
/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is
/// especially useful if the address and port are static for the server.
/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and
/// encoding of that data. This buffer should at large enough to hold the maximum expected
/// TM size in addition to the COBS encoding overhead. You can use
/// [cobs::max_encoding_length] to calculate this size.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
/// the client. It is recommended to make this buffer larger to allow reading multiple
/// consecutive packets as well, for example by using 4096 or 8192 byte. The buffer should
/// at the very least be large enough to hold the maximum expected telecommand size in
/// addition to its COBS encoding overhead. You can use [cobs::max_encoding_length] to
/// calculate this size.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver.
pub fn new(
addr: &SocketAddr,
reuse_addr: bool,
reuse_port: bool,
tm_buffer_size: usize,
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_buffer_size: usize,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
) -> Result<Self, std::io::Error> {
Ok(Self {
base: TcpTmtcServerBase::new(
addr,
reuse_addr,
reuse_port,
tm_buffer_size,
&cfg.addr,
cfg.inner_loop_delay,
cfg.reuse_addr,
cfg.reuse_port,
cfg.tm_buffer_size,
tm_source,
tc_buffer_size,
cfg.tc_buffer_size,
tc_receiver,
)?,
tm_encoding_buffer: vec![0; max_encoding_length(tc_buffer_size)],
tm_encoding_buffer: vec![0; max_encoding_length(cfg.tc_buffer_size)],
})
}
@ -112,35 +150,85 @@ impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
let mut current_write_idx;
let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?;
stream.set_nonblocking(true)?;
connection_result.addr = Some(addr);
current_write_idx = next_write_idx;
next_write_idx = 0;
loop {
let read_len = stream.read(&mut self.base.tc_buffer[current_write_idx..])?;
if read_len > 0 {
current_write_idx += read_len;
if current_write_idx == self.base.tc_buffer.capacity() {
// Reader vec full, need to parse for packets.
connection_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut self.base.tc_buffer[..current_write_idx],
self.base.tc_receiver.as_mut(),
&mut next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
current_write_idx = next_write_idx;
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]);
match read_result {
Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop.
if current_write_idx > 0 {
self.handle_tc_parsing(
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
}
break;
}
continue;
Ok(read_len) => {
current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() {
self.handle_tc_parsing(
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
}
}
Err(e) => match e.kind() {
// As per [TcpStream::set_read_timeout] documentation, this should work for
// both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
println!("should be here..");
self.handle_tc_parsing(
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
if !self.handle_tm_sending(&mut connection_result, &mut stream)? {
// No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay);
}
}
_ => {
return Err(TcpTmtcError::Io(e));
}
},
}
break;
}
if current_write_idx > 0 {
connection_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut self.base.tc_buffer[..current_write_idx],
self.base.tc_receiver.as_mut(),
&mut next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
}
self.handle_tm_sending(&mut connection_result, &mut stream)?;
Ok(connection_result)
}
fn handle_tc_parsing(
&mut self,
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> {
// Reader vec full, need to parse for packets.
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut self.base.tc_buffer[..current_write_idx],
self.base.tc_receiver.as_mut(),
next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
Ok(())
}
fn handle_tm_sending(
&mut self,
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
let mut tm_was_sent = false;
loop {
// Write TM until TM source is exhausted. For now, there is no limit for the amount
// of TM written this way.
@ -149,10 +237,12 @@ impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
.tm_source
.retrieve_packet(&mut self.base.tm_buffer)
.map_err(|e| TcpTmtcError::TmError(e))?;
if read_tm_len == 0 {
break;
return Ok(tm_was_sent);
}
connection_result.num_sent_tms += 1;
tm_was_sent = true;
conn_result.num_sent_tms += 1;
// Encode into COBS and sent to client.
let mut current_idx = 0;
@ -166,7 +256,6 @@ impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
current_idx += 1;
stream.write_all(&self.tm_encoding_buffer[..current_idx])?;
}
Ok(connection_result)
}
}
@ -239,7 +328,7 @@ mod tests {
use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
use cobs::encode;
use super::{parse_buffer_for_cobs_encoded_packets, TcpTmtcInCobsServer};
use super::{parse_buffer_for_cobs_encoded_packets, ServerConfig, TcpTmtcInCobsServer};
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1];
@ -476,12 +565,8 @@ mod tests {
tm_source: SyncTmSource,
) -> TcpTmtcInCobsServer<(), ()> {
TcpTmtcInCobsServer::new(
addr,
false,
false,
1024,
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
1024,
Box::new(tc_receiver.clone()),
)
.expect("TCP server generation failed")