From ee29961f621108c073498f291fcb987469865656 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 20 Apr 2024 11:21:02 +0200 Subject: [PATCH 1/5] add first client unittests --- src/interface/tcp_spp_client.rs | 353 ++++++++++++++++++++++++-------- src/main.rs | 3 +- src/pus/action.rs | 10 +- 3 files changed, 281 insertions(+), 85 deletions(-) diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 8e41c43..4044dc6 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -2,6 +2,7 @@ use std::io::{self, Read}; use std::net::TcpStream as StdTcpStream; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; +use std::time::Duration; use mio::net::TcpStream as MioTcpStream; use mio::{Events, Interest, Poll, Token}; @@ -82,6 +83,91 @@ impl TcpSppClientCommon { } } +pub struct TcpSppClientStd { + common: TcpSppClientCommon, + read_and_idle_delay: Duration, + // Optional to allow periodic reconnection attempts on the TCP server. + stream: Option, +} + +impl TcpSppClientStd { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, + valid_ids: &'static [PacketId], + read_timeout: Duration, + port: u16, + ) -> io::Result { + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + let mut client = Self { + common: TcpSppClientCommon { + id, + read_buf: [0; 4096], + tm_tcp_client_rx, + server_addr, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }, + read_and_idle_delay: read_timeout, + stream: None, + }; + client.attempt_connect(true)?; + Ok(client) + } + + pub fn attempt_connect(&mut self, log_error: bool) -> io::Result { + Ok(match StdTcpStream::connect(self.common.server_addr) { + Ok(stream) => { + stream.set_read_timeout(Some(self.read_and_idle_delay))?; + self.stream = Some(stream); + true + } + Err(e) => { + if log_error { + log::warn!("error connecting to server: {}", e); + } + false + } + }) + } + + pub fn operation(&mut self) -> Result<(), ClientError> { + if let Some(client) = &mut self.stream { + // Write TM first before blocking on the read call. + self.common.write_to_server(client)?; + match client.read(&mut self.common.read_buf) { + // Not sure if this can happen or this is actually an error condition.. + Ok(0) => { + log::info!("server closed connection"); + self.stream = None; + } + Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut + { + self.common.write_to_server(client)?; + return Ok(()); + } + log::warn!("server error: {e:?}"); + if e.kind() == io::ErrorKind::ConnectionReset { + self.stream = None; + } + return Err(e.into()); + } + } + } else { + if self.attempt_connect(false)? { + log::info!("reconnected to server succesfully"); + return self.operation(); + } + std::thread::sleep(self.read_and_idle_delay); + } + + Ok(()) + } +} + #[derive(Debug, PartialEq, Eq)] pub enum ConnectionStatus { Unknown, @@ -90,6 +176,7 @@ pub enum ConnectionStatus { TryingReconnect, } +/// Currently not used, not behaving as expected.. #[allow(dead_code)] pub struct TcpSppClientMio { common: TcpSppClientCommon, @@ -155,11 +242,10 @@ impl TcpSppClientMio { for event in events { if event.token() == Token(0) { if event.is_readable() { - log::warn!("TCP client is readable"); self.read_from_server()?; } + // For some reason, we only get this once.. if event.is_writable() { - log::warn!("TCP client is writable"); self.common.write_to_server(self.client.as_mut().unwrap())?; } } @@ -208,90 +294,193 @@ impl TcpSppClientMio { } } -pub struct TcpSppClientStd { - common: TcpSppClientCommon, - // Optional to allow periodic reconnection attempts on the TCP server. - stream: Option, -} +#[cfg(test)] +mod tests { + use ops_sat_rs::config::EXPERIMENT_APID; + use satrs::spacepackets::{PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader}; + use std::{ + io::Write, + net::{TcpListener, TcpStream}, + thread, + time::Duration, + }; -impl TcpSppClientStd { - pub fn new( - id: ComponentId, - tc_source_tx: mpsc::Sender, - tm_tcp_client_rx: mpsc::Receiver, - valid_ids: &'static [PacketId], - port: u16, - ) -> io::Result { - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let client = match StdTcpStream::connect(server_addr) { - Ok(stream) => { - stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; - Some(stream) - } - Err(e) => { - log::warn!("error connecting to server: {}", e); - None - } - }; - Ok(Self { - common: TcpSppClientCommon { - id, - read_buf: [0; 4096], - tm_tcp_client_rx, - server_addr, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), - }, - stream: client, - }) + use super::*; + + const VALID_IDS: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)]; + + const TEST_TC: SpHeader = SpHeader::new( + PacketId::new(PacketType::Tc, true, EXPERIMENT_APID), + PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0), + 1, + ); + const TEST_TM: SpHeader = SpHeader::new( + PacketId::new(PacketType::Tm, true, EXPERIMENT_APID), + PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0), + 1, + ); + + struct TcpServerTestbench { + tcp_server: TcpListener, } - pub fn attempt_connect(&mut self, log_error: bool) -> io::Result { - Ok(match StdTcpStream::connect(self.common.server_addr) { - Ok(stream) => { - stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; - self.stream = Some(stream); - true - } - Err(e) => { - if log_error { - log::warn!("error connecting to server: {}", e); - } - false - } - }) - } - - pub fn operation(&mut self) -> Result<(), ClientError> { - if let Some(client) = &mut self.stream { - match client.read(&mut self.common.read_buf) { - // Not sure if this can happen or this is actually an error condition.. - Ok(0) => { - log::info!("server closed connection"); - self.stream = None; - } - Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut - { - self.common.write_to_server(client)?; - return Ok(()); - } - log::warn!("server error: {e:?}"); - if e.kind() == io::ErrorKind::ConnectionReset { - self.stream = None; - } - return Err(e.into()); - } - } - } else { - if self.attempt_connect(false)? { - log::info!("reconnected to server succesfully"); - return self.operation(); - } - std::thread::sleep(STOP_CHECK_FREQUENCY); + impl TcpServerTestbench { + fn new() -> Self { + let tcp_server = TcpListener::bind("127.0.0.1:0").unwrap(); + tcp_server + .set_nonblocking(true) + .expect("setting TCP server non-blocking failed"); + Self { tcp_server } } - Ok(()) + fn local_addr(&self) -> SocketAddr { + self.tcp_server.local_addr().unwrap() + } + + fn attempt_connection(&mut self, limit: u32) -> Result { + for _ in 0..limit { + match self.tcp_server.accept() { + Ok((stream, _)) => { + return Ok(stream); + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + thread::sleep(Duration::from_millis(10)); + continue; + } + panic!("TCP server accept error: {:?}", e); + } + } + } + Err(()) + } + } + // This test just simplifies that the client properly connects to a server. + #[test] + fn basic_client_test() { + let (tc_source_tx, _tc_source_rx) = mpsc::channel(); + let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let jh0 = thread::spawn(move || { + tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + }); + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + spp_client.operation().unwrap(); + jh0.join().unwrap(); + } + + // This test verifies that TM is sent to the server properly. + #[test] + fn basic_client_tm_test() { + let (tc_source_tx, _tc_source_rx) = mpsc::channel(); + let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let mut buf: [u8; 7] = [0; 7]; + TEST_TM + .write_to_be_bytes(&mut buf) + .expect("writing TM failed"); + let jh0 = thread::spawn(move || { + let mut read_expected_data = false; + let mut read_buf: [u8; 64] = [0; 64]; + let mut stream = tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting read timeout failed"); + for _ in 0..5 { + match stream.read(&mut read_buf) { + Ok(0) => {} + Ok(len) => { + assert_eq!(&buf, &read_buf[0..len]); + read_expected_data = true; + break; + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + continue; + } + panic!("TCP server read error: {:?}", e); + } + } + if read_expected_data { + break; + } + } + if !read_expected_data { + panic!("did not receive expected data"); + } + }); + tm_tcp_client_tx + .send(PacketAsVec::new(0, buf.to_vec())) + .unwrap(); + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + spp_client.operation().unwrap(); + + jh0.join().unwrap(); + } + + // Test that the client can read telecommands from the server. + #[test] + fn basic_client_tc_test() { + let (tc_source_tx, tc_source_rx) = mpsc::channel(); + let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let mut buf: [u8; 8] = [0; 8]; + TEST_TC + .write_to_be_bytes(&mut buf) + .expect("writing TM failed"); + let jh0 = thread::spawn(move || { + let mut stream = tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting read timeout failed"); + stream.write_all(&buf).expect("writing TC failed"); + }); + + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + let mut received_packet = false; + (0..3).for_each(|_| { + spp_client.operation().unwrap(); + if let Ok(packet) = tc_source_rx.try_recv() { + assert_eq!(packet.packet, buf.to_vec()); + received_packet = true; + } + }); + if !received_packet { + panic!("did not receive expected data"); + } + jh0.join().unwrap(); } } diff --git a/src/main.rs b/src/main.rs index 36e4fc2..ee9b48d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, - tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; @@ -171,6 +171,7 @@ fn main() { tc_source_tx, tm_tcp_client_rx, VALID_PACKET_ID_LIST, + STOP_CHECK_FREQUENCY, app_cfg.tcp_spp_server_port, ) .expect("creating TCP SPP client failed"); diff --git a/src/pus/action.rs b/src/pus/action.rs index cd2472f..62166e5 100644 --- a/src/pus/action.rs +++ b/src/pus/action.rs @@ -275,8 +275,8 @@ mod tests { use satrs::pus::test_util::{ TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, }; - use satrs::pus::verification; use satrs::pus::verification::test_util::TestVerificationReporter; + use satrs::pus::{verification, TcInMemory}; use satrs::request::MessageMetadata; use satrs::ComponentId; use satrs::{ @@ -429,7 +429,13 @@ mod tests { .verif_reporter() .check_next_is_acceptance_success(id, accepted_token.request_id()); self.pus_packet_tx - .send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) + .send(EcssTcAndToken::new( + TcInMemory::Vec(PacketAsVec::new( + self.service.service_helper.id(), + tc.to_vec().unwrap(), + )), + accepted_token, + )) .unwrap(); } } From 96d5802c4f7df7812383f93bcbe99c90b6b91f19 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 22 Apr 2024 10:38:18 +0200 Subject: [PATCH 2/5] added tmtc test --- src/interface/tcp_spp_client.rs | 117 ++++++++++++++++++++++++++------ 1 file changed, 96 insertions(+), 21 deletions(-) diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 4044dc6..52a744d 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -354,7 +354,39 @@ mod tests { } Err(()) } + + fn try_reading_one_packet( + &mut self, + stream: &mut TcpStream, + limit: u32, + read_buf: &mut [u8], + ) -> usize { + let mut read_data = 0; + + for _ in 0..limit { + match stream.read(read_buf) { + Ok(0) => {} + Ok(len) => { + // assert_eq!(&tm_buf, &read_buf[0..len]); + // read_bufd_expected_data = true; + read_data = len; + break; + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + continue; + } + panic!("TCP server read error: {:?}", e); + } + } + if read_data > 0 { + break; + } + } + read_data + } } + // This test just simplifies that the client properly connects to a server. #[test] fn basic_client_test() { @@ -392,7 +424,6 @@ mod tests { .write_to_be_bytes(&mut buf) .expect("writing TM failed"); let jh0 = thread::spawn(move || { - let mut read_expected_data = false; let mut read_buf: [u8; 64] = [0; 64]; let mut stream = tcp_server .attempt_connection(3) @@ -400,27 +431,12 @@ mod tests { stream .set_read_timeout(Some(Duration::from_millis(10))) .expect("setting read timeout failed"); - for _ in 0..5 { - match stream.read(&mut read_buf) { - Ok(0) => {} - Ok(len) => { - assert_eq!(&buf, &read_buf[0..len]); - read_expected_data = true; - break; - } - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - continue; - } - panic!("TCP server read error: {:?}", e); - } - } - if read_expected_data { - break; - } - } - if !read_expected_data { + + let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf); + if read_bytes == 0 { panic!("did not receive expected data"); + } else { + assert_eq!(&buf, &read_buf[0..read_bytes]); } }); tm_tcp_client_tx @@ -483,4 +499,63 @@ mod tests { } jh0.join().unwrap(); } + + // Test that the client can both read telecommands from the server and send back + // telemetry to the server. + #[test] + fn basic_client_tmtc_test() { + let (tc_source_tx, tc_source_rx) = mpsc::channel(); + let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let mut tc_buf: [u8; 8] = [0; 8]; + let mut tm_buf: [u8; 8] = [0; 8]; + TEST_TC + .write_to_be_bytes(&mut tc_buf) + .expect("writing TM failed"); + TEST_TM + .write_to_be_bytes(&mut tm_buf) + .expect("writing TM failed"); + let jh0 = thread::spawn(move || { + let mut read_buf: [u8; 64] = [0; 64]; + let mut stream = tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting read timeout failed"); + stream.write_all(&tc_buf).expect("writing TC failed"); + let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf); + if read_bytes == 0 { + panic!("did not receive expected data"); + } else { + assert_eq!(&tm_buf, &read_buf[0..read_bytes]); + } + }); + tm_tcp_client_tx + .send(PacketAsVec::new(0, tm_buf.to_vec())) + .unwrap(); + + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + let mut received_packet = false; + (0..3).for_each(|_| { + spp_client.operation().unwrap(); + if let Ok(packet) = tc_source_rx.try_recv() { + assert_eq!(packet.packet, tc_buf.to_vec()); + received_packet = true; + } + }); + if !received_packet { + panic!("did not receive expected data"); + } + jh0.join().unwrap(); + } } From 9c74246eb3d62bd62f60b07885106b26447ad68b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 22 Apr 2024 10:39:44 +0200 Subject: [PATCH 3/5] added test stub --- Cargo.lock | 38 ++++++++++++++++----------------- src/interface/tcp_spp_client.rs | 6 ++++++ 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18d474e..2952816 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,9 +138,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "cc" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" +checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" [[package]] name = "cfg-if" @@ -251,7 +251,7 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -465,7 +465,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -640,7 +640,7 @@ checksum = "474b583ffa521fbc362ae71da11c9fe29a6b60af47744e067550b6eef4f60d43" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -681,7 +681,7 @@ checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -731,9 +731,9 @@ dependencies = [ [[package]] name = "spacepackets" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08e05169d37db5d0f8527f4abcacb72f6d178654879892c0e37ab5f04df85e3e" +checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216" dependencies = [ "chrono", "crc", @@ -764,7 +764,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -780,9 +780,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.59" +version = "2.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" +checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" dependencies = [ "proc-macro2", "quote", @@ -791,22 +791,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.58" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" +checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.58" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" +checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -899,7 +899,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", "wasm-bindgen-shared", ] @@ -921,7 +921,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1116,5 +1116,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 52a744d..280a68b 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -558,4 +558,10 @@ mod tests { } jh0.join().unwrap(); } + + #[test] + fn test_broken_connection() { + // TODO: Verify the client re-connects automatically if the server is dropped and then set + // up again. + } } From 51473e7060b40bfe18a04cede479d4215c51b710 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 22 Apr 2024 15:38:53 +0200 Subject: [PATCH 4/5] that should suffice --- src/interface/tcp_spp_client.rs | 130 ++++++++++++++++++++++++++++---- 1 file changed, 114 insertions(+), 16 deletions(-) diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 280a68b..fe34e52 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -25,6 +25,12 @@ pub enum ClientError { Io(#[from] io::Error), } +#[derive(Debug)] +pub enum ClientResult { + Ok, + ConnectionLost, +} + #[allow(dead_code)] pub struct TcpSppClientCommon { id: ComponentId, @@ -73,6 +79,7 @@ impl TcpSppClientCommon { Err(e) => match e { mpsc::TryRecvError::Empty => break, mpsc::TryRecvError::Disconnected => { + println!("god fuckikng damn it"); log::error!("TM sender to TCP client has disconnected"); break; } @@ -132,7 +139,12 @@ impl TcpSppClientStd { }) } - pub fn operation(&mut self) -> Result<(), ClientError> { + #[allow(dead_code)] + pub fn connected(&self) -> bool { + self.stream.is_some() + } + + pub fn operation(&mut self) -> Result { if let Some(client) = &mut self.stream { // Write TM first before blocking on the read call. self.common.write_to_server(client)?; @@ -141,17 +153,19 @@ impl TcpSppClientStd { Ok(0) => { log::info!("server closed connection"); self.stream = None; + return Ok(ClientResult::ConnectionLost); } Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, Err(e) => { if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut { self.common.write_to_server(client)?; - return Ok(()); + return Ok(ClientResult::ConnectionLost); } log::warn!("server error: {e:?}"); if e.kind() == io::ErrorKind::ConnectionReset { self.stream = None; + return Ok(ClientResult::ConnectionLost); } return Err(e.into()); } @@ -164,7 +178,7 @@ impl TcpSppClientStd { std::thread::sleep(self.read_and_idle_delay); } - Ok(()) + Ok(ClientResult::Ok) } } @@ -301,6 +315,7 @@ mod tests { use std::{ io::Write, net::{TcpListener, TcpStream}, + sync::{atomic::AtomicBool, Arc}, thread, time::Duration, }; @@ -320,13 +335,18 @@ mod tests { 1, ); + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + } + struct TcpServerTestbench { tcp_server: TcpListener, } impl TcpServerTestbench { - fn new() -> Self { - let tcp_server = TcpListener::bind("127.0.0.1:0").unwrap(); + fn new(port: u16) -> Self { + let tcp_server = + TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)).unwrap(); tcp_server .set_nonblocking(true) .expect("setting TCP server non-blocking failed"); @@ -337,7 +357,7 @@ mod tests { self.tcp_server.local_addr().unwrap() } - fn attempt_connection(&mut self, limit: u32) -> Result { + fn check_for_connections(&mut self, limit: u32) -> Result { for _ in 0..limit { match self.tcp_server.accept() { Ok((stream, _)) => { @@ -392,11 +412,11 @@ mod tests { fn basic_client_test() { let (tc_source_tx, _tc_source_rx) = mpsc::channel(); let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); - let mut tcp_server = TcpServerTestbench::new(); + let mut tcp_server = TcpServerTestbench::new(0); let local_addr = tcp_server.local_addr(); let jh0 = thread::spawn(move || { tcp_server - .attempt_connection(3) + .check_for_connections(3) .expect("no client connection detected"); }); let mut spp_client = TcpSppClientStd::new( @@ -417,7 +437,7 @@ mod tests { fn basic_client_tm_test() { let (tc_source_tx, _tc_source_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); - let mut tcp_server = TcpServerTestbench::new(); + let mut tcp_server = TcpServerTestbench::new(0); let local_addr = tcp_server.local_addr(); let mut buf: [u8; 7] = [0; 7]; TEST_TM @@ -426,7 +446,7 @@ mod tests { let jh0 = thread::spawn(move || { let mut read_buf: [u8; 64] = [0; 64]; let mut stream = tcp_server - .attempt_connection(3) + .check_for_connections(3) .expect("no client connection detected"); stream .set_read_timeout(Some(Duration::from_millis(10))) @@ -461,7 +481,7 @@ mod tests { fn basic_client_tc_test() { let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); - let mut tcp_server = TcpServerTestbench::new(); + let mut tcp_server = TcpServerTestbench::new(0); let local_addr = tcp_server.local_addr(); let mut buf: [u8; 8] = [0; 8]; TEST_TC @@ -469,7 +489,7 @@ mod tests { .expect("writing TM failed"); let jh0 = thread::spawn(move || { let mut stream = tcp_server - .attempt_connection(3) + .check_for_connections(3) .expect("no client connection detected"); stream .set_read_timeout(Some(Duration::from_millis(10))) @@ -486,6 +506,7 @@ mod tests { local_addr.port(), ) .expect("creating TCP SPP client failed"); + assert!(spp_client.connected()); let mut received_packet = false; (0..3).for_each(|_| { spp_client.operation().unwrap(); @@ -506,7 +527,7 @@ mod tests { fn basic_client_tmtc_test() { let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); - let mut tcp_server = TcpServerTestbench::new(); + let mut tcp_server = TcpServerTestbench::new(0); let local_addr = tcp_server.local_addr(); let mut tc_buf: [u8; 8] = [0; 8]; let mut tm_buf: [u8; 8] = [0; 8]; @@ -519,7 +540,7 @@ mod tests { let jh0 = thread::spawn(move || { let mut read_buf: [u8; 64] = [0; 64]; let mut stream = tcp_server - .attempt_connection(3) + .check_for_connections(3) .expect("no client connection detected"); stream .set_read_timeout(Some(Duration::from_millis(10))) @@ -545,6 +566,7 @@ mod tests { local_addr.port(), ) .expect("creating TCP SPP client failed"); + assert!(spp_client.connected()); let mut received_packet = false; (0..3).for_each(|_| { spp_client.operation().unwrap(); @@ -561,7 +583,83 @@ mod tests { #[test] fn test_broken_connection() { - // TODO: Verify the client re-connects automatically if the server is dropped and then set - // up again. + init(); + let (tc_source_tx, _tc_source_rx) = mpsc::channel(); + let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(0); + let local_port = tcp_server.local_addr().port(); + let drop_signal = Arc::new(AtomicBool::new(false)); + let drop_signal_0 = drop_signal.clone(); + let mut tc_buf: [u8; 8] = [0; 8]; + let mut tm_buf: [u8; 8] = [0; 8]; + TEST_TC + .write_to_be_bytes(&mut tc_buf) + .expect("writing TM failed"); + TEST_TM + .write_to_be_bytes(&mut tm_buf) + .expect("writing TM failed"); + + let mut jh0 = thread::spawn(move || { + tcp_server + .check_for_connections(3) + .expect("no client connection detected"); + drop_signal_0.store(true, std::sync::atomic::Ordering::Relaxed); + }); + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_port, + ) + .expect("creating TCP SPP client failed"); + while !drop_signal.load(std::sync::atomic::Ordering::Relaxed) { + std::thread::sleep(Duration::from_millis(100)); + } + tm_tcp_client_tx + .send(PacketAsVec::new(0, tm_buf.to_vec())) + .unwrap(); + match spp_client.operation() { + Ok(ClientResult::ConnectionLost) => (), + Ok(ClientResult::Ok) => { + panic!("expected operation error"); + } + Err(ClientError::Io(e)) => { + println!("io error: {:?}", e); + if e.kind() != io::ErrorKind::ConnectionReset + && e.kind() != io::ErrorKind::ConnectionAborted + { + panic!("expected some disconnet error"); + } + } + _ => { + panic!("unexpected error") + } + }; + assert!(!spp_client.connected()); + jh0.join().unwrap(); + // spp_client.operation(); + tcp_server = TcpServerTestbench::new(local_port); + tm_tcp_client_tx + .send(PacketAsVec::new(0, tm_buf.to_vec())) + .unwrap(); + jh0 = thread::spawn(move || { + let mut stream = tcp_server + .check_for_connections(3) + .expect("no client connection detected"); + let mut read_buf: [u8; 64] = [0; 64]; + let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf); + if read_bytes == 0 { + panic!("did not receive expected data"); + } else { + assert_eq!(&tm_buf, &read_buf[0..read_bytes]); + } + }); + let result = spp_client.operation(); + println!("{:?}", result); + assert!(!spp_client.connected()); + assert!(result.is_ok()); + jh0.join().unwrap(); } } From e82139ac9182d06866737e9a2542c39cf0da0c3a Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 22 Apr 2024 15:41:28 +0200 Subject: [PATCH 5/5] README --- README.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ae2cbf8..492f072 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ You can also find some more general documentation about OPS-SAT there. [podman](https://podman.io/) installed - [`cross`](https://github.com/cross-rs/cross) package installed -## Build +## Build for Target Hardware You might need to set the [`CROSS_CONTAINER_ENGINE`](https://github.com/cross-rs/cross/wiki/FAQ#explicitly-choose-the-container-engine) and [`CROSS_ROOTLESS_CONTAINER_ENGINE`](https://github.com/cross-rs/cross/blob/main/docs/environment_variables.md#configuring-cross-with-environment-variables) @@ -29,6 +29,23 @@ cross build cross build --release ``` +## Build for Host + +The software was designed to be runnable and testable on a host computer. +You can use the regular cargo workflow for this. + +### Running + +```sh +cargo run +``` + +### Testing + +```sh +cargo test +``` + ## Commanding Infrastructure Commanding of the `ops-sat-rs` application is possible by different means.