diff --git a/satrs-core/src/hal/host/tcp_server.rs b/satrs-core/src/hal/host/tcp_server.rs index 1b3bd7c..cfa3963 100644 --- a/satrs-core/src/hal/host/tcp_server.rs +++ b/satrs-core/src/hal/host/tcp_server.rs @@ -1,32 +1,68 @@ use alloc::boxed::Box; use alloc::vec; use cobs::decode_in_place; +use core::fmt::Display; +use std::io::Write; use std::net::ToSocketAddrs; use std::vec::Vec; use std::{io::Read, net::TcpListener}; +use thiserror::Error; use crate::tmtc::ReceivesTc; -pub struct TcpTcServer { - listener: TcpListener, - tc_receiver: Box>, - reader_vec: Vec, +pub trait TmPacketSource { + type Error; + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result; } -impl TcpTcServer { +pub struct TcpTmtcServer { + listener: TcpListener, + tm_source: Box>, + tm_buffer: Vec, + tc_receiver: Box>, + tc_buffer: Vec, + num_received_tcs: u32, + num_sent_tms: u32, +} + +#[derive(Error, Debug)] +pub enum TcpTmtcError { + #[error("TM retrieval error: {0}")] + TmError(TmError), + #[error("TC retrieval error: {0}")] + TcError(TcError), + #[error("io error: {0}")] + Io(#[from] std::io::Error), +} + +impl TcpTmtcServer { pub fn new( addr: A, - tc_receiver: Box>, - reader_vec_size: usize, + tm_buffer_size: usize, + tm_source: Box>, + tc_buffer_size: usize, + tc_receiver: Box>, ) -> Result { - Ok(TcpTcServer { + Ok(Self { listener: TcpListener::bind(addr)?, + tm_source, + tm_buffer: vec![0; tm_buffer_size], tc_receiver, - reader_vec: vec![0; reader_vec_size], + tc_buffer: vec![0; tc_buffer_size], + num_received_tcs: 0, + num_sent_tms: 0, }) } - pub fn handle_connections(&mut self) -> Result<(), std::io::Error> { + pub fn number_of_received_tcs(&self) -> u32 { + self.num_received_tcs + } + + pub fn number_of_sent_tms(&self) -> u32 { + self.num_sent_tms + } + + pub fn handle_connections(&mut self) -> Result<(), TcpTmtcError> { let mut current_write_idx; let mut next_write_idx = 0; for stream in self.listener.incoming() { @@ -34,27 +70,43 @@ impl TcpTcServer { next_write_idx = 0; let mut stream = stream?; loop { - let read_len = stream.read(&mut self.reader_vec[current_write_idx..])?; + let read_len = stream.read(&mut self.tc_buffer[current_write_idx..])?; if read_len > 0 { current_write_idx += read_len; - if current_write_idx == self.reader_vec.capacity() { + if current_write_idx == self.tc_buffer.capacity() { // Reader vec full, need to parse for packets. - let parse_result = parse_buffer_for_cobs_encoded_packets( - &mut self.reader_vec, + self.num_received_tcs += parse_buffer_for_cobs_encoded_packets( + &mut self.tc_buffer[..current_write_idx], self.tc_receiver.as_mut(), &mut next_write_idx, - ); + ) + .map_err(|e| TcpTmtcError::TcError(e))?; } + current_write_idx = next_write_idx; continue; } break; } if current_write_idx > 0 { - let parse_result = parse_buffer_for_cobs_encoded_packets( - &mut self.reader_vec[..current_write_idx], + self.num_received_tcs += parse_buffer_for_cobs_encoded_packets( + &mut self.tc_buffer[..current_write_idx], self.tc_receiver.as_mut(), &mut next_write_idx, - ); + ) + .map_err(|e| TcpTmtcError::TcError(e))?; + } + loop { + // Write TM until TM source is exhausted. For now, there is no limit for the amount + // of TM written this way. + let read_tm_len = self + .tm_source + .retrieve_packet(&mut self.tm_buffer) + .map_err(|e| TcpTmtcError::TmError(e))?; + if read_tm_len == 0 { + break; + } + self.num_sent_tms += 1; + stream.write_all(&self.tm_buffer[..read_tm_len])?; } } Ok(())