move generic server
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
Robin Müller 2023-09-18 00:18:01 +02:00
parent b622c3871a
commit 5aa339286a
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 169 additions and 171 deletions

View File

@ -3,8 +3,10 @@ use alloc::vec;
use alloc::{boxed::Box, vec::Vec}; use alloc::{boxed::Box, vec::Vec};
use core::time::Duration; use core::time::Duration;
use socket2::{Domain, Socket, Type}; use socket2::{Domain, Socket, Type};
use std::net::SocketAddr; use std::io::Read;
use std::net::TcpListener; use std::net::TcpListener;
use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{ReceivesTc, TmPacketSource}; use crate::tmtc::{ReceivesTc, TmPacketSource};
use thiserror::Error; use thiserror::Error;
@ -85,6 +87,169 @@ pub struct ConnectionResult {
pub num_sent_tms: u32, pub num_sent_tms: u32,
} }
pub trait TcpTcHandler<TmError, TcError> {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut dyn ReceivesTc<Error = TcError>,
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>>;
}
pub trait TcpTmHandler<TmError, TcError> {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut dyn TmPacketSource<Error = TmError>,
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
}
pub struct TcpTmtcGenericServer<
TmError,
TcError,
TmHandler: TcpTmHandler<TmError, TcError>,
TcHandler: TcpTcHandler<TmError, TcError>,
> {
base: TcpTmtcServerBase<TmError, TcError>,
tc_handler: TcHandler,
tm_handler: TmHandler,
}
impl<
TmError: 'static,
TcError: 'static,
TmHandler: TcpTmHandler<TmError, TcError>,
TcHandler: TcpTcHandler<TmError, TcError>,
> TcpTmtcGenericServer<TmError, TcError, TmHandler, TcHandler>
{
/// Create a new TMTC server which exchanges TMTC packets encoded with
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
/// ## Parameter
///
/// * `cfg` - Configuration of the server.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver.
pub fn new(
cfg: ServerConfig,
tc_handler: TcHandler,
tm_handler: TmHandler,
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmHandler, TcHandler>, std::io::Error> {
Ok(Self {
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
tc_handler,
tm_handler,
})
}
/// Retrieve the internal [TcpListener] class.
pub fn listener(&mut self) -> &mut TcpListener {
self.base.listener()
}
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.base.local_addr()
}
/// This call is used to handle the next connection to a client. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 2. It reads all the telecommands from the client, which are expected to be COBS
/// encoded packets.
/// 3. After reading and parsing all telecommands, it sends back all telemetry it can retrieve
/// from the user specified [TmPacketSource] back to the client.
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
let mut connection_result = ConnectionResult::default();
let mut current_write_idx;
let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?;
stream.set_nonblocking(true)?;
connection_result.addr = Some(addr);
current_write_idx = next_write_idx;
loop {
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]);
match read_result {
Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop.
if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
}
break;
}
Ok(read_len) => {
current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
}
}
Err(e) => match e.kind() {
// As per [TcpStream::set_read_timeout] documentation, this should work for
// both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)? {
// No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay);
}
}
_ => {
return Err(TcpTmtcError::Io(e));
}
},
}
}
self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)?;
Ok(connection_result)
}
}
pub(crate) struct TcpTmtcServerBase<TmError, TcError> { pub(crate) struct TcpTmtcServerBase<TmError, TcError> {
pub(crate) listener: TcpListener, pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration, pub(crate) inner_loop_delay: Duration,

View File

@ -3,32 +3,18 @@ use alloc::vec;
use cobs::decode_in_place; use cobs::decode_in_place;
use cobs::encode; use cobs::encode;
use delegate::delegate; use delegate::delegate;
use std::io::Read;
use std::io::Write; use std::io::Write;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::TcpListener; use std::net::TcpListener;
use std::net::TcpStream; use std::net::TcpStream;
use std::println;
use std::thread;
use std::vec::Vec; use std::vec::Vec;
use crate::hal::std::tcp_server::{ServerConfig, TcpTmtcServerBase};
use crate::tmtc::ReceivesTc; use crate::tmtc::ReceivesTc;
use crate::tmtc::TmPacketSource; use crate::tmtc::TmPacketSource;
use super::tcp_server::ConnectionResult; use crate::hal::std::tcp_server::{
use super::tcp_server::TcpTmtcError; ConnectionResult, ServerConfig, TcpTcHandler, TcpTmHandler, TcpTmtcError, TcpTmtcGenericServer,
};
pub trait TcpTcHandler<TmError, TcError> {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut dyn ReceivesTc<Error = TcError>,
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>>;
}
#[derive(Default)] #[derive(Default)]
struct CobsTcParser {} struct CobsTcParser {}
@ -53,16 +39,6 @@ impl<TmError, TcError> TcpTcHandler<TmError, TcError> for CobsTcParser {
} }
} }
pub trait TcpTmHandler<TmError, TcError> {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut dyn TmPacketSource<Error = TmError>,
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
}
struct CobsTmParser { struct CobsTmParser {
tm_encoding_buffer: Vec<u8>, tm_encoding_buffer: Vec<u8>,
} }
@ -114,149 +90,6 @@ impl<TmError, TcError> TcpTmHandler<TmError, TcError> for CobsTmParser {
} }
} }
pub struct TcpTmtcGenericServer<
TmError,
TcError,
TmHandler: TcpTmHandler<TmError, TcError>,
TcHandler: TcpTcHandler<TmError, TcError>,
> {
base: TcpTmtcServerBase<TmError, TcError>,
tc_handler: TcHandler,
tm_handler: TmHandler,
}
impl<
TmError: 'static,
TcError: 'static,
TmHandler: TcpTmHandler<TmError, TcError>,
TcHandler: TcpTcHandler<TmError, TcError>,
> TcpTmtcGenericServer<TmError, TcError, TmHandler, TcHandler>
{
/// Create a new TMTC server which exchanges TMTC packets encoded with
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
/// ## Parameter
///
/// * `cfg` - Configuration of the server.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver.
pub fn new(
cfg: ServerConfig,
tc_handler: TcHandler,
tm_handler: TmHandler,
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmHandler, TcHandler>, std::io::Error> {
Ok(Self {
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
tc_handler,
tm_handler,
})
}
/// Retrieve the internal [TcpListener] class.
pub fn listener(&mut self) -> &mut TcpListener {
self.base.listener()
}
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.base.local_addr()
}
/// This call is used to handle the next connection to a client. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 2. It reads all the telecommands from the client, which are expected to be COBS
/// encoded packets.
/// 3. After reading and parsing all telecommands, it sends back all telemetry it can retrieve
/// from the user specified [TmPacketSource] back to the client.
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
let mut connection_result = ConnectionResult::default();
let mut current_write_idx;
let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?;
stream.set_nonblocking(true)?;
connection_result.addr = Some(addr);
current_write_idx = next_write_idx;
loop {
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]);
match read_result {
Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop.
if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
}
break;
}
Ok(read_len) => {
current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
}
}
Err(e) => match e.kind() {
// As per [TcpStream::set_read_timeout] documentation, this should work for
// both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)? {
// No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay);
}
}
_ => {
return Err(TcpTmtcError::Io(e));
}
},
}
}
self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)?;
Ok(connection_result)
}
}
/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the /// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
/// ///