From 35cef32ebfea2ef4f927d2f3655e2b606a406212 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 15:57:51 +0200 Subject: [PATCH] link corrections --- satrs-core/src/hal/std/mod.rs | 2 +- ...with_cobs_server.rs => tcp_cobs_server.rs} | 2 +- satrs-core/src/hal/std/tcp_server.rs | 4 +- .../src/hal/std/tcp_spacepackets_server.rs | 119 ++++-------------- .../{tcp_server_cobs.rs => tcp_servers.rs} | 96 +++++++++++++- 5 files changed, 118 insertions(+), 105 deletions(-) rename satrs-core/src/hal/std/{tcp_with_cobs_server.rs => tcp_cobs_server.rs} (99%) rename satrs-core/tests/{tcp_server_cobs.rs => tcp_servers.rs} (60%) diff --git a/satrs-core/src/hal/std/mod.rs b/satrs-core/src/hal/std/mod.rs index 17ec012..50c19d7 100644 --- a/satrs-core/src/hal/std/mod.rs +++ b/satrs-core/src/hal/std/mod.rs @@ -2,5 +2,5 @@ pub mod tcp_server; pub mod udp_server; +mod tcp_cobs_server; mod tcp_spacepackets_server; -mod tcp_with_cobs_server; diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs similarity index 99% rename from satrs-core/src/hal/std/tcp_with_cobs_server.rs rename to satrs-core/src/hal/std/tcp_cobs_server.rs index 60231c6..4a22a8a 100644 --- a/satrs-core/src/hal/std/tcp_with_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -109,7 +109,7 @@ impl TcpTmSender for CobsTmSender { /// /// ## Example /// -/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// test also serves as the example application for this module. pub struct TcpTmtcInCobsServer { generic_server: TcpTmtcGenericServer, diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index 2e8bb01..e9fe657 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -12,10 +12,10 @@ use crate::tmtc::{ReceivesTc, TmPacketSource}; use thiserror::Error; // Re-export the TMTC in COBS server. +pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; pub use crate::hal::std::tcp_spacepackets_server::{ - CcsdsTcParser, CcsdsTmSender, TcpSpacepacketsServer, + SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer, }; -pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; /// Configuration struct for the generic TCP TMTC server /// diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 827f774..b9fc86b 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -15,18 +15,18 @@ use super::tcp_server::{ ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, }; -/// Concrete [TcpTcParser] implementation for the []. -pub struct CcsdsTcParser { +/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer]. +pub struct SpacepacketsTcParser { packet_id_lookup: Box, } -impl CcsdsTcParser { +impl SpacepacketsTcParser { pub fn new(packet_id_lookup: Box) -> Self { Self { packet_id_lookup } } } -impl TcpTcParser for CcsdsTcParser { +impl TcpTcParser for SpacepacketsTcParser { fn handle_tc_parsing( &mut self, tc_buffer: &mut [u8], @@ -47,11 +47,11 @@ impl TcpTcParser for CcsdsTcParser } } -/// Concrete [TcpTmSender] implementation for the []. +/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer]. #[derive(Default)] -pub struct CcsdsTmSender {} +pub struct SpacepacketsTmSender {} -impl TcpTmSender for CcsdsTmSender { +impl TcpTmSender for SpacepacketsTmSender { fn handle_tm_sending( &mut self, tm_buffer: &mut [u8], @@ -78,24 +78,26 @@ impl TcpTmSender for CcsdsTmSender { } } -/// TCP TMTC server implementation for exchange of tightly stuffed CCSDS space packets. +/// TCP TMTC server implementation for exchange of tightly stuffed +/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf). /// -/// This serves only works if CCSDS space packets are the only packet type being exchanged. -/// It uses the CCSDS [spacepackets::PacketId] as the packet delimiter and start marker when -/// parsing for packets. The user specifies a set of expected [spacepackets::PacketId]s as part -/// of the server configuration for that purpose. +/// This serves only works if +/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only +/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter +/// and start marker when parsing for packets. The user specifies a set of expected +/// [spacepackets::PacketId]s as part of the server configuration for that purpose. /// /// ## Example /// -/// The [TCP integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// also serves as the example application for this module. pub struct TcpSpacepacketsServer { - generic_server: TcpTmtcGenericServer, + generic_server: + TcpTmtcGenericServer, } impl TcpSpacepacketsServer { - /// Create a new TCP TMTC server which exchanges TMTC packets encoded with - /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// Create a new TCP TMTC server which exchanges CCSDS space packets. /// /// ## Parameter /// @@ -104,6 +106,8 @@ impl TcpSpacepacketsServer /// then sent back to the client. /// * `tc_receiver` - Any received telecommands which were decoded successfully will be /// forwarded to this TC receiver. + /// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet + /// parsing. This mechanism is used to have a start marker for finding CCSDS packets. pub fn new( cfg: ServerConfig, tm_source: Box>, @@ -113,8 +117,8 @@ impl TcpSpacepacketsServer Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, - CcsdsTcParser::new(packet_id_lookup), - CcsdsTmSender::default(), + SpacepacketsTcParser::new(packet_id_lookup), + SpacepacketsTmSender::default(), tm_source, tc_receiver, )?, @@ -241,85 +245,6 @@ mod tests { assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]); } - #[test] - fn test_basic_tc_and_tm() { - let mut buffer: [u8; 32] = [0; 32]; - let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); - let tc_receiver = SyncTcCacher::default(); - let mut tm_source = SyncTmSource::default(); - let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); - let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); - let tm_packet_len = verif_tm - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - tm_source.add_tm(&buffer[..tm_packet_len]); - let tm_vec = buffer[..tm_packet_len].to_vec(); - let mut packet_id_lookup = HashSet::new(); - packet_id_lookup.insert(TEST_PACKET_ID_0); - let mut tcp_server = generic_tmtc_server( - &auto_port_addr, - tc_receiver.clone(), - tm_source, - packet_id_lookup, - ); - let dest_addr = tcp_server - .local_addr() - .expect("retrieving dest addr failed"); - let conn_handled: Arc = Default::default(); - let set_if_done = conn_handled.clone(); - // Call the connection handler in separate thread, does block. - thread::spawn(move || { - let result = tcp_server.handle_next_connection(); - if result.is_err() { - panic!("handling connection failed: {:?}", result.unwrap_err()); - } - let conn_result = result.unwrap(); - assert_eq!(conn_result.num_received_tcs, 1); - assert_eq!(conn_result.num_sent_tms, 1); - set_if_done.store(true, Ordering::Relaxed); - }); - let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); - let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); - let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); - stream - .set_read_timeout(Some(Duration::from_millis(10))) - .expect("setting reas timeout failed"); - let packet_len = ping_tc - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - stream - .write_all(&buffer[..packet_len]) - .expect("writing to TCP server failed"); - - // Done with writing. - stream - .shutdown(std::net::Shutdown::Write) - .expect("shutting down write failed"); - let mut read_buf: [u8; 16] = [0; 16]; - let mut read_len_total = 0; - // Timeout ensures this does not block forever. - while read_len_total < tm_packet_len { - let read_len = stream.read(&mut read_buf).expect("read failed"); - read_len_total += read_len; - assert_eq!(read_buf[..read_len], tm_vec); - } - drop(stream); - - // A certain amount of time is allowed for the transaction to complete. - for _ in 0..3 { - if !conn_handled.load(Ordering::Relaxed) { - thread::sleep(Duration::from_millis(5)); - } - } - if !conn_handled.load(Ordering::Relaxed) { - panic!("connection was not handled properly"); - } - // Check that TC has arrived. - let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); - assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); - } - #[test] fn test_multi_tc_multi_tm() { let mut buffer: [u8; 32] = [0; 32]; diff --git a/satrs-core/tests/tcp_server_cobs.rs b/satrs-core/tests/tcp_servers.rs similarity index 60% rename from satrs-core/tests/tcp_server_cobs.rs rename to satrs-core/tests/tcp_servers.rs index 5956fb3..b3e7993 100644 --- a/satrs-core/tests/tcp_server_cobs.rs +++ b/satrs-core/tests/tcp_servers.rs @@ -21,11 +21,16 @@ use std::{ thread, }; +use hashbrown::HashSet; use satrs_core::{ encoding::cobs::encode_packet_with_cobs, - hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer}, + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer}, tmtc::{ReceivesTcCore, TmPacketSourceCore}, }; +use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, +}; use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; #[derive(Default, Clone)] @@ -79,15 +84,16 @@ impl TmPacketSourceCore for SyncTmSource { const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1]; +const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); -fn main() { - let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); +#[test] +fn test_cobs_server() { let tc_receiver = SyncTcCacher::default(); let mut tm_source = SyncTmSource::default(); // Insert a telemetry packet which will be read back by the client at a later stage. tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( - ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024), + ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), Box::new(tm_source), Box::new(tc_receiver.clone()), ) @@ -154,3 +160,85 @@ fn main() { assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); drop(tc_queue); } + +const TEST_APID_0: u16 = 0x02; +const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + +#[test] +fn test_ccsds_server() { + let mut buffer: [u8; 32] = [0; 32]; + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + tm_source.add_tm(&buffer[..tm_packet_len]); + let tm_vec = buffer[..tm_packet_len].to_vec(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = TcpSpacepacketsServer::new( + ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver.clone()), + Box::new(packet_id_lookup), + ) + .expect("TCP server generation failed"); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 1); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + stream + .write_all(&buffer[..packet_len]) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < tm_packet_len { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + assert_eq!(read_buf[..read_len], tm_vec); + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); +}