From 5aa339286acc97084dcddf284e38d113a2bf7bfe Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 18 Sep 2023 00:18:01 +0200 Subject: [PATCH] move generic server --- satrs-core/src/hal/std/tcp_server.rs | 167 ++++++++++++++++- .../src/hal/std/tcp_with_cobs_server.rs | 173 +----------------- 2 files changed, 169 insertions(+), 171 deletions(-) diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index 49592c0..436cbfc 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -3,8 +3,10 @@ use alloc::vec; use alloc::{boxed::Box, vec::Vec}; use core::time::Duration; use socket2::{Domain, Socket, Type}; -use std::net::SocketAddr; +use std::io::Read; use std::net::TcpListener; +use std::net::{SocketAddr, TcpStream}; +use std::thread; use crate::tmtc::{ReceivesTc, TmPacketSource}; use thiserror::Error; @@ -85,6 +87,169 @@ pub struct ConnectionResult { pub num_sent_tms: u32, } +pub trait TcpTcHandler { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut dyn ReceivesTc, + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError>; +} +pub trait TcpTmHandler { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut dyn TmPacketSource, + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result>; +} + +pub struct TcpTmtcGenericServer< + TmError, + TcError, + TmHandler: TcpTmHandler, + TcHandler: TcpTcHandler, +> { + base: TcpTmtcServerBase, + tc_handler: TcHandler, + tm_handler: TmHandler, +} + +impl< + TmError: 'static, + TcError: 'static, + TmHandler: TcpTmHandler, + TcHandler: TcpTcHandler, + > TcpTmtcGenericServer +{ + /// Create a new TMTC server which exchanges TMTC packets encoded with + /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded + /// to this TC receiver. + pub fn new( + cfg: ServerConfig, + tc_handler: TcHandler, + tm_handler: TmHandler, + tm_source: Box + Send>, + tc_receiver: Box + Send>, + ) -> Result, std::io::Error> { + Ok(Self { + base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?, + tc_handler, + tm_handler, + }) + } + + /// Retrieve the internal [TcpListener] class. + pub fn listener(&mut self) -> &mut TcpListener { + self.base.listener() + } + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result { + self.base.local_addr() + } + + /// This call is used to handle the next connection to a client. Right now, it performs + /// the following steps: + /// + /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API + /// until a client connects. + /// 2. It reads all the telecommands from the client, which are expected to be COBS + /// encoded packets. + /// 3. After reading and parsing all telecommands, it sends back all telemetry it can retrieve + /// from the user specified [TmPacketSource] back to the client. + pub fn handle_next_connection( + &mut self, + ) -> Result> { + let mut connection_result = ConnectionResult::default(); + let mut current_write_idx; + let mut next_write_idx = 0; + let (mut stream, addr) = self.base.listener.accept()?; + stream.set_nonblocking(true)?; + connection_result.addr = Some(addr); + current_write_idx = next_write_idx; + loop { + let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); + match read_result { + Ok(0) => { + // Connection closed by client. If any TC was read, parse for complete packets. + // After that, break the outer loop. + if current_write_idx > 0 { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + } + break; + } + Ok(read_len) => { + current_write_idx += read_len; + // TC buffer is full, we must parse for complete packets now. + if current_write_idx == self.base.tc_buffer.capacity() { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + current_write_idx = next_write_idx; + } + } + Err(e) => match e.kind() { + // As per [TcpStream::set_read_timeout] documentation, this should work for + // both UNIX and Windows. + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + current_write_idx = next_write_idx; + + if !self.tm_handler.handle_tm_sending( + &mut self.base.tm_buffer, + self.base.tm_source.as_mut(), + &mut connection_result, + &mut stream, + )? { + // No TC read, no TM was sent, but the client has not disconnected. + // Perform an inner delay to avoid burning CPU time. + thread::sleep(self.base.inner_loop_delay); + } + } + _ => { + return Err(TcpTmtcError::Io(e)); + } + }, + } + } + self.tm_handler.handle_tm_sending( + &mut self.base.tm_buffer, + self.base.tm_source.as_mut(), + &mut connection_result, + &mut stream, + )?; + Ok(connection_result) + } +} + pub(crate) struct TcpTmtcServerBase { pub(crate) listener: TcpListener, pub(crate) inner_loop_delay: Duration, diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs index c013c4c..ad1913a 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -3,32 +3,18 @@ use alloc::vec; use cobs::decode_in_place; use cobs::encode; use delegate::delegate; -use std::io::Read; use std::io::Write; use std::net::SocketAddr; use std::net::TcpListener; use std::net::TcpStream; -use std::println; -use std::thread; use std::vec::Vec; -use crate::hal::std::tcp_server::{ServerConfig, TcpTmtcServerBase}; use crate::tmtc::ReceivesTc; use crate::tmtc::TmPacketSource; -use super::tcp_server::ConnectionResult; -use super::tcp_server::TcpTmtcError; - -pub trait TcpTcHandler { - fn handle_tc_parsing( - &mut self, - tc_buffer: &mut [u8], - tc_receiver: &mut dyn ReceivesTc, - conn_result: &mut ConnectionResult, - current_write_idx: usize, - next_write_idx: &mut usize, - ) -> Result<(), TcpTmtcError>; -} +use crate::hal::std::tcp_server::{ + ConnectionResult, ServerConfig, TcpTcHandler, TcpTmHandler, TcpTmtcError, TcpTmtcGenericServer, +}; #[derive(Default)] struct CobsTcParser {} @@ -53,16 +39,6 @@ impl TcpTcHandler for CobsTcParser { } } -pub trait TcpTmHandler { - fn handle_tm_sending( - &mut self, - tm_buffer: &mut [u8], - tm_source: &mut dyn TmPacketSource, - conn_result: &mut ConnectionResult, - stream: &mut TcpStream, - ) -> Result>; -} - struct CobsTmParser { tm_encoding_buffer: Vec, } @@ -114,149 +90,6 @@ impl TcpTmHandler for CobsTmParser { } } -pub struct TcpTmtcGenericServer< - TmError, - TcError, - TmHandler: TcpTmHandler, - TcHandler: TcpTcHandler, -> { - base: TcpTmtcServerBase, - tc_handler: TcHandler, - tm_handler: TmHandler, -} - -impl< - TmError: 'static, - TcError: 'static, - TmHandler: TcpTmHandler, - TcHandler: TcpTcHandler, - > TcpTmtcGenericServer -{ - /// Create a new TMTC server which exchanges TMTC packets encoded with - /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). - /// - /// ## Parameter - /// - /// * `cfg` - Configuration of the server. - /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are - /// then sent back to the client. - /// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded - /// to this TC receiver. - pub fn new( - cfg: ServerConfig, - tc_handler: TcHandler, - tm_handler: TmHandler, - tm_source: Box + Send>, - tc_receiver: Box + Send>, - ) -> Result, std::io::Error> { - Ok(Self { - base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?, - tc_handler, - tm_handler, - }) - } - - /// Retrieve the internal [TcpListener] class. - pub fn listener(&mut self) -> &mut TcpListener { - self.base.listener() - } - - /// Can be used to retrieve the local assigned address of the TCP server. This is especially - /// useful if using the port number 0 for OS auto-assignment. - pub fn local_addr(&self) -> std::io::Result { - self.base.local_addr() - } - - /// This call is used to handle the next connection to a client. Right now, it performs - /// the following steps: - /// - /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API - /// until a client connects. - /// 2. It reads all the telecommands from the client, which are expected to be COBS - /// encoded packets. - /// 3. After reading and parsing all telecommands, it sends back all telemetry it can retrieve - /// from the user specified [TmPacketSource] back to the client. - pub fn handle_next_connection( - &mut self, - ) -> Result> { - let mut connection_result = ConnectionResult::default(); - let mut current_write_idx; - let mut next_write_idx = 0; - let (mut stream, addr) = self.base.listener.accept()?; - stream.set_nonblocking(true)?; - connection_result.addr = Some(addr); - current_write_idx = next_write_idx; - loop { - let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); - match read_result { - Ok(0) => { - // Connection closed by client. If any TC was read, parse for complete packets. - // After that, break the outer loop. - if current_write_idx > 0 { - self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), - &mut connection_result, - current_write_idx, - &mut next_write_idx, - )?; - } - break; - } - Ok(read_len) => { - current_write_idx += read_len; - // TC buffer is full, we must parse for complete packets now. - if current_write_idx == self.base.tc_buffer.capacity() { - self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), - &mut connection_result, - current_write_idx, - &mut next_write_idx, - )?; - current_write_idx = next_write_idx; - } - } - Err(e) => match e.kind() { - // As per [TcpStream::set_read_timeout] documentation, this should work for - // both UNIX and Windows. - std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { - self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), - &mut connection_result, - current_write_idx, - &mut next_write_idx, - )?; - current_write_idx = next_write_idx; - - if !self.tm_handler.handle_tm_sending( - &mut self.base.tm_buffer, - self.base.tm_source.as_mut(), - &mut connection_result, - &mut stream, - )? { - // No TC read, no TM was sent, but the client has not disconnected. - // Perform an inner delay to avoid burning CPU time. - thread::sleep(self.base.inner_loop_delay); - } - } - _ => { - return Err(TcpTmtcError::Io(e)); - } - }, - } - } - self.tm_handler.handle_tm_sending( - &mut self.base.tm_buffer, - self.base.tm_source.as_mut(), - &mut connection_result, - &mut stream, - )?; - Ok(connection_result) - } -} - /// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). ///