this intermediate struct is not necessary #86

Merged
muellerr merged 5 commits from simplify-some-tcp-components into main 2023-10-01 15:00:13 +02:00
Showing only changes of commit 7654670967 - Show all commits

View File

@ -137,7 +137,13 @@ pub struct TcpTmtcGenericServer<
TmHandler: TcpTmSender<TmError, TcError>, TmHandler: TcpTmSender<TmError, TcError>,
TcHandler: TcpTcParser<TmError, TcError>, TcHandler: TcpTcParser<TmError, TcError>,
> { > {
base: TcpTmtcServerBase<TmError, TcError>, // base: TcpTmtcServerBase<TmError, TcError>,
pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError>>,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
pub(crate) tc_buffer: Vec<u8>,
tc_handler: TcHandler, tc_handler: TcHandler,
tm_handler: TmHandler, tm_handler: TmHandler,
} }
@ -168,22 +174,34 @@ impl<
tm_source: Box<dyn TmPacketSource<Error = TmError>>, tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>, tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>, std::io::Error> { ) -> Result<TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>, std::io::Error> {
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.set_reuse_address(cfg.reuse_addr)?;
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
socket.bind(&addr)?;
socket.listen(128)?;
Ok(Self { Ok(Self {
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
tc_handler: tc_parser, tc_handler: tc_parser,
tm_handler: tm_sender, tm_handler: tm_sender,
listener: socket.into(),
inner_loop_delay: cfg.inner_loop_delay,
tm_source,
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
}) })
} }
/// Retrieve the internal [TcpListener] class. /// Retrieve the internal [TcpListener] class.
pub fn listener(&mut self) -> &mut TcpListener { pub fn listener(&mut self) -> &mut TcpListener {
self.base.listener() &mut self.listener
} }
/// Can be used to retrieve the local assigned address of the TCP server. This is especially /// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment. /// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> { pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.base.local_addr() self.listener.local_addr()
} }
/// This call is used to handle the next connection to a client. Right now, it performs /// This call is used to handle the next connection to a client. Right now, it performs
@ -205,20 +223,20 @@ impl<
let mut connection_result = ConnectionResult::default(); let mut connection_result = ConnectionResult::default();
let mut current_write_idx; let mut current_write_idx;
let mut next_write_idx = 0; let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?; let (mut stream, addr) = self.listener.accept()?;
stream.set_nonblocking(true)?; stream.set_nonblocking(true)?;
connection_result.addr = Some(addr); connection_result.addr = Some(addr);
current_write_idx = next_write_idx; current_write_idx = next_write_idx;
loop { loop {
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
match read_result { match read_result {
Ok(0) => { Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets. // Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop. // After that, break the outer loop.
if current_write_idx > 0 { if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer, &mut self.tc_buffer,
self.base.tc_receiver.as_mut(), self.tc_receiver.as_mut(),
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -229,10 +247,10 @@ impl<
Ok(read_len) => { Ok(read_len) => {
current_write_idx += read_len; current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now. // TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() { if current_write_idx == self.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer, &mut self.tc_buffer,
self.base.tc_receiver.as_mut(), self.tc_receiver.as_mut(),
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -245,8 +263,8 @@ impl<
// both UNIX and Windows. // both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing( self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer, &mut self.tc_buffer,
self.base.tc_receiver.as_mut(), self.tc_receiver.as_mut(),
&mut connection_result, &mut connection_result,
current_write_idx, current_write_idx,
&mut next_write_idx, &mut next_write_idx,
@ -254,14 +272,14 @@ impl<
current_write_idx = next_write_idx; current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending( if !self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer, &mut self.tm_buffer,
self.base.tm_source.as_mut(), self.tm_source.as_mut(),
&mut connection_result, &mut connection_result,
&mut stream, &mut stream,
)? { )? {
// No TC read, no TM was sent, but the client has not disconnected. // No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time. // Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay); thread::sleep(self.inner_loop_delay);
} }
} }
_ => { _ => {
@ -271,8 +289,8 @@ impl<
} }
} }
self.tm_handler.handle_tm_sending( self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer, &mut self.tm_buffer,
self.base.tm_source.as_mut(), self.tm_source.as_mut(),
&mut connection_result, &mut connection_result,
&mut stream, &mut stream,
)?; )?;
@ -280,47 +298,6 @@ impl<
} }
} }
pub(crate) struct TcpTmtcServerBase<TmError, TcError> {
pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError>>,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
pub(crate) tc_buffer: Vec<u8>,
}
impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
pub(crate) fn new(
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<Self, std::io::Error> {
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.set_reuse_address(cfg.reuse_addr)?;
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
socket.bind(&addr)?;
socket.listen(128)?;
Ok(Self {
listener: socket.into(),
inner_loop_delay: cfg.inner_loop_delay,
tm_source,
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
})
}
pub(crate) fn listener(&mut self) -> &mut TcpListener {
&mut self.listener
}
pub(crate) fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.listener.local_addr()
}
}
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub(crate) mod tests {
use std::sync::Mutex; use std::sync::Mutex;