From 175d995a0ea7129800163c54632b4174baf86a0b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 21 Sep 2023 18:59:00 +0200 Subject: [PATCH 1/8] those impls are easy.. --- satrs-core/src/encoding/ccsds.rs | 2 +- .../src/hal/std/tcp_spacepackets_server.rs | 69 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs index f8f775f..6bea03d 100644 --- a/satrs-core/src/encoding/ccsds.rs +++ b/satrs-core/src/encoding/ccsds.rs @@ -64,7 +64,7 @@ impl PacketIdLookup for [PacketId] { pub fn parse_buffer_for_ccsds_space_packets( buf: &mut [u8], packet_id_lookup: &(impl PacketIdLookup + ?Sized), - tc_receiver: &mut impl ReceivesTcCore, + tc_receiver: &mut (impl ReceivesTcCore + ?Sized), next_write_idx: &mut usize, ) -> Result { *next_write_idx = 0; diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 8b13789..6c5c3c5 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -1 +1,70 @@ +use std::{io::Write, net::TcpStream}; +use alloc::boxed::Box; + +use crate::{ + encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets}, + tmtc::{ReceivesTc, TmPacketSource}, +}; + +use super::tcp_server::{ConnectionResult, TcpTcParser, TcpTmSender, TcpTmtcError}; + +/// Concrete [TcpTcParser] implementation for the []. +pub struct CcsdsTcParser { + packet_id_lookup: Box, +} + +impl TcpTcParser for CcsdsTcParser { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut dyn ReceivesTc, + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError> { + // Reader vec full, need to parse for packets. + conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets( + &mut tc_buffer[..current_write_idx], + self.packet_id_lookup.as_ref(), + tc_receiver.upcast_mut(), + next_write_idx, + ) + .map_err(|e| TcpTmtcError::TcError(e))?; + Ok(()) + } +} + +/// Concrete [TcpTmSender] implementation for the []. +#[derive(Default)] +pub struct CcsdsTmSender {} + +impl TcpTmSender for CcsdsTmSender { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut dyn TmPacketSource, + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result> { + let mut tm_was_sent = false; + loop { + // Write TM until TM source is exhausted. For now, there is no limit for the amount + // of TM written this way. + let read_tm_len = tm_source + .retrieve_packet(tm_buffer) + .map_err(|e| TcpTmtcError::TmError(e))?; + + if read_tm_len == 0 { + return Ok(tm_was_sent); + } + tm_was_sent = true; + conn_result.num_sent_tms += 1; + + stream.write_all(&tm_buffer[..read_tm_len])?; + } + } +} + +#[cfg(test)] +mod tests {} From 536c5f6949ce239d63cc6f02fa944f0bed1fb569 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 21 Sep 2023 19:10:51 +0200 Subject: [PATCH 2/8] this is better --- satrs-core/src/hal/std/tcp_server.rs | 4 ++-- satrs-core/src/hal/std/tcp_spacepackets_server.rs | 4 ++-- satrs-core/src/hal/std/tcp_with_cobs_server.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index dcccf66..6e17a4e 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -90,7 +90,7 @@ pub trait TcpTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut dyn ReceivesTc, + tc_receiver: &mut (impl ReceivesTc + ?Sized), conn_result: &mut ConnectionResult, current_write_idx: usize, next_write_idx: &mut usize, @@ -105,7 +105,7 @@ pub trait TcpTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut dyn TmPacketSource, + tm_source: &mut (impl TmPacketSource + ?Sized), conn_result: &mut ConnectionResult, stream: &mut TcpStream, ) -> Result>; diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 6c5c3c5..1018364 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -18,7 +18,7 @@ impl TcpTcParser for CcsdsTcParser fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut dyn ReceivesTc, + tc_receiver: &mut (impl ReceivesTc + ?Sized), conn_result: &mut ConnectionResult, current_write_idx: usize, next_write_idx: &mut usize, @@ -43,7 +43,7 @@ impl TcpTmSender for CcsdsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut dyn TmPacketSource, + tm_source: &mut (impl TmPacketSource + ?Sized), conn_result: &mut ConnectionResult, stream: &mut TcpStream, ) -> Result> { diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs index fc44ebf..10d72d0 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -24,7 +24,7 @@ impl TcpTcParser for CobsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut dyn ReceivesTc, + tc_receiver: &mut (impl ReceivesTc + ?Sized), conn_result: &mut ConnectionResult, current_write_idx: usize, next_write_idx: &mut usize, @@ -59,7 +59,7 @@ impl TcpTmSender for CobsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut dyn TmPacketSource, + tm_source: &mut (impl TmPacketSource + ?Sized), conn_result: &mut ConnectionResult, stream: &mut TcpStream, ) -> Result> { From c96de203b849de7e9248f4fc83abc439786ca838 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 10:08:14 +0200 Subject: [PATCH 3/8] what is this? --- satrs-core/src/hal/std/tcp_server.rs | 59 +++++++++++++ .../src/hal/std/tcp_spacepackets_server.rs | 85 ++++++++++++++++++- .../src/hal/std/tcp_with_cobs_server.rs | 52 +----------- 3 files changed, 142 insertions(+), 54 deletions(-) diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index 6e17a4e..2e8bb01 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -12,6 +12,9 @@ use crate::tmtc::{ReceivesTc, TmPacketSource}; use thiserror::Error; // Re-export the TMTC in COBS server. +pub use crate::hal::std::tcp_spacepackets_server::{ + CcsdsTcParser, CcsdsTmSender, TcpSpacepacketsServer, +}; pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; /// Configuration struct for the generic TCP TMTC server @@ -317,3 +320,59 @@ impl TcpTmtcServerBase { self.listener.local_addr() } } + +#[cfg(test)] +pub(crate) mod tests { + use std::sync::Mutex; + + use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; + + use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore}; + + #[derive(Default, Clone)] + pub(crate) struct SyncTcCacher { + pub(crate) tc_queue: Arc>>>, + } + impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Clone)] + pub(crate) struct SyncTmSource { + tm_queue: Arc>>>, + } + + impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } + } + + impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } + } +} diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 1018364..2226f35 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -1,4 +1,5 @@ -use std::{io::Write, net::TcpStream}; +use delegate::delegate; +use std::{io::Write, net::{TcpStream, TcpListener, SocketAddr}}; use alloc::boxed::Box; @@ -7,11 +8,19 @@ use crate::{ tmtc::{ReceivesTc, TmPacketSource}, }; -use super::tcp_server::{ConnectionResult, TcpTcParser, TcpTmSender, TcpTmtcError}; +use super::tcp_server::{ + ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, +}; /// Concrete [TcpTcParser] implementation for the []. pub struct CcsdsTcParser { - packet_id_lookup: Box, + packet_id_lookup: Box, +} + +impl CcsdsTcParser { + pub fn new(packet_id_lookup: Box) -> Self { + Self { packet_id_lookup } + } } impl TcpTcParser for CcsdsTcParser { @@ -66,5 +75,73 @@ impl TcpTmSender for CcsdsTmSender { } } +/// TCP TMTC server implementation for exchange of tightly stuffed CCSDS space packets. +/// +/// This serves only works if CCSDS space packets are the only packet type being exchanged. +/// It uses the CCSDS [spacepackets::PacketId] as the packet delimiter and start marker when +/// parsing for packets. The user specifies a set of expected [spacepackets::PacketId]s as part +/// of the server configuration for that purpose. +/// +/// ## Example +/// +/// The [TCP integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// also serves as the example application for this module. +pub struct TcpSpacepacketsServer { + generic_server: TcpTmtcGenericServer, +} + +impl TcpSpacepacketsServer { + /// Create a new TCP TMTC server which exchanges TMTC packets encoded with + /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommands which were decoded successfully will be + /// forwarded to this TC receiver. + pub fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + packet_id_lookup: Box + ) -> Result> { + Ok(Self { + generic_server: TcpTmtcGenericServer::new( + cfg, + CcsdsTcParser::new(packet_id_lookup), + CcsdsTmSender::default(), + tm_source, + tc_receiver, + )?, + }) + } + + delegate! { + to self.generic_server { + pub fn listener(&mut self) -> &mut TcpListener; + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result; + + /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call. + pub fn handle_next_connection( + &mut self, + ) -> Result>; + } + } +} + #[cfg(test)] -mod tests {} +mod tests { + use super::TcpSpacepacketsServer; + + #[test] + fn test_basic() { + let + let server = TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup) + + } +} diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs index 10d72d0..f019eaf 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -173,61 +173,13 @@ mod tests { use crate::{ encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, - hal::std::tcp_server::ServerConfig, - tmtc::{ReceivesTcCore, TmPacketSourceCore}, + hal::std::tcp_server::{ServerConfig, tests::{SyncTcCacher, SyncTmSource}}, }; - use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + use alloc::{boxed::Box, sync::Arc}; use cobs::encode; use super::TcpTmtcInCobsServer; - #[derive(Default, Clone)] - struct SyncTcCacher { - tc_queue: Arc>>>, - } - impl ReceivesTcCore for SyncTcCacher { - type Error = (); - - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); - tc_queue.push_back(tc_raw.to_vec()); - Ok(()) - } - } - - #[derive(Default, Clone)] - struct SyncTmSource { - tm_queue: Arc>>>, - } - - impl SyncTmSource { - pub(crate) fn add_tm(&mut self, tm: &[u8]) { - let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); - tm_queue.push_back(tm.to_vec()); - } - } - - impl TmPacketSourceCore for SyncTmSource { - type Error = (); - - fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { - let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); - if !tm_queue.is_empty() { - let next_vec = tm_queue.front().unwrap(); - if buffer.len() < next_vec.len() { - panic!( - "provided buffer too small, must be at least {} bytes", - next_vec.len() - ); - } - let next_vec = tm_queue.pop_front().unwrap(); - buffer[0..next_vec.len()].copy_from_slice(&next_vec); - return Ok(next_vec.len()); - } - Ok(0) - } - } - fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx) } From 683ae899f5410c4f3439e2cb21b71107456e0ea6 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 10:23:34 +0200 Subject: [PATCH 4/8] Add first CCSDS server tests --- .../src/hal/std/tcp_spacepackets_server.rs | 51 +++++++++++++++++-- .../src/hal/std/tcp_with_cobs_server.rs | 1 - 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 2226f35..c791221 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -136,12 +136,57 @@ impl TcpSpacepacketsServer #[cfg(test)] mod tests { + use core::time::Duration; + use std::net::{SocketAddr, IpAddr, Ipv4Addr}; + + use alloc::boxed::Box; + use hashbrown::HashSet; + use spacepackets::PacketId; + + use crate::hal::std::tcp_server::{tests::{SyncTmSource, SyncTcCacher}, ServerConfig}; + use super::TcpSpacepacketsServer; + const APID_0: u16 = 0x02; + const PACKET_ID_0: PacketId= PacketId::const_tc(true, APID_0); + + fn generic_tmtc_server( + addr: &SocketAddr, + tc_receiver: SyncTcCacher, + tm_source: SyncTmSource, + packet_id_lookup: HashSet + ) -> TcpSpacepacketsServer<(), ()> { + TcpSpacepacketsServer::new( + ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver), + Box::new(packet_id_lookup) + ) + .expect("TCP server generation failed") + } #[test] fn test_basic() { - let - let server = TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup) - + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let tm_source = SyncTmSource::default(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(PACKET_ID_0); + let server = generic_tmtc_server(&auto_port_addr, tc_receiver, tm_source, packet_id_lookup); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 0); + set_if_done.store(true, Ordering::Relaxed); + }); } } diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs index f019eaf..dccf19c 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -167,7 +167,6 @@ mod tests { use std::{ io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, - sync::Mutex, thread, }; From a62df6dbf820a9ce4bd8dc3b98e4e610ee4a803b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 12:48:25 +0200 Subject: [PATCH 5/8] small bugfix, test works --- satrs-core/src/encoding/ccsds.rs | 2 +- .../src/hal/std/tcp_spacepackets_server.rs | 69 +++++++++++++++---- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs index 6bea03d..ec7da4c 100644 --- a/satrs-core/src/encoding/ccsds.rs +++ b/satrs-core/src/encoding/ccsds.rs @@ -80,7 +80,7 @@ pub fn parse_buffer_for_ccsds_space_packets( let length_field = u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap()); let packet_size = length_field + 7; - if (current_idx + packet_size as usize) < buf_len { + if (current_idx + packet_size as usize) <= buf_len { tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?; packets_found += 1; } else { diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index c791221..1c8b1a4 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -1,5 +1,8 @@ use delegate::delegate; -use std::{io::Write, net::{TcpStream, TcpListener, SocketAddr}}; +use std::{ + io::Write, + net::{SocketAddr, TcpListener, TcpStream}, +}; use alloc::boxed::Box; @@ -105,7 +108,7 @@ impl TcpSpacepacketsServer cfg: ServerConfig, tm_source: Box>, tc_receiver: Box>, - packet_id_lookup: Box + packet_id_lookup: Box, ) -> Result> { Ok(Self { generic_server: TcpTmtcGenericServer::new( @@ -136,42 +139,56 @@ impl TcpSpacepacketsServer #[cfg(test)] mod tests { - use core::time::Duration; - use std::net::{SocketAddr, IpAddr, Ipv4Addr}; + use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, + }; + use std::{ + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + thread, + }; - use alloc::boxed::Box; + use alloc::{boxed::Box, sync::Arc}; use hashbrown::HashSet; - use spacepackets::PacketId; + use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, + }; - use crate::hal::std::tcp_server::{tests::{SyncTmSource, SyncTcCacher}, ServerConfig}; + use crate::hal::std::tcp_server::{ + tests::{SyncTcCacher, SyncTmSource}, + ServerConfig, + }; use super::TcpSpacepacketsServer; - const APID_0: u16 = 0x02; - const PACKET_ID_0: PacketId= PacketId::const_tc(true, APID_0); + const TEST_APID_0: u16 = 0x02; + const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); fn generic_tmtc_server( addr: &SocketAddr, tc_receiver: SyncTcCacher, tm_source: SyncTmSource, - packet_id_lookup: HashSet + packet_id_lookup: HashSet, ) -> TcpSpacepacketsServer<(), ()> { TcpSpacepacketsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), Box::new(tm_source), Box::new(tc_receiver), - Box::new(packet_id_lookup) + Box::new(packet_id_lookup), ) .expect("TCP server generation failed") } #[test] - fn test_basic() { + fn test_basic_tc_only() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let tc_receiver = SyncTcCacher::default(); let tm_source = SyncTmSource::default(); let mut packet_id_lookup = HashSet::new(); - packet_id_lookup.insert(PACKET_ID_0); - let server = generic_tmtc_server(&auto_port_addr, tc_receiver, tm_source, packet_id_lookup); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = + generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, packet_id_lookup); let dest_addr = tcp_server .local_addr() .expect("retrieving dest addr failed"); @@ -188,5 +205,29 @@ mod tests { assert_eq!(conn_result.num_sent_tms, 0); set_if_done.store(true, Ordering::Relaxed); }); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&buffer[..packet_len_ping]) + .expect("writing to TCP server failed"); + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); } } From f0ccc35e80fc9eb1be38c7df4f2dd96826263c0f Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 12:51:11 +0200 Subject: [PATCH 6/8] cargo fmt --- satrs-core/src/hal/std/tcp_spacepackets_server.rs | 8 ++++++-- satrs-core/src/hal/std/tcp_with_cobs_server.rs | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 1c8b1a4..ec37092 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -187,8 +187,12 @@ mod tests { let tm_source = SyncTmSource::default(); let mut packet_id_lookup = HashSet::new(); packet_id_lookup.insert(TEST_PACKET_ID_0); - let mut tcp_server = - generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, packet_id_lookup); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); let dest_addr = tcp_server .local_addr() .expect("retrieving dest addr failed"); diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs index dccf19c..60231c6 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -172,7 +172,10 @@ mod tests { use crate::{ encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, - hal::std::tcp_server::{ServerConfig, tests::{SyncTcCacher, SyncTmSource}}, + hal::std::tcp_server::{ + tests::{SyncTcCacher, SyncTmSource}, + ServerConfig, + }, }; use alloc::{boxed::Box, sync::Arc}; use cobs::encode; From 0117482da1d44cc869f863fe9695a43b38b54a05 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 15:28:46 +0200 Subject: [PATCH 7/8] finished unittests for CCSDS server --- .../src/hal/std/tcp_spacepackets_server.rs | 205 +++++++++++++++++- 1 file changed, 203 insertions(+), 2 deletions(-) diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index ec37092..827f774 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -143,8 +143,10 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, time::Duration, }; + #[allow(unused_imports)] + use std::println; use std::{ - io::Write, + io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, thread, }; @@ -165,6 +167,8 @@ mod tests { const TEST_APID_0: u16 = 0x02; const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + const TEST_APID_1: u16 = 0x10; + const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1); fn generic_tmtc_server( addr: &SocketAddr, @@ -180,6 +184,7 @@ mod tests { ) .expect("TCP server generation failed") } + #[test] fn test_basic_tc_only() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); @@ -231,7 +236,203 @@ mod tests { panic!("connection was not handled properly"); } // Check that TC has arrived. - let tc_queue = tc_receiver.tc_queue.lock().unwrap(); + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]); + } + + #[test] + fn test_basic_tc_and_tm() { + let mut buffer: [u8; 32] = [0; 32]; + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + tm_source.add_tm(&buffer[..tm_packet_len]); + let tm_vec = buffer[..tm_packet_len].to_vec(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 1); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + stream + .write_all(&buffer[..packet_len]) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < tm_packet_len { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + assert_eq!(read_buf[..read_len], tm_vec); + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); + } + + #[test] + fn test_multi_tc_multi_tm() { + let mut buffer: [u8; 32] = [0; 32]; + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + + // Add telemetry + let mut total_tm_len = 0; + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + total_tm_len += tm_packet_len; + let tm_0 = buffer[..tm_packet_len].to_vec(); + tm_source.add_tm(&tm_0); + let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + total_tm_len += tm_packet_len; + let tm_1 = buffer[..tm_packet_len].to_vec(); + tm_source.add_tm(&tm_1); + + // Set up server + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + packet_id_lookup.insert(TEST_PACKET_ID_1); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!( + conn_result.num_received_tcs, 2, + "wrong number of received TCs" + ); + assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs"); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + + // Send telecommands + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let tc_0 = buffer[..packet_len].to_vec(); + stream + .write_all(&tc_0) + .expect("writing to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let packet_len = action_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let tc_1 = buffer[..packet_len].to_vec(); + stream + .write_all(&tc_1) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 32] = [0; 32]; + let mut current_idx = 0; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < total_tm_len { + let read_len = stream + .read(&mut read_buf[current_idx..]) + .expect("read failed"); + current_idx += read_len; + read_len_total += read_len; + } + drop(stream); + assert_eq!(read_buf[..tm_0.len()], tm_0); + assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 2); + assert_eq!(tc_queue.pop_front().unwrap(), tc_0); + assert_eq!(tc_queue.pop_front().unwrap(), tc_1); } } From 35cef32ebfea2ef4f927d2f3655e2b606a406212 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 15:57:51 +0200 Subject: [PATCH 8/8] link corrections --- satrs-core/src/hal/std/mod.rs | 2 +- ...with_cobs_server.rs => tcp_cobs_server.rs} | 2 +- satrs-core/src/hal/std/tcp_server.rs | 4 +- .../src/hal/std/tcp_spacepackets_server.rs | 119 ++++-------------- .../{tcp_server_cobs.rs => tcp_servers.rs} | 96 +++++++++++++- 5 files changed, 118 insertions(+), 105 deletions(-) rename satrs-core/src/hal/std/{tcp_with_cobs_server.rs => tcp_cobs_server.rs} (99%) rename satrs-core/tests/{tcp_server_cobs.rs => tcp_servers.rs} (60%) diff --git a/satrs-core/src/hal/std/mod.rs b/satrs-core/src/hal/std/mod.rs index 17ec012..50c19d7 100644 --- a/satrs-core/src/hal/std/mod.rs +++ b/satrs-core/src/hal/std/mod.rs @@ -2,5 +2,5 @@ pub mod tcp_server; pub mod udp_server; +mod tcp_cobs_server; mod tcp_spacepackets_server; -mod tcp_with_cobs_server; diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs similarity index 99% rename from satrs-core/src/hal/std/tcp_with_cobs_server.rs rename to satrs-core/src/hal/std/tcp_cobs_server.rs index 60231c6..4a22a8a 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -109,7 +109,7 @@ impl TcpTmSender for CobsTmSender { /// /// ## Example /// -/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// test also serves as the example application for this module. pub struct TcpTmtcInCobsServer { generic_server: TcpTmtcGenericServer, diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index 2e8bb01..e9fe657 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -12,10 +12,10 @@ use crate::tmtc::{ReceivesTc, TmPacketSource}; use thiserror::Error; // Re-export the TMTC in COBS server. +pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; pub use crate::hal::std::tcp_spacepackets_server::{ - CcsdsTcParser, CcsdsTmSender, TcpSpacepacketsServer, + SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer, }; -pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; /// Configuration struct for the generic TCP TMTC server /// diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 827f774..b9fc86b 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -15,18 +15,18 @@ use super::tcp_server::{ ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, }; -/// Concrete [TcpTcParser] implementation for the []. -pub struct CcsdsTcParser { +/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer]. +pub struct SpacepacketsTcParser { packet_id_lookup: Box, } -impl CcsdsTcParser { +impl SpacepacketsTcParser { pub fn new(packet_id_lookup: Box) -> Self { Self { packet_id_lookup } } } -impl TcpTcParser for CcsdsTcParser { +impl TcpTcParser for SpacepacketsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], @@ -47,11 +47,11 @@ impl TcpTcParser for CcsdsTcParser } } -/// Concrete [TcpTmSender] implementation for the []. +/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer]. #[derive(Default)] -pub struct CcsdsTmSender {} +pub struct SpacepacketsTmSender {} -impl TcpTmSender for CcsdsTmSender { +impl TcpTmSender for SpacepacketsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], @@ -78,24 +78,26 @@ impl TcpTmSender for CcsdsTmSender { } } -/// TCP TMTC server implementation for exchange of tightly stuffed CCSDS space packets. +/// TCP TMTC server implementation for exchange of tightly stuffed +/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf). /// -/// This serves only works if CCSDS space packets are the only packet type being exchanged. -/// It uses the CCSDS [spacepackets::PacketId] as the packet delimiter and start marker when -/// parsing for packets. The user specifies a set of expected [spacepackets::PacketId]s as part -/// of the server configuration for that purpose. +/// This serves only works if +/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only +/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter +/// and start marker when parsing for packets. The user specifies a set of expected +/// [spacepackets::PacketId]s as part of the server configuration for that purpose. /// /// ## Example /// -/// The [TCP integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// also serves as the example application for this module. pub struct TcpSpacepacketsServer { - generic_server: TcpTmtcGenericServer, + generic_server: + TcpTmtcGenericServer, } impl TcpSpacepacketsServer { - /// Create a new TCP TMTC server which exchanges TMTC packets encoded with - /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// Create a new TCP TMTC server which exchanges CCSDS space packets. /// /// ## Parameter /// @@ -104,6 +106,8 @@ impl TcpSpacepacketsServer /// then sent back to the client. /// * `tc_receiver` - Any received telecommands which were decoded successfully will be /// forwarded to this TC receiver. + /// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet + /// parsing. This mechanism is used to have a start marker for finding CCSDS packets. pub fn new( cfg: ServerConfig, tm_source: Box>, @@ -113,8 +117,8 @@ impl TcpSpacepacketsServer Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, - CcsdsTcParser::new(packet_id_lookup), - CcsdsTmSender::default(), + SpacepacketsTcParser::new(packet_id_lookup), + SpacepacketsTmSender::default(), tm_source, tc_receiver, )?, @@ -241,85 +245,6 @@ mod tests { assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]); } - #[test] - fn test_basic_tc_and_tm() { - let mut buffer: [u8; 32] = [0; 32]; - let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); - let mut tm_source = SyncTmSource::default(); - let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); - let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); - let tm_packet_len = verif_tm - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - tm_source.add_tm(&buffer[..tm_packet_len]); - let tm_vec = buffer[..tm_packet_len].to_vec(); - let mut packet_id_lookup = HashSet::new(); - packet_id_lookup.insert(TEST_PACKET_ID_0); - let mut tcp_server = generic_tmtc_server( - &auto_port_addr, - tc_receiver.clone(), - tm_source, - packet_id_lookup, - ); - let dest_addr = tcp_server - .local_addr() - .expect("retrieving dest addr failed"); - let conn_handled: Arc = Default::default(); - let set_if_done = conn_handled.clone(); - // Call the connection handler in separate thread, does block. - thread::spawn(move || { - let result = tcp_server.handle_next_connection(); - if result.is_err() { - panic!("handling connection failed: {:?}", result.unwrap_err()); - } - let conn_result = result.unwrap(); - assert_eq!(conn_result.num_received_tcs, 1); - assert_eq!(conn_result.num_sent_tms, 1); - set_if_done.store(true, Ordering::Relaxed); - }); - let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); - let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); - let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); - stream - .set_read_timeout(Some(Duration::from_millis(10))) - .expect("setting reas timeout failed"); - let packet_len = ping_tc - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - stream - .write_all(&buffer[..packet_len]) - .expect("writing to TCP server failed"); - - // Done with writing. - stream - .shutdown(std::net::Shutdown::Write) - .expect("shutting down write failed"); - let mut read_buf: [u8; 16] = [0; 16]; - let mut read_len_total = 0; - // Timeout ensures this does not block forever. - while read_len_total < tm_packet_len { - let read_len = stream.read(&mut read_buf).expect("read failed"); - read_len_total += read_len; - assert_eq!(read_buf[..read_len], tm_vec); - } - drop(stream); - - // A certain amount of time is allowed for the transaction to complete. - for _ in 0..3 { - if !conn_handled.load(Ordering::Relaxed) { - thread::sleep(Duration::from_millis(5)); - } - } - if !conn_handled.load(Ordering::Relaxed) { - panic!("connection was not handled properly"); - } - // Check that TC has arrived. - let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); - assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); - } - #[test] fn test_multi_tc_multi_tm() { let mut buffer: [u8; 32] = [0; 32]; diff --git a/satrs-core/tests/tcp_server_cobs.rs b/satrs-core/tests/tcp_servers.rs similarity index 60% rename from satrs-core/tests/tcp_server_cobs.rs rename to satrs-core/tests/tcp_servers.rs index 5956fb3..b3e7993 100644 --- a/satrs-core/tests/tcp_server_cobs.rs +++ b/satrs-core/tests/tcp_servers.rs @@ -21,11 +21,16 @@ use std::{ thread, }; +use hashbrown::HashSet; use satrs_core::{ encoding::cobs::encode_packet_with_cobs, - hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer}, + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer}, tmtc::{ReceivesTcCore, TmPacketSourceCore}, }; +use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, +}; use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; #[derive(Default, Clone)] @@ -79,15 +84,16 @@ impl TmPacketSourceCore for SyncTmSource { const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1]; +const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); -fn main() { - let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); +#[test] +fn test_cobs_server() { let tc_receiver = SyncTcCacher::default(); let mut tm_source = SyncTmSource::default(); // Insert a telemetry packet which will be read back by the client at a later stage. tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( - ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024), + ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), Box::new(tm_source), Box::new(tc_receiver.clone()), ) @@ -154,3 +160,85 @@ fn main() { assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); drop(tc_queue); } + +const TEST_APID_0: u16 = 0x02; +const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + +#[test] +fn test_ccsds_server() { + let mut buffer: [u8; 32] = [0; 32]; + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + tm_source.add_tm(&buffer[..tm_packet_len]); + let tm_vec = buffer[..tm_packet_len].to_vec(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = TcpSpacepacketsServer::new( + ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver.clone()), + Box::new(packet_id_lookup), + ) + .expect("TCP server generation failed"); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 1); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + stream + .write_all(&buffer[..packet_len]) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < tm_packet_len { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + assert_eq!(read_buf[..read_len], tm_vec); + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); +}