diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs index f8f775f..ec7da4c 100644 --- a/satrs-core/src/encoding/ccsds.rs +++ b/satrs-core/src/encoding/ccsds.rs @@ -64,7 +64,7 @@ impl PacketIdLookup for [PacketId] { pub fn parse_buffer_for_ccsds_space_packets( buf: &mut [u8], packet_id_lookup: &(impl PacketIdLookup + ?Sized), - tc_receiver: &mut impl ReceivesTcCore, + tc_receiver: &mut (impl ReceivesTcCore + ?Sized), next_write_idx: &mut usize, ) -> Result { *next_write_idx = 0; @@ -80,7 +80,7 @@ pub fn parse_buffer_for_ccsds_space_packets( let length_field = u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap()); let packet_size = length_field + 7; - if (current_idx + packet_size as usize) < buf_len { + if (current_idx + packet_size as usize) <= buf_len { tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?; packets_found += 1; } else { diff --git a/satrs-core/src/hal/std/mod.rs b/satrs-core/src/hal/std/mod.rs index 17ec012..50c19d7 100644 --- a/satrs-core/src/hal/std/mod.rs +++ b/satrs-core/src/hal/std/mod.rs @@ -2,5 +2,5 @@ pub mod tcp_server; pub mod udp_server; +mod tcp_cobs_server; mod tcp_spacepackets_server; -mod tcp_with_cobs_server; diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs similarity index 87% rename from satrs-core/src/hal/std/tcp_with_cobs_server.rs rename to satrs-core/src/hal/std/tcp_cobs_server.rs index fc44ebf..4a22a8a 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -24,7 +24,7 @@ impl TcpTcParser for CobsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut dyn ReceivesTc, + tc_receiver: &mut (impl ReceivesTc + ?Sized), conn_result: &mut ConnectionResult, current_write_idx: usize, next_write_idx: &mut usize, @@ -59,7 +59,7 @@ impl TcpTmSender for CobsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut dyn TmPacketSource, + tm_source: &mut (impl TmPacketSource + ?Sized), conn_result: &mut ConnectionResult, stream: &mut TcpStream, ) -> Result> { @@ -109,7 +109,7 @@ impl TcpTmSender for CobsTmSender { /// /// ## Example /// -/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// test also serves as the example application for this module. pub struct TcpTmtcInCobsServer { generic_server: TcpTmtcGenericServer, @@ -167,67 +167,21 @@ mod tests { use std::{ io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, - sync::Mutex, thread, }; use crate::{ encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, - hal::std::tcp_server::ServerConfig, - tmtc::{ReceivesTcCore, TmPacketSourceCore}, + hal::std::tcp_server::{ + tests::{SyncTcCacher, SyncTmSource}, + ServerConfig, + }, }; - use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + use alloc::{boxed::Box, sync::Arc}; use cobs::encode; use super::TcpTmtcInCobsServer; - #[derive(Default, Clone)] - struct SyncTcCacher { - tc_queue: Arc>>>, - } - impl ReceivesTcCore for SyncTcCacher { - type Error = (); - - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); - tc_queue.push_back(tc_raw.to_vec()); - Ok(()) - } - } - - #[derive(Default, Clone)] - struct SyncTmSource { - tm_queue: Arc>>>, - } - - impl SyncTmSource { - pub(crate) fn add_tm(&mut self, tm: &[u8]) { - let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); - tm_queue.push_back(tm.to_vec()); - } - } - - impl TmPacketSourceCore for SyncTmSource { - type Error = (); - - fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { - let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); - if !tm_queue.is_empty() { - let next_vec = tm_queue.front().unwrap(); - if buffer.len() < next_vec.len() { - panic!( - "provided buffer too small, must be at least {} bytes", - next_vec.len() - ); - } - let next_vec = tm_queue.pop_front().unwrap(); - buffer[0..next_vec.len()].copy_from_slice(&next_vec); - return Ok(next_vec.len()); - } - Ok(0) - } - } - fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx) } diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index dcccf66..e9fe657 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -12,7 +12,10 @@ use crate::tmtc::{ReceivesTc, TmPacketSource}; use thiserror::Error; // Re-export the TMTC in COBS server. -pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; +pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; +pub use crate::hal::std::tcp_spacepackets_server::{ + SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer, +}; /// Configuration struct for the generic TCP TMTC server /// @@ -90,7 +93,7 @@ pub trait TcpTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], - tc_receiver: &mut dyn ReceivesTc, + tc_receiver: &mut (impl ReceivesTc + ?Sized), conn_result: &mut ConnectionResult, current_write_idx: usize, next_write_idx: &mut usize, @@ -105,7 +108,7 @@ pub trait TcpTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], - tm_source: &mut dyn TmPacketSource, + tm_source: &mut (impl TmPacketSource + ?Sized), conn_result: &mut ConnectionResult, stream: &mut TcpStream, ) -> Result>; @@ -317,3 +320,59 @@ impl TcpTmtcServerBase { self.listener.local_addr() } } + +#[cfg(test)] +pub(crate) mod tests { + use std::sync::Mutex; + + use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; + + use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore}; + + #[derive(Default, Clone)] + pub(crate) struct SyncTcCacher { + pub(crate) tc_queue: Arc>>>, + } + impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Clone)] + pub(crate) struct SyncTmSource { + tm_queue: Arc>>>, + } + + impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } + } + + impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } + } +} diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 8b13789..b9fc86b 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -1 +1,363 @@ +use delegate::delegate; +use std::{ + io::Write, + net::{SocketAddr, TcpListener, TcpStream}, +}; +use alloc::boxed::Box; + +use crate::{ + encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets}, + tmtc::{ReceivesTc, TmPacketSource}, +}; + +use super::tcp_server::{ + ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, +}; + +/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer]. +pub struct SpacepacketsTcParser { + packet_id_lookup: Box, +} + +impl SpacepacketsTcParser { + pub fn new(packet_id_lookup: Box) -> Self { + Self { packet_id_lookup } + } +} + +impl TcpTcParser for SpacepacketsTcParser { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut (impl ReceivesTc + ?Sized), + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError> { + // Reader vec full, need to parse for packets. + conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets( + &mut tc_buffer[..current_write_idx], + self.packet_id_lookup.as_ref(), + tc_receiver.upcast_mut(), + next_write_idx, + ) + .map_err(|e| TcpTmtcError::TcError(e))?; + Ok(()) + } +} + +/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer]. +#[derive(Default)] +pub struct SpacepacketsTmSender {} + +impl TcpTmSender for SpacepacketsTmSender { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut (impl TmPacketSource + ?Sized), + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result> { + let mut tm_was_sent = false; + loop { + // Write TM until TM source is exhausted. For now, there is no limit for the amount + // of TM written this way. + let read_tm_len = tm_source + .retrieve_packet(tm_buffer) + .map_err(|e| TcpTmtcError::TmError(e))?; + + if read_tm_len == 0 { + return Ok(tm_was_sent); + } + tm_was_sent = true; + conn_result.num_sent_tms += 1; + + stream.write_all(&tm_buffer[..read_tm_len])?; + } + } +} + +/// TCP TMTC server implementation for exchange of tightly stuffed +/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf). +/// +/// This serves only works if +/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only +/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter +/// and start marker when parsing for packets. The user specifies a set of expected +/// [spacepackets::PacketId]s as part of the server configuration for that purpose. +/// +/// ## Example +/// +/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) +/// also serves as the example application for this module. +pub struct TcpSpacepacketsServer { + generic_server: + TcpTmtcGenericServer, +} + +impl TcpSpacepacketsServer { + /// Create a new TCP TMTC server which exchanges CCSDS space packets. + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommands which were decoded successfully will be + /// forwarded to this TC receiver. + /// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet + /// parsing. This mechanism is used to have a start marker for finding CCSDS packets. + pub fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + packet_id_lookup: Box, + ) -> Result> { + Ok(Self { + generic_server: TcpTmtcGenericServer::new( + cfg, + SpacepacketsTcParser::new(packet_id_lookup), + SpacepacketsTmSender::default(), + tm_source, + tc_receiver, + )?, + }) + } + + delegate! { + to self.generic_server { + pub fn listener(&mut self) -> &mut TcpListener; + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result; + + /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call. + pub fn handle_next_connection( + &mut self, + ) -> Result>; + } + } +} + +#[cfg(test)] +mod tests { + use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, + }; + #[allow(unused_imports)] + use std::println; + use std::{ + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + thread, + }; + + use alloc::{boxed::Box, sync::Arc}; + use hashbrown::HashSet; + use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, + }; + + use crate::hal::std::tcp_server::{ + tests::{SyncTcCacher, SyncTmSource}, + ServerConfig, + }; + + use super::TcpSpacepacketsServer; + + const TEST_APID_0: u16 = 0x02; + const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + const TEST_APID_1: u16 = 0x10; + const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1); + + fn generic_tmtc_server( + addr: &SocketAddr, + tc_receiver: SyncTcCacher, + tm_source: SyncTmSource, + packet_id_lookup: HashSet, + ) -> TcpSpacepacketsServer<(), ()> { + TcpSpacepacketsServer::new( + ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver), + Box::new(packet_id_lookup), + ) + .expect("TCP server generation failed") + } + + #[test] + fn test_basic_tc_only() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let tm_source = SyncTmSource::default(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 0); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&buffer[..packet_len_ping]) + .expect("writing to TCP server failed"); + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]); + } + + #[test] + fn test_multi_tc_multi_tm() { + let mut buffer: [u8; 32] = [0; 32]; + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + + // Add telemetry + let mut total_tm_len = 0; + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + total_tm_len += tm_packet_len; + let tm_0 = buffer[..tm_packet_len].to_vec(); + tm_source.add_tm(&tm_0); + let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + total_tm_len += tm_packet_len; + let tm_1 = buffer[..tm_packet_len].to_vec(); + tm_source.add_tm(&tm_1); + + // Set up server + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + packet_id_lookup.insert(TEST_PACKET_ID_1); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!( + conn_result.num_received_tcs, 2, + "wrong number of received TCs" + ); + assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs"); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + + // Send telecommands + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let tc_0 = buffer[..packet_len].to_vec(); + stream + .write_all(&tc_0) + .expect("writing to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let packet_len = action_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let tc_1 = buffer[..packet_len].to_vec(); + stream + .write_all(&tc_1) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 32] = [0; 32]; + let mut current_idx = 0; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < total_tm_len { + let read_len = stream + .read(&mut read_buf[current_idx..]) + .expect("read failed"); + current_idx += read_len; + read_len_total += read_len; + } + drop(stream); + assert_eq!(read_buf[..tm_0.len()], tm_0); + assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 2); + assert_eq!(tc_queue.pop_front().unwrap(), tc_0); + assert_eq!(tc_queue.pop_front().unwrap(), tc_1); + } +} diff --git a/satrs-core/tests/tcp_server_cobs.rs b/satrs-core/tests/tcp_servers.rs similarity index 60% rename from satrs-core/tests/tcp_server_cobs.rs rename to satrs-core/tests/tcp_servers.rs index 5956fb3..b3e7993 100644 --- a/satrs-core/tests/tcp_server_cobs.rs +++ b/satrs-core/tests/tcp_servers.rs @@ -21,11 +21,16 @@ use std::{ thread, }; +use hashbrown::HashSet; use satrs_core::{ encoding::cobs::encode_packet_with_cobs, - hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer}, + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer}, tmtc::{ReceivesTcCore, TmPacketSourceCore}, }; +use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, +}; use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; #[derive(Default, Clone)] @@ -79,15 +84,16 @@ impl TmPacketSourceCore for SyncTmSource { const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1]; +const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); -fn main() { - let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); +#[test] +fn test_cobs_server() { let tc_receiver = SyncTcCacher::default(); let mut tm_source = SyncTmSource::default(); // Insert a telemetry packet which will be read back by the client at a later stage. tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( - ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024), + ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), Box::new(tm_source), Box::new(tc_receiver.clone()), ) @@ -154,3 +160,85 @@ fn main() { assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); drop(tc_queue); } + +const TEST_APID_0: u16 = 0x02; +const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + +#[test] +fn test_ccsds_server() { + let mut buffer: [u8; 32] = [0; 32]; + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + tm_source.add_tm(&buffer[..tm_packet_len]); + let tm_vec = buffer[..tm_packet_len].to_vec(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = TcpSpacepacketsServer::new( + ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver.clone()), + Box::new(packet_id_lookup), + ) + .expect("TCP server generation failed"); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 1); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + stream + .write_all(&buffer[..packet_len]) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < tm_packet_len { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + assert_eq!(read_buf[..read_len], tm_vec); + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); +}