diff --git a/satrs-core/src/hal/host/tcp_server.rs b/satrs-core/src/hal/host/tcp_server.rs index c518bad..11818c5 100644 --- a/satrs-core/src/hal/host/tcp_server.rs +++ b/satrs-core/src/hal/host/tcp_server.rs @@ -4,7 +4,6 @@ use std::net::SocketAddr; use std::net::{TcpListener, ToSocketAddrs}; use crate::tmtc::{ReceivesTc, TmPacketSource}; -use core::fmt::Display; use thiserror::Error; // Re-export the TMTC in COBS server. @@ -13,7 +12,7 @@ pub use crate::hal::host::tcp_with_cobs_server::{ }; #[derive(Error, Debug)] -pub enum TcpTmtcError { +pub enum TcpTmtcError { #[error("TM retrieval error: {0}")] TmError(TmError), #[error("TC retrieval error: {0}")] @@ -33,9 +32,9 @@ pub struct ConnectionResult { pub(crate) struct TcpTmtcServerBase { pub(crate) listener: TcpListener, - pub(crate) tm_source: Box>, + pub(crate) tm_source: Box + Send>, pub(crate) tm_buffer: Vec, - pub(crate) tc_receiver: Box>, + pub(crate) tc_receiver: Box + Send>, pub(crate) tc_buffer: Vec, } @@ -43,9 +42,9 @@ impl TcpTmtcServerBase { pub(crate) fn new( addr: A, tm_buffer_size: usize, - tm_source: Box>, + tm_source: Box + Send>, tc_buffer_size: usize, - tc_receiver: Box>, + tc_receiver: Box + Send>, ) -> Result { Ok(Self { listener: TcpListener::bind(addr)?, diff --git a/satrs-core/src/hal/host/tcp_with_cobs_server.rs b/satrs-core/src/hal/host/tcp_with_cobs_server.rs index 539e491..0dcb1b6 100644 --- a/satrs-core/src/hal/host/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/host/tcp_with_cobs_server.rs @@ -3,7 +3,6 @@ use alloc::vec; use cobs::decode_in_place; use cobs::encode; use cobs::max_encoding_length; -use core::fmt::Display; use std::io::Read; use std::io::Write; use std::net::ToSocketAddrs; @@ -25,19 +24,18 @@ use super::tcp_server::TcpTmtcError; /// /// The server wil use the [parse_buffer_for_cobs_encoded_packets] function to parse for packets /// and pass them to a generic TC receiver. -/// pub struct TcpTmtcInCobsServer { base: TcpTmtcServerBase, tm_encoding_buffer: Vec, } -impl TcpTmtcInCobsServer { +impl TcpTmtcInCobsServer { pub fn new( addr: A, tm_buffer_size: usize, - tm_source: Box>, + tm_source: Box + Send>, tc_buffer_size: usize, - tc_receiver: Box>, + tc_receiver: Box + Send>, ) -> Result { Ok(Self { base: TcpTmtcServerBase::new( @@ -170,20 +168,31 @@ pub fn parse_buffer_for_cobs_encoded_packets( #[cfg(test)] mod tests { - use crate::tmtc::ReceivesTcCore; - use alloc::vec::Vec; + use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, + }; + use std::{ + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + sync::Mutex, + thread, + }; + + use crate::tmtc::{ReceivesTcCore, TmPacketSource}; + use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; use cobs::encode; - use super::parse_buffer_for_cobs_encoded_packets; + use super::{parse_buffer_for_cobs_encoded_packets, TcpTmtcInCobsServer}; const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; #[derive(Default)] - struct TestSender { + struct TestTcSender { received_tcs: Vec>, } - impl ReceivesTcCore for TestSender { + impl ReceivesTcCore for TestTcSender { type Error = (); fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -192,6 +201,32 @@ mod tests { } } + #[derive(Default, Clone)] + struct TmSource { + shared_tm_source: Arc>>>, + } + + impl TmSource { + fn new() -> Self { + Self { + shared_tm_source: Default::default(), + } + } + + fn add_tm(&mut self, tm: &[u8]) { + let mut shared_tm_source = self.shared_tm_source.lock().unwrap(); + shared_tm_source.push_back(tm.to_vec()); + } + } + + impl TmPacketSource for TmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + Ok(0) + } + } + fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { encoded_buf[*current_idx] = 0; *current_idx += 1; @@ -202,7 +237,7 @@ mod tests { #[test] fn test_parsing_simple_packet() { - let mut test_sender = TestSender::default(); + let mut test_sender = TestTcSender::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut current_idx = 0; encode_simple_packet(&mut encoded_buf, &mut current_idx); @@ -221,7 +256,7 @@ mod tests { #[test] fn test_parsing_consecutive_packets() { - let mut test_sender = TestSender::default(); + let mut test_sender = TestTcSender::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut current_idx = 0; encode_simple_packet(&mut encoded_buf, &mut current_idx); @@ -250,7 +285,7 @@ mod tests { #[test] fn test_split_tail_packet_only() { - let mut test_sender = TestSender::default(); + let mut test_sender = TestTcSender::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut current_idx = 0; encode_simple_packet(&mut encoded_buf, &mut current_idx); @@ -268,7 +303,7 @@ mod tests { } fn generic_test_split_packet(cut_off: usize) { - let mut test_sender = TestSender::default(); + let mut test_sender = TestTcSender::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let inverted_packet: [u8; 5] = [5, 4, 3, 2, 1]; assert!(cut_off < inverted_packet.len() + 1); @@ -319,7 +354,7 @@ mod tests { #[test] fn test_zero_at_end() { - let mut test_sender = TestSender::default(); + let mut test_sender = TestTcSender::default(); let mut encoded_buf: [u8; 16] = [0; 16]; let mut next_write_idx = 0; let mut current_idx = 0; @@ -344,7 +379,7 @@ mod tests { #[test] fn test_all_zeroes() { - let mut test_sender = TestSender::default(); + let mut test_sender = TestTcSender::default(); let mut all_zeroes: [u8; 5] = [0; 5]; let mut next_write_idx = 0; let packets = parse_buffer_for_cobs_encoded_packets( @@ -358,4 +393,46 @@ mod tests { assert!(test_sender.received_tcs.is_empty()); assert_eq!(next_write_idx, 0); } + + #[test] + fn test_server_basic() { + let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); + let tc_receiver = TestTcSender::default(); + let tm_source = TmSource::default(); + let mut tcp_server = TcpTmtcInCobsServer::new( + dest_addr, + 1024, + Box::new(tm_source), + 1024, + Box::new(tc_receiver), + ) + .expect("TCP server generation failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + set_if_done.store(true, Ordering::Relaxed); + }); + // Send TC to server now. + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&encoded_buf) + .expect("writing to TCP server failed"); + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(1)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + } }