this intermediate struct is not necessary #86

Merged
muellerr merged 5 commits from simplify-some-tcp-components into main 2023-10-01 15:00:13 +02:00
5 changed files with 107 additions and 97 deletions

View File

@ -1,4 +1,3 @@
use alloc::boxed::Box;
use alloc::vec; use alloc::vec;
use cobs::encode; use cobs::encode;
use delegate::delegate; use delegate::delegate;
@ -29,7 +28,6 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
current_write_idx: usize, current_write_idx: usize,
next_write_idx: &mut usize, next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> { ) -> Result<(), TcpTmtcError<TmError, TcError>> {
// Reader vec full, need to parse for packets.
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut tc_buffer[..current_write_idx], &mut tc_buffer[..current_write_idx],
tc_receiver.upcast_mut(), tc_receiver.upcast_mut(),
@ -111,11 +109,23 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
/// ///
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// test also serves as the example application for this module. /// test also serves as the example application for this module.
pub struct TcpTmtcInCobsServer<TmError, TcError: 'static> { pub struct TcpTmtcInCobsServer<
generic_server: TcpTmtcGenericServer<TmError, TcError, CobsTmSender, CobsTcParser>, TmError,
TcError: 'static,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
> {
generic_server:
TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, CobsTmSender, CobsTcParser>,
} }
impl<TmError: 'static, TcError: 'static> TcpTmtcInCobsServer<TmError, TcError> { impl<
TmError: 'static,
TcError: 'static,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
> TcpTmtcInCobsServer<TmError, TcError, TmSource, TcReceiver>
{
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with /// Create a new TCP TMTC server which exchanges TMTC packets encoded with
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
/// ///
@ -128,8 +138,8 @@ impl<TmError: 'static, TcError: 'static> TcpTmtcInCobsServer<TmError, TcError> {
/// forwarded to this TC receiver. /// forwarded to this TC receiver.
pub fn new( pub fn new(
cfg: ServerConfig, cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>, tm_source: TmSource,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>, tc_receiver: TcReceiver,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
generic_server: TcpTmtcGenericServer::new( generic_server: TcpTmtcGenericServer::new(
@ -177,7 +187,7 @@ mod tests {
ServerConfig, ServerConfig,
}, },
}; };
use alloc::{boxed::Box, sync::Arc}; use alloc::sync::Arc;
use cobs::encode; use cobs::encode;
use super::TcpTmtcInCobsServer; use super::TcpTmtcInCobsServer;
@ -202,11 +212,11 @@ mod tests {
addr: &SocketAddr, addr: &SocketAddr,
tc_receiver: SyncTcCacher, tc_receiver: SyncTcCacher,
tm_source: SyncTmSource, tm_source: SyncTmSource,
) -> TcpTmtcInCobsServer<(), ()> { ) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
TcpTmtcInCobsServer::new( TcpTmtcInCobsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source), tm_source,
Box::new(tc_receiver), tc_receiver,
) )
.expect("TCP server generation failed") .expect("TCP server generation failed")
} }

View File

@ -1,6 +1,6 @@
//! Generic TCP TMTC servers with different TMTC format flavours. //! Generic TCP TMTC servers with different TMTC format flavours.
use alloc::vec; use alloc::vec;
use alloc::{boxed::Box, vec::Vec}; use alloc::vec::Vec;
use core::time::Duration; use core::time::Duration;
use socket2::{Domain, Socket, Type}; use socket2::{Domain, Socket, Type};
use std::io::Read; use std::io::Read;
@ -134,20 +134,29 @@ pub trait TcpTmSender<TmError, TcError> {
pub struct TcpTmtcGenericServer< pub struct TcpTmtcGenericServer<
TmError, TmError,
TcError, TcError,
TmHandler: TcpTmSender<TmError, TcError>, TmSource: TmPacketSource<Error = TmError>,
TcHandler: TcpTcParser<TmError, TcError>, TcReceiver: ReceivesTc<Error = TcError>,
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
> { > {
base: TcpTmtcServerBase<TmError, TcError>, pub(crate) listener: TcpListener,
tc_handler: TcHandler, pub(crate) inner_loop_delay: Duration,
tm_handler: TmHandler, pub(crate) tm_source: TmSource,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: TcReceiver,
pub(crate) tc_buffer: Vec<u8>,
tc_handler: TcParser,
tm_handler: TmSender,
} }
impl< impl<
TmError: 'static, TmError: 'static,
TcError: 'static, TcError: 'static,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TmSender: TcpTmSender<TmError, TcError>, TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>, TcParser: TcpTcParser<TmError, TcError>,
> TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser> > TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, TmSender, TcParser>
{ {
/// Create a new generic TMTC server instance. /// Create a new generic TMTC server instance.
/// ///
@ -165,25 +174,37 @@ impl<
cfg: ServerConfig, cfg: ServerConfig,
tc_parser: TcParser, tc_parser: TcParser,
tm_sender: TmSender, tm_sender: TmSender,
tm_source: Box<dyn TmPacketSource<Error = TmError>>, tm_source: TmSource,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>, tc_receiver: TcReceiver,
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>, std::io::Error> { ) -> Result<Self, std::io::Error> {
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.set_reuse_address(cfg.reuse_addr)?;
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
socket.bind(&addr)?;
socket.listen(128)?;
Ok(Self { Ok(Self {
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
tc_handler: tc_parser, tc_handler: tc_parser,
tm_handler: tm_sender, tm_handler: tm_sender,
listener: socket.into(),
inner_loop_delay: cfg.inner_loop_delay,
tm_source,
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
}) })
} }
/// Retrieve the internal [TcpListener] class. /// Retrieve the internal [TcpListener] class.
pub fn listener(&mut self) -> &mut TcpListener { pub fn listener(&mut self) -> &mut TcpListener {
self.base.listener() &mut self.listener
} }
/// Can be used to retrieve the local assigned address of the TCP server. This is especially /// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment. /// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> { pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.base.local_addr() self.listener.local_addr()
} }
/// This call is used to handle the next connection to a client. Right now, it performs /// This call is used to handle the next connection to a client. Right now, it performs
@ -205,20 +226,20 @@ impl<
let mut connection_result = ConnectionResult::default(); let mut connection_result = ConnectionResult::default();
let mut current_write_idx; let mut current_write_idx;
let mut next_write_idx = 0; let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?; let (mut stream, addr) = self.listener.accept()?;
stream.set_nonblocking(true)?; stream.set_nonblocking(true)?;
connection_result.addr = Some(addr); connection_result.addr = Some(addr);
current_write_idx = next_write_idx; current_write_idx = next_write_idx;
loop { loop {
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
match read_result { match read_result {
Ok(0) => { Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets. // Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop. // After that, break the outer loop.
if current_write_idx > 0 { if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer, &mut self.tc_buffer,
self.base.tc_receiver.as_mut(), &mut self.tc_receiver,
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -229,10 +250,10 @@ impl<
Ok(read_len) => { Ok(read_len) => {
current_write_idx += read_len; current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now. // TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() { if current_write_idx == self.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer, &mut self.tc_buffer,
self.base.tc_receiver.as_mut(), &mut self.tc_receiver,
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -245,8 +266,8 @@ impl<
// both UNIX and Windows. // both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer, &mut self.tc_buffer,
self.base.tc_receiver.as_mut(), &mut self.tc_receiver,
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -254,14 +275,14 @@ impl<
current_write_idx = next_write_idx; current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending( if !self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer, &mut self.tm_buffer,
self.base.tm_source.as_mut(), &mut self.tm_source,
&mut connection_result, &mut connection_result,
&mut stream, &mut stream,
)? { )? {
// No TC read, no TM was sent, but the client has not disconnected. // No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time. // Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay); thread::sleep(self.inner_loop_delay);
} }
} }
_ => { _ => {
@ -271,8 +292,8 @@ impl<
} }
} }
self.tm_handler.handle_tm_sending( self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer, &mut self.tm_buffer,
self.base.tm_source.as_mut(), &mut self.tm_source,
&mut connection_result, &mut connection_result,
&mut stream, &mut stream,
)?; )?;
@ -280,47 +301,6 @@ impl<
} }
} }
pub(crate) struct TcpTmtcServerBase<TmError, TcError> {
pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError>>,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
pub(crate) tc_buffer: Vec<u8>,
}
impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
pub(crate) fn new(
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<Self, std::io::Error> {
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.set_reuse_address(cfg.reuse_addr)?;
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
socket.bind(&addr)?;
socket.listen(128)?;
Ok(Self {
listener: socket.into(),
inner_loop_delay: cfg.inner_loop_delay,
tm_source,
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
})
}
pub(crate) fn listener(&mut self) -> &mut TcpListener {
&mut self.listener
}
pub(crate) fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.listener.local_addr()
}
}
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use std::sync::Mutex; use std::sync::Mutex;

View File

@ -88,16 +88,31 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
/// [spacepackets::PacketId]s as part of the server configuration for that purpose. /// [spacepackets::PacketId]s as part of the server configuration for that purpose.
/// ///
/// ## Example /// ## Example
///
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// also serves as the example application for this module. /// also serves as the example application for this module.
pub struct TcpSpacepacketsServer<TmError, TcError: 'static> { pub struct TcpSpacepacketsServer<
generic_server: TmError,
TcpTmtcGenericServer<TmError, TcError, SpacepacketsTmSender, SpacepacketsTcParser>, TcError: 'static,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
> {
generic_server: TcpTmtcGenericServer<
TmError,
TcError,
TmSource,
TcReceiver,
SpacepacketsTmSender,
SpacepacketsTcParser,
>,
} }
impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError> { impl<
/// Create a new TCP TMTC server which exchanges CCSDS space packets. TmError: 'static,
TcError: 'static,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
> TcpSpacepacketsServer<TmError, TcError, TmSource, TcReceiver>
{
/// ///
/// ## Parameter /// ## Parameter
/// ///
@ -110,8 +125,8 @@ impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError>
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets. /// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
pub fn new( pub fn new(
cfg: ServerConfig, cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>, tm_source: TmSource,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>, tc_receiver: TcReceiver,
packet_id_lookup: Box<dyn PacketIdLookup + Send>, packet_id_lookup: Box<dyn PacketIdLookup + Send>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
@ -179,11 +194,11 @@ mod tests {
tc_receiver: SyncTcCacher, tc_receiver: SyncTcCacher,
tm_source: SyncTmSource, tm_source: SyncTmSource,
packet_id_lookup: HashSet<PacketId>, packet_id_lookup: HashSet<PacketId>,
) -> TcpSpacepacketsServer<(), ()> { ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> {
TcpSpacepacketsServer::new( TcpSpacepacketsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source), tm_source,
Box::new(tc_receiver), tc_receiver,
Box::new(packet_id_lookup), Box::new(packet_id_lookup),
) )
.expect("TCP server generation failed") .expect("TCP server generation failed")

View File

@ -94,8 +94,8 @@ fn test_cobs_server() {
tm_source.add_tm(&INVERTED_PACKET); tm_source.add_tm(&INVERTED_PACKET);
let mut tcp_server = TcpTmtcInCobsServer::new( let mut tcp_server = TcpTmtcInCobsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source), tm_source,
Box::new(tc_receiver.clone()), tc_receiver.clone(),
) )
.expect("TCP server generation failed"); .expect("TCP server generation failed");
let dest_addr = tcp_server let dest_addr = tcp_server
@ -176,8 +176,8 @@ fn test_ccsds_server() {
packet_id_lookup.insert(TEST_PACKET_ID_0); packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = TcpSpacepacketsServer::new( let mut tcp_server = TcpSpacepacketsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source), tm_source,
Box::new(tc_receiver.clone()), tc_receiver.clone(),
Box::new(packet_id_lookup), Box::new(packet_id_lookup),
) )
.expect("TCP server generation failed"); .expect("TCP server generation failed");

View File

@ -72,7 +72,12 @@ impl TmPacketSourceCore for SyncTcpTmSource {
} }
pub struct TcpTask { pub struct TcpTask {
server: TcpSpacepacketsServer<(), CcsdsError<MpscStoreAndSendError>>, server: TcpSpacepacketsServer<
(),
CcsdsError<MpscStoreAndSendError>,
SyncTcpTmSource,
CcsdsDistributor<MpscStoreAndSendError>,
>,
} }
impl TcpTask { impl TcpTask {
@ -84,8 +89,8 @@ impl TcpTask {
Ok(Self { Ok(Self {
server: TcpSpacepacketsServer::new( server: TcpSpacepacketsServer::new(
cfg, cfg,
Box::new(tm_source), tm_source,
Box::new(tc_receiver), tc_receiver,
Box::new(PACKET_ID_LOOKUP), Box::new(PACKET_ID_LOOKUP),
)?, )?,
}) })