this should do the job
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2023-09-15 18:34:05 +02:00
parent 28801a8952
commit 13cacb0b53
Signed by: muellerr
GPG Key ID: A649FB78196E3849

View File

@ -1,32 +1,68 @@
use alloc::boxed::Box; use alloc::boxed::Box;
use alloc::vec; use alloc::vec;
use cobs::decode_in_place; use cobs::decode_in_place;
use core::fmt::Display;
use std::io::Write;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::vec::Vec; use std::vec::Vec;
use std::{io::Read, net::TcpListener}; use std::{io::Read, net::TcpListener};
use thiserror::Error;
use crate::tmtc::ReceivesTc; use crate::tmtc::ReceivesTc;
pub struct TcpTcServer<E> { pub trait TmPacketSource {
listener: TcpListener, type Error;
tc_receiver: Box<dyn ReceivesTc<Error = E>>, fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
reader_vec: Vec<u8>,
} }
impl<E: 'static> TcpTcServer<E> { pub struct TcpTmtcServer<TcError, TmError> {
listener: TcpListener,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tm_buffer: Vec<u8>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
tc_buffer: Vec<u8>,
num_received_tcs: u32,
num_sent_tms: u32,
}
#[derive(Error, Debug)]
pub enum TcpTmtcError<TmError: Display, TcError: Display> {
#[error("TM retrieval error: {0}")]
TmError(TmError),
#[error("TC retrieval error: {0}")]
TcError(TcError),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
impl<TcError: 'static + Display, TmError: 'static + Display> TcpTmtcServer<TcError, TmError> {
pub fn new<A: ToSocketAddrs>( pub fn new<A: ToSocketAddrs>(
addr: A, addr: A,
tc_receiver: Box<dyn ReceivesTc<Error = E>>, tm_buffer_size: usize,
reader_vec_size: usize, tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_buffer_size: usize,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(TcpTcServer { Ok(Self {
listener: TcpListener::bind(addr)?, listener: TcpListener::bind(addr)?,
tm_source,
tm_buffer: vec![0; tm_buffer_size],
tc_receiver, tc_receiver,
reader_vec: vec![0; reader_vec_size], tc_buffer: vec![0; tc_buffer_size],
num_received_tcs: 0,
num_sent_tms: 0,
}) })
} }
pub fn handle_connections(&mut self) -> Result<(), std::io::Error> { pub fn number_of_received_tcs(&self) -> u32 {
self.num_received_tcs
}
pub fn number_of_sent_tms(&self) -> u32 {
self.num_sent_tms
}
pub fn handle_connections(&mut self) -> Result<(), TcpTmtcError<TmError, TcError>> {
let mut current_write_idx; let mut current_write_idx;
let mut next_write_idx = 0; let mut next_write_idx = 0;
for stream in self.listener.incoming() { for stream in self.listener.incoming() {
@ -34,27 +70,43 @@ impl<E: 'static> TcpTcServer<E> {
next_write_idx = 0; next_write_idx = 0;
let mut stream = stream?; let mut stream = stream?;
loop { loop {
let read_len = stream.read(&mut self.reader_vec[current_write_idx..])?; let read_len = stream.read(&mut self.tc_buffer[current_write_idx..])?;
if read_len > 0 { if read_len > 0 {
current_write_idx += read_len; current_write_idx += read_len;
if current_write_idx == self.reader_vec.capacity() { if current_write_idx == self.tc_buffer.capacity() {
// Reader vec full, need to parse for packets. // Reader vec full, need to parse for packets.
let parse_result = parse_buffer_for_cobs_encoded_packets( self.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut self.reader_vec, &mut self.tc_buffer[..current_write_idx],
self.tc_receiver.as_mut(), self.tc_receiver.as_mut(),
&mut next_write_idx, &mut next_write_idx,
); )
.map_err(|e| TcpTmtcError::TcError(e))?;
} }
current_write_idx = next_write_idx;
continue; continue;
} }
break; break;
} }
if current_write_idx > 0 { if current_write_idx > 0 {
let parse_result = parse_buffer_for_cobs_encoded_packets( self.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut self.reader_vec[..current_write_idx], &mut self.tc_buffer[..current_write_idx],
self.tc_receiver.as_mut(), self.tc_receiver.as_mut(),
&mut next_write_idx, &mut next_write_idx,
); )
.map_err(|e| TcpTmtcError::TcError(e))?;
}
loop {
// Write TM until TM source is exhausted. For now, there is no limit for the amount
// of TM written this way.
let read_tm_len = self
.tm_source
.retrieve_packet(&mut self.tm_buffer)
.map_err(|e| TcpTmtcError::TmError(e))?;
if read_tm_len == 0 {
break;
}
self.num_sent_tms += 1;
stream.write_all(&self.tm_buffer[..read_tm_len])?;
} }
} }
Ok(()) Ok(())