// sat-rs/satrs-core/src/hal/std/tcp_server.rs
//! Generic TCP TMTC servers with different TMTC format flavours.
2023-09-15 15:37:57 +02:00
use alloc::vec;
2023-09-15 19:15:26 +02:00
use alloc::{boxed::Box, vec::Vec};
2023-09-17 02:31:02 +02:00
use core::time::Duration;
2023-09-17 01:32:18 +02:00
use socket2::{Domain, Socket, Type};
2023-09-18 00:18:01 +02:00
use std::io::Read;
2023-09-17 01:32:18 +02:00
use std::net::TcpListener;
2023-09-18 00:18:01 +02:00
use std::net::{SocketAddr, TcpStream};
use std::thread;
2023-09-15 19:15:26 +02:00
use crate::tmtc::{ReceivesTc, TmPacketSource};
2023-09-15 18:34:05 +02:00
use thiserror::Error;
2023-09-15 15:37:57 +02:00
2023-09-15 19:15:26 +02:00
// Re-export the TMTC in COBS server.
2023-09-16 21:28:22 +02:00
pub use crate::hal::std::tcp_with_cobs_server::{
2023-09-18 00:45:13 +02:00
parse_buffer_for_cobs_encoded_packets, CobsTcParser, CobsTmSender, TcpTmtcInCobsServer,
2023-09-15 19:22:12 +02:00
};
2023-09-15 18:34:05 +02:00
2023-09-17 02:35:08 +02:00
/// Configuration parameters for a TCP TMTC server.
///
/// Use [ServerConfig::new] to create a configuration where both socket reuse options are
/// disabled. The fields are public and can be adapted afterwards.
#[derive(Debug, Clone, Copy)]
pub struct ServerConfig {
    /// Address of the TCP server.
    pub addr: SocketAddr,
    /// If a client connects for a longer period, but no TC is received or no TM needs to be
    /// sent, the TCP server will delay for the specified amount of time to reduce CPU load.
    pub inner_loop_delay: Duration,
    /// Size of the TM buffer used to read TM from the [TmPacketSource] and encoding of that
    /// data. This buffer should be large enough to hold the maximum expected TM size in
    /// addition to the COBS encoding overhead. You can use [cobs::max_encoding_length] to
    /// calculate this size.
    pub tm_buffer_size: usize,
    /// Size of the TC buffer used to read encoded telecommands sent from the client. It is
    /// recommended to make this buffer larger to allow reading multiple consecutive packets as
    /// well, for example by using 4096 or 8192 byte. The buffer should at the very least be
    /// large enough to hold the maximum expected telecommand size in addition to its COBS
    /// encoding overhead. You can use [cobs::max_encoding_length] to calculate this size.
    pub tc_buffer_size: usize,
    /// Can be used to set the `SO_REUSEADDR` option on the raw socket. This is especially
    /// useful if the address and port are static for the server. Set to false by default.
    pub reuse_addr: bool,
    /// Can be used to set the `SO_REUSEPORT` option on the raw socket. This is especially
    /// useful if the address and port are static for the server. Set to false by default.
    pub reuse_port: bool,
}

impl ServerConfig {
    /// Creates a configuration from the mandatory parameters.
    ///
    /// Both `reuse_addr` and `reuse_port` are initialised to `false`.
    pub fn new(
        addr: SocketAddr,
        inner_loop_delay: Duration,
        tm_buffer_size: usize,
        tc_buffer_size: usize,
    ) -> Self {
        // Socket reuse is opt-in: callers flip the public fields after construction.
        let (reuse_addr, reuse_port) = (false, false);
        ServerConfig {
            addr,
            inner_loop_delay,
            tm_buffer_size,
            tc_buffer_size,
            reuse_addr,
            reuse_port,
        }
    }
}
2023-09-15 18:34:05 +02:00
#[derive(Error, Debug)]
2023-09-16 16:23:42 +02:00
pub enum TcpTmtcError<TmError, TcError> {
2023-09-15 18:34:05 +02:00
#[error("TM retrieval error: {0}")]
TmError(TmError),
#[error("TC retrieval error: {0}")]
TcError(TcError),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
2023-09-15 15:37:57 +02:00
}
2023-09-15 19:15:26 +02:00
/// Summary of a single handled client connection.
///
/// The [Default] value has no client address and both counters set to zero.
#[derive(Debug, Default)]
pub struct ConnectionResult {
    /// Address of the client, if a connection was established.
    pub addr: Option<SocketAddr>,
    /// Number of telecommands received during the connection.
    pub num_received_tcs: u32,
    /// Number of telemetry packets sent during the connection.
    pub num_sent_tms: u32,
}
2023-09-18 00:45:13 +02:00
/// Generic parser abstraction for an object which can parse for telecommands given a raw
/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand
/// receiver. This allows different encoding schemes for telecommands.
pub trait TcpTcParser<TmError, TcError> {
2023-09-18 00:18:01 +02:00
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut dyn ReceivesTc<Error = TcError>,
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>>;
}
2023-09-18 00:45:13 +02:00
/// Generic sender abstraction for an object which can pull telemetry from a given TM source
/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream].
/// The concrete implementation can also perform any encoding steps which are necessary before
/// sending back the data to a client.
pub trait TcpTmSender<TmError, TcError> {
2023-09-18 00:18:01 +02:00
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut dyn TmPacketSource<Error = TmError>,
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
}
2023-09-18 00:45:13 +02:00
/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which
/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry.
///
/// This server implements a generic TMTC handling logic and allows modifying its behaviour
/// through the following 4 core abstractions:
///
/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client.
/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver.
/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client.
/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender].
///
/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without
/// having to re-implement common logic.
///
/// Currently, this framework offers the following concrete implementations:
///
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
2023-09-18 00:18:01 +02:00
pub struct TcpTmtcGenericServer<
TmError,
TcError,
2023-09-18 00:45:13 +02:00
TmHandler: TcpTmSender<TmError, TcError>,
TcHandler: TcpTcParser<TmError, TcError>,
2023-09-18 00:18:01 +02:00
> {
base: TcpTmtcServerBase<TmError, TcError>,
tc_handler: TcHandler,
tm_handler: TmHandler,
}
impl<
TmError: 'static,
TcError: 'static,
2023-09-18 00:45:13 +02:00
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
> TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>
2023-09-18 00:18:01 +02:00
{
2023-09-18 00:45:13 +02:00
/// Create a new generic TMTC server instance.
2023-09-18 00:18:01 +02:00
///
/// ## Parameter
///
/// * `cfg` - Configuration of the server.
2023-09-18 00:45:13 +02:00
/// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from
/// the client.
/// * `tm_sender` - Sends back telemetry to the client using the specified TM source.
2023-09-18 00:18:01 +02:00
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver.
pub fn new(
cfg: ServerConfig,
2023-09-18 00:45:13 +02:00
tc_parser: TcParser,
tm_sender: TmSender,
2023-09-18 00:18:01 +02:00
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
2023-09-18 00:45:13 +02:00
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>, std::io::Error> {
2023-09-18 00:18:01 +02:00
Ok(Self {
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
2023-09-18 00:45:13 +02:00
tc_handler: tc_parser,
tm_handler: tm_sender,
2023-09-18 00:18:01 +02:00
})
}
/// Retrieve the internal [TcpListener] class.
pub fn listener(&mut self) -> &mut TcpListener {
self.base.listener()
}
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.base.local_addr()
}
/// This call is used to handle the next connection to a client. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
2023-09-18 00:45:13 +02:00
/// 2. It reads all the telecommands from the client and parses all received data using the
/// user specified [TcpTcParser].
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
/// user specified [TcpTmSender].
///
/// The server will delay for a user-specified period if the client connects to the server
/// for prolonged periods and there is no traffic for the server. This is the case if the
/// client does not send any telecommands and no telemetry needs to be sent back to the client.
2023-09-18 00:18:01 +02:00
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
let mut connection_result = ConnectionResult::default();
let mut current_write_idx;
let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?;
stream.set_nonblocking(true)?;
connection_result.addr = Some(addr);
current_write_idx = next_write_idx;
loop {
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]);
match read_result {
Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop.
if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
}
break;
}
Ok(read_len) => {
current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
}
}
Err(e) => match e.kind() {
// As per [TcpStream::set_read_timeout] documentation, this should work for
// both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)? {
// No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay);
}
}
_ => {
return Err(TcpTmtcError::Io(e));
}
},
}
}
self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)?;
Ok(connection_result)
}
}
2023-09-18 00:11:01 +02:00
pub(crate) struct TcpTmtcServerBase<TmError, TcError> {
2023-09-15 19:22:12 +02:00
pub(crate) listener: TcpListener,
2023-09-17 02:31:02 +02:00
pub(crate) inner_loop_delay: Duration,
2023-09-16 16:23:42 +02:00
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
2023-09-15 19:22:12 +02:00
pub(crate) tm_buffer: Vec<u8>,
2023-09-16 16:23:42 +02:00
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
2023-09-15 19:22:12 +02:00
pub(crate) tc_buffer: Vec<u8>,
2023-09-15 19:15:26 +02:00
}
2023-09-18 00:11:01 +02:00
impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
2023-09-17 01:32:18 +02:00
pub(crate) fn new(
2023-09-18 00:11:01 +02:00
cfg: ServerConfig,
2023-09-16 16:23:42 +02:00
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
2023-09-15 15:37:57 +02:00
) -> Result<Self, std::io::Error> {
2023-09-17 01:32:18 +02:00
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
2023-09-18 00:11:01 +02:00
socket.set_reuse_address(cfg.reuse_addr)?;
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
2023-09-17 01:32:18 +02:00
socket.bind(&addr)?;
socket.listen(128)?;
2023-09-15 18:34:05 +02:00
Ok(Self {
2023-09-17 01:32:18 +02:00
listener: socket.into(),
2023-09-18 00:11:01 +02:00
inner_loop_delay: cfg.inner_loop_delay,
2023-09-15 18:34:05 +02:00
tm_source,
2023-09-18 00:11:01 +02:00
tm_buffer: vec![0; cfg.tm_buffer_size],
2023-09-15 15:37:57 +02:00
tc_receiver,
2023-09-18 00:11:01 +02:00
tc_buffer: vec![0; cfg.tc_buffer_size],
2023-09-15 15:37:57 +02:00
})
}
2023-09-16 22:19:48 +02:00
2023-09-17 01:32:18 +02:00
pub(crate) fn listener(&mut self) -> &mut TcpListener {
&mut self.listener
}
2023-09-16 22:19:48 +02:00
pub(crate) fn local_addr(&self) -> std::io::Result<SocketAddr> {
2023-09-16 21:51:06 +02:00
self.listener.local_addr()
}
2023-09-14 23:51:17 +02:00
}