TCP Server #77

Merged
muellerr merged 52 commits from tcp-server into main 2023-09-21 18:11:38 +02:00
2 changed files with 98 additions and 22 deletions
Showing only changes of commit 0e6d903942 - Show all commits

View File

@ -4,7 +4,6 @@ use std::net::SocketAddr;
use std::net::{TcpListener, ToSocketAddrs};
use crate::tmtc::{ReceivesTc, TmPacketSource};
use core::fmt::Display;
use thiserror::Error;
// Re-export the TMTC in COBS server.
@ -13,7 +12,7 @@ pub use crate::hal::host::tcp_with_cobs_server::{
};
#[derive(Error, Debug)]
pub enum TcpTmtcError<TmError: Display, TcError: Display> {
pub enum TcpTmtcError<TmError, TcError> {
#[error("TM retrieval error: {0}")]
TmError(TmError),
#[error("TC retrieval error: {0}")]
@ -33,9 +32,9 @@ pub struct ConnectionResult {
pub(crate) struct TcpTmtcServerBase<TcError, TmError> {
pub(crate) listener: TcpListener,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError>>,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
pub(crate) tc_buffer: Vec<u8>,
}
@ -43,9 +42,9 @@ impl<TcError, TmError> TcpTmtcServerBase<TcError, TmError> {
pub(crate) fn new<A: ToSocketAddrs>(
addr: A,
tm_buffer_size: usize,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_buffer_size: usize,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
) -> Result<Self, std::io::Error> {
Ok(Self {
listener: TcpListener::bind(addr)?,

View File

@ -3,7 +3,6 @@ use alloc::vec;
use cobs::decode_in_place;
use cobs::encode;
use cobs::max_encoding_length;
use core::fmt::Display;
use std::io::Read;
use std::io::Write;
use std::net::ToSocketAddrs;
@ -25,19 +24,18 @@ use super::tcp_server::TcpTmtcError;
///
/// The server wil use the [parse_buffer_for_cobs_encoded_packets] function to parse for packets
/// and pass them to a generic TC receiver.
///
pub struct TcpTmtcInCobsServer<TcError, TmError> {
base: TcpTmtcServerBase<TcError, TmError>,
tm_encoding_buffer: Vec<u8>,
}
impl<TcError: 'static + Display, TmError: 'static + Display> TcpTmtcInCobsServer<TcError, TmError> {
impl<TcError: 'static, TmError: 'static> TcpTmtcInCobsServer<TcError, TmError> {
pub fn new<A: ToSocketAddrs>(
addr: A,
tm_buffer_size: usize,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tm_source: Box<dyn TmPacketSource<Error = TmError> + Send>,
tc_buffer_size: usize,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError> + Send>,
) -> Result<Self, std::io::Error> {
Ok(Self {
base: TcpTmtcServerBase::new(
@ -170,20 +168,31 @@ pub fn parse_buffer_for_cobs_encoded_packets<E>(
#[cfg(test)]
mod tests {
use crate::tmtc::ReceivesTcCore;
use alloc::vec::Vec;
use core::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use std::{
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::Mutex,
thread,
};
use crate::tmtc::{ReceivesTcCore, TmPacketSource};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
use cobs::encode;
use super::parse_buffer_for_cobs_encoded_packets;
use super::{parse_buffer_for_cobs_encoded_packets, TcpTmtcInCobsServer};
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
#[derive(Default)]
struct TestSender {
struct TestTcSender {
received_tcs: Vec<Vec<u8>>,
}
impl ReceivesTcCore for TestSender {
impl ReceivesTcCore for TestTcSender {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
@ -192,6 +201,32 @@ mod tests {
}
}
#[derive(Default, Clone)]
struct TmSource {
shared_tm_source: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl TmSource {
fn new() -> Self {
Self {
shared_tm_source: Default::default(),
}
}
fn add_tm(&mut self, tm: &[u8]) {
let mut shared_tm_source = self.shared_tm_source.lock().unwrap();
shared_tm_source.push_back(tm.to_vec());
}
}
impl TmPacketSource for TmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
Ok(0)
}
}
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
encoded_buf[*current_idx] = 0;
*current_idx += 1;
@ -202,7 +237,7 @@ mod tests {
#[test]
fn test_parsing_simple_packet() {
let mut test_sender = TestSender::default();
let mut test_sender = TestTcSender::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -221,7 +256,7 @@ mod tests {
#[test]
fn test_parsing_consecutive_packets() {
let mut test_sender = TestSender::default();
let mut test_sender = TestTcSender::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -250,7 +285,7 @@ mod tests {
#[test]
fn test_split_tail_packet_only() {
let mut test_sender = TestSender::default();
let mut test_sender = TestTcSender::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
@ -268,7 +303,7 @@ mod tests {
}
fn generic_test_split_packet(cut_off: usize) {
let mut test_sender = TestSender::default();
let mut test_sender = TestTcSender::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let inverted_packet: [u8; 5] = [5, 4, 3, 2, 1];
assert!(cut_off < inverted_packet.len() + 1);
@ -319,7 +354,7 @@ mod tests {
#[test]
fn test_zero_at_end() {
let mut test_sender = TestSender::default();
let mut test_sender = TestTcSender::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut next_write_idx = 0;
let mut current_idx = 0;
@ -344,7 +379,7 @@ mod tests {
#[test]
fn test_all_zeroes() {
let mut test_sender = TestSender::default();
let mut test_sender = TestTcSender::default();
let mut all_zeroes: [u8; 5] = [0; 5];
let mut next_write_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets(
@ -358,4 +393,46 @@ mod tests {
assert!(test_sender.received_tcs.is_empty());
assert_eq!(next_write_idx, 0);
}
#[test]
fn test_server_basic() {
let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777);
let tc_receiver = TestTcSender::default();
let tm_source = TmSource::default();
let mut tcp_server = TcpTmtcInCobsServer::new(
dest_addr,
1024,
Box::new(tm_source),
1024,
Box::new(tc_receiver),
)
.expect("TCP server generation failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
set_if_done.store(true, Ordering::Relaxed);
});
// Send TC to server now.
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.write_all(&encoded_buf)
.expect("writing to TCP server failed");
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(1));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
}
}