diff --git a/satrs-core/src/hal/std/tcp_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs index dc00c44..2d14589 100644 --- a/satrs-core/src/hal/std/tcp_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -1,4 +1,3 @@ -use alloc::boxed::Box; use alloc::vec; use cobs::encode; use delegate::delegate; @@ -29,7 +28,6 @@ impl TcpTcParser for CobsTcParser { current_write_idx: usize, next_write_idx: &mut usize, ) -> Result<(), TcpTmtcError> { - // Reader vec full, need to parse for packets. conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( &mut tc_buffer[..current_write_idx], tc_receiver.upcast_mut(), @@ -111,11 +109,23 @@ impl TcpTmSender for CobsTmSender { /// /// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// test also serves as the example application for this module. -pub struct TcpTmtcInCobsServer { - generic_server: TcpTmtcGenericServer, +pub struct TcpTmtcInCobsServer< + TmError, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, +> { + generic_server: + TcpTmtcGenericServer, } -impl TcpTmtcInCobsServer { +impl< + TmError: 'static, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, + > TcpTmtcInCobsServer +{ /// Create a new TCP TMTC server which exchanges TMTC packets encoded with /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). /// @@ -128,8 +138,8 @@ impl TcpTmtcInCobsServer { /// forwarded to this TC receiver. pub fn new( cfg: ServerConfig, - tm_source: Box>, - tc_receiver: Box>, + tm_source: TmSource, + tc_receiver: TcReceiver, ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( @@ -177,7 +187,7 @@ mod tests { ServerConfig, }, }; - use alloc::{boxed::Box, sync::Arc}; + use alloc::sync::Arc; use cobs::encode; use super::TcpTmtcInCobsServer; @@ -202,11 +212,11 @@ mod tests { addr: &SocketAddr, tc_receiver: SyncTcCacher, tm_source: SyncTmSource, - ) -> TcpTmtcInCobsServer<(), ()> { + ) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> { TcpTmtcInCobsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver), + tm_source, + tc_receiver, ) .expect("TCP server generation failed") } diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index e9fe657..184c4c1 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -1,6 +1,6 @@ //! Generic TCP TMTC servers with different TMTC format flavours. use alloc::vec; -use alloc::{boxed::Box, vec::Vec}; +use alloc::vec::Vec; use core::time::Duration; use socket2::{Domain, Socket, Type}; use std::io::Read; @@ -134,20 +134,29 @@ pub trait TcpTmSender { pub struct TcpTmtcGenericServer< TmError, TcError, - TmHandler: TcpTmSender, - TcHandler: TcpTcParser, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, + TmSender: TcpTmSender, + TcParser: TcpTcParser, > { - base: TcpTmtcServerBase, - tc_handler: TcHandler, - tm_handler: TmHandler, + pub(crate) listener: TcpListener, + pub(crate) inner_loop_delay: Duration, + pub(crate) tm_source: TmSource, + pub(crate) tm_buffer: Vec, + pub(crate) tc_receiver: TcReceiver, + pub(crate) tc_buffer: Vec, + tc_handler: TcParser, + tm_handler: TmSender, } impl< TmError: 'static, TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, TmSender: TcpTmSender, TcParser: TcpTcParser, - > TcpTmtcGenericServer + > TcpTmtcGenericServer { /// Create a new generic TMTC server instance. /// @@ -165,25 +174,37 @@ impl< cfg: ServerConfig, tc_parser: TcParser, tm_sender: TmSender, - tm_source: Box>, - tc_receiver: Box>, - ) -> Result, std::io::Error> { + tm_source: TmSource, + tc_receiver: TcReceiver, + ) -> Result { + // Create a TCP listener bound to two addresses. + let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + socket.set_reuse_address(cfg.reuse_addr)?; + socket.set_reuse_port(cfg.reuse_port)?; + let addr = (cfg.addr).into(); + socket.bind(&addr)?; + socket.listen(128)?; Ok(Self { - base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?, tc_handler: tc_parser, tm_handler: tm_sender, + listener: socket.into(), + inner_loop_delay: cfg.inner_loop_delay, + tm_source, + tm_buffer: vec![0; cfg.tm_buffer_size], + tc_receiver, + tc_buffer: vec![0; cfg.tc_buffer_size], }) } /// Retrieve the internal [TcpListener] class. pub fn listener(&mut self) -> &mut TcpListener { - self.base.listener() + &mut self.listener } /// Can be used to retrieve the local assigned address of the TCP server. This is especially /// useful if using the port number 0 for OS auto-assignment. pub fn local_addr(&self) -> std::io::Result { - self.base.local_addr() + self.listener.local_addr() } /// This call is used to handle the next connection to a client. Right now, it performs @@ -205,20 +226,20 @@ impl< let mut connection_result = ConnectionResult::default(); let mut current_write_idx; let mut next_write_idx = 0; - let (mut stream, addr) = self.base.listener.accept()?; + let (mut stream, addr) = self.listener.accept()?; stream.set_nonblocking(true)?; connection_result.addr = Some(addr); current_write_idx = next_write_idx; loop { - let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); + let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]); match read_result { Ok(0) => { // Connection closed by client. If any TC was read, parse for complete packets. // After that, break the outer loop. if current_write_idx > 0 { self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), + &mut self.tc_buffer, + &mut self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -229,10 +250,10 @@ impl< Ok(read_len) => { current_write_idx += read_len; // TC buffer is full, we must parse for complete packets now. - if current_write_idx == self.base.tc_buffer.capacity() { + if current_write_idx == self.tc_buffer.capacity() { self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), + &mut self.tc_buffer, + &mut self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -245,8 +266,8 @@ impl< // both UNIX and Windows. std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), + &mut self.tc_buffer, + &mut self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -254,14 +275,14 @@ impl< current_write_idx = next_write_idx; if !self.tm_handler.handle_tm_sending( - &mut self.base.tm_buffer, - self.base.tm_source.as_mut(), + &mut self.tm_buffer, + &mut self.tm_source, &mut connection_result, &mut stream, )? { // No TC read, no TM was sent, but the client has not disconnected. // Perform an inner delay to avoid burning CPU time. - thread::sleep(self.base.inner_loop_delay); + thread::sleep(self.inner_loop_delay); } } _ => { @@ -271,8 +292,8 @@ impl< } } self.tm_handler.handle_tm_sending( - &mut self.base.tm_buffer, - self.base.tm_source.as_mut(), + &mut self.tm_buffer, + &mut self.tm_source, &mut connection_result, &mut stream, )?; @@ -280,47 +301,6 @@ impl< } } -pub(crate) struct TcpTmtcServerBase { - pub(crate) listener: TcpListener, - pub(crate) inner_loop_delay: Duration, - pub(crate) tm_source: Box>, - pub(crate) tm_buffer: Vec, - pub(crate) tc_receiver: Box>, - pub(crate) tc_buffer: Vec, -} - -impl TcpTmtcServerBase { - pub(crate) fn new( - cfg: ServerConfig, - tm_source: Box>, - tc_receiver: Box>, - ) -> Result { - // Create a TCP listener bound to two addresses. - let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; - socket.set_reuse_address(cfg.reuse_addr)?; - socket.set_reuse_port(cfg.reuse_port)?; - let addr = (cfg.addr).into(); - socket.bind(&addr)?; - socket.listen(128)?; - Ok(Self { - listener: socket.into(), - inner_loop_delay: cfg.inner_loop_delay, - tm_source, - tm_buffer: vec![0; cfg.tm_buffer_size], - tc_receiver, - tc_buffer: vec![0; cfg.tc_buffer_size], - }) - } - - pub(crate) fn listener(&mut self) -> &mut TcpListener { - &mut self.listener - } - - pub(crate) fn local_addr(&self) -> std::io::Result { - self.listener.local_addr() - } -} - #[cfg(test)] pub(crate) mod tests { use std::sync::Mutex; diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index c256578..c124a47 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -88,16 +88,31 @@ impl TcpTmSender for SpacepacketsTmSender { /// [spacepackets::PacketId]s as part of the server configuration for that purpose. /// /// ## Example -/// /// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// also serves as the example application for this module. -pub struct TcpSpacepacketsServer { - generic_server: - TcpTmtcGenericServer, +pub struct TcpSpacepacketsServer< + TmError, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, +> { + generic_server: TcpTmtcGenericServer< + TmError, + TcError, + TmSource, + TcReceiver, + SpacepacketsTmSender, + SpacepacketsTcParser, + >, } -impl TcpSpacepacketsServer { - /// Create a new TCP TMTC server which exchanges CCSDS space packets. +impl< + TmError: 'static, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, + > TcpSpacepacketsServer +{ /// /// ## Parameter /// @@ -110,8 +125,8 @@ impl TcpSpacepacketsServer /// parsing. This mechanism is used to have a start marker for finding CCSDS packets. pub fn new( cfg: ServerConfig, - tm_source: Box>, - tc_receiver: Box>, + tm_source: TmSource, + tc_receiver: TcReceiver, packet_id_lookup: Box, ) -> Result { Ok(Self { @@ -179,11 +194,11 @@ mod tests { tc_receiver: SyncTcCacher, tm_source: SyncTmSource, packet_id_lookup: HashSet, - ) -> TcpSpacepacketsServer<(), ()> { + ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> { TcpSpacepacketsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver), + tm_source, + tc_receiver, Box::new(packet_id_lookup), ) .expect("TCP server generation failed") diff --git a/satrs-core/tests/tcp_servers.rs b/satrs-core/tests/tcp_servers.rs index d66c1fb..251eead 100644 --- a/satrs-core/tests/tcp_servers.rs +++ b/satrs-core/tests/tcp_servers.rs @@ -94,8 +94,8 @@ fn test_cobs_server() { tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver.clone()), + tm_source, + tc_receiver.clone(), ) .expect("TCP server generation failed"); let dest_addr = tcp_server @@ -176,8 +176,8 @@ fn test_ccsds_server() { packet_id_lookup.insert(TEST_PACKET_ID_0); let mut tcp_server = TcpSpacepacketsServer::new( ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver.clone()), + tm_source, + tc_receiver.clone(), Box::new(packet_id_lookup), ) .expect("TCP server generation failed"); diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs index a25804a..8b7feff 100644 --- a/satrs-example/src/tcp.rs +++ b/satrs-example/src/tcp.rs @@ -72,7 +72,12 @@ impl TmPacketSourceCore for SyncTcpTmSource { } pub struct TcpTask { - server: TcpSpacepacketsServer<(), CcsdsError>, + server: TcpSpacepacketsServer< + (), + CcsdsError, + SyncTcpTmSource, + CcsdsDistributor, + >, } impl TcpTask { @@ -84,8 +89,8 @@ impl TcpTask { Ok(Self { server: TcpSpacepacketsServer::new( cfg, - Box::new(tm_source), - Box::new(tc_receiver), + tm_source, + tc_receiver, Box::new(PACKET_ID_LOOKUP), )?, })