From ee29961f621108c073498f291fcb987469865656 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 20 Apr 2024 11:21:02 +0200 Subject: [PATCH] add first client unittests --- src/interface/tcp_spp_client.rs | 353 ++++++++++++++++++++++++-------- src/main.rs | 3 +- src/pus/action.rs | 10 +- 3 files changed, 281 insertions(+), 85 deletions(-) diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 8e41c43..4044dc6 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -2,6 +2,7 @@ use std::io::{self, Read}; use std::net::TcpStream as StdTcpStream; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; +use std::time::Duration; use mio::net::TcpStream as MioTcpStream; use mio::{Events, Interest, Poll, Token}; @@ -82,6 +83,91 @@ impl TcpSppClientCommon { } } +pub struct TcpSppClientStd { + common: TcpSppClientCommon, + read_and_idle_delay: Duration, + // Optional to allow periodic reconnection attempts on the TCP server. + stream: Option, +} + +impl TcpSppClientStd { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, + valid_ids: &'static [PacketId], + read_timeout: Duration, + port: u16, + ) -> io::Result { + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + let mut client = Self { + common: TcpSppClientCommon { + id, + read_buf: [0; 4096], + tm_tcp_client_rx, + server_addr, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }, + read_and_idle_delay: read_timeout, + stream: None, + }; + client.attempt_connect(true)?; + Ok(client) + } + + pub fn attempt_connect(&mut self, log_error: bool) -> io::Result { + Ok(match StdTcpStream::connect(self.common.server_addr) { + Ok(stream) => { + stream.set_read_timeout(Some(self.read_and_idle_delay))?; + self.stream = Some(stream); + true + } + Err(e) => { + if log_error { + log::warn!("error connecting to server: {}", e); + } + false + } + }) + } + + pub fn operation(&mut self) -> Result<(), ClientError> { + if let Some(client) = &mut self.stream { + // Write TM first before blocking on the read call. + self.common.write_to_server(client)?; + match client.read(&mut self.common.read_buf) { + // Not sure if this can happen or this is actually an error condition.. + Ok(0) => { + log::info!("server closed connection"); + self.stream = None; + } + Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut + { + self.common.write_to_server(client)?; + return Ok(()); + } + log::warn!("server error: {e:?}"); + if e.kind() == io::ErrorKind::ConnectionReset { + self.stream = None; + } + return Err(e.into()); + } + } + } else { + if self.attempt_connect(false)? { + log::info!("reconnected to server succesfully"); + return self.operation(); + } + std::thread::sleep(self.read_and_idle_delay); + } + + Ok(()) + } +} + #[derive(Debug, PartialEq, Eq)] pub enum ConnectionStatus { Unknown, @@ -90,6 +176,7 @@ pub enum ConnectionStatus { TryingReconnect, } +/// Currently not used, not behaving as expected.. #[allow(dead_code)] pub struct TcpSppClientMio { common: TcpSppClientCommon, @@ -155,11 +242,10 @@ impl TcpSppClientMio { for event in events { if event.token() == Token(0) { if event.is_readable() { - log::warn!("TCP client is readable"); self.read_from_server()?; } + // For some reason, we only get this once.. if event.is_writable() { - log::warn!("TCP client is writable"); self.common.write_to_server(self.client.as_mut().unwrap())?; } } @@ -208,90 +294,193 @@ impl TcpSppClientMio { } } -pub struct TcpSppClientStd { - common: TcpSppClientCommon, - // Optional to allow periodic reconnection attempts on the TCP server. - stream: Option, -} +#[cfg(test)] +mod tests { + use ops_sat_rs::config::EXPERIMENT_APID; + use satrs::spacepackets::{PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader}; + use std::{ + io::Write, + net::{TcpListener, TcpStream}, + thread, + time::Duration, + }; -impl TcpSppClientStd { - pub fn new( - id: ComponentId, - tc_source_tx: mpsc::Sender, - tm_tcp_client_rx: mpsc::Receiver, - valid_ids: &'static [PacketId], - port: u16, - ) -> io::Result { - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let client = match StdTcpStream::connect(server_addr) { - Ok(stream) => { - stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; - Some(stream) - } - Err(e) => { - log::warn!("error connecting to server: {}", e); - None - } - }; - Ok(Self { - common: TcpSppClientCommon { - id, - read_buf: [0; 4096], - tm_tcp_client_rx, - server_addr, - tc_source_tx, - validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), - }, - stream: client, - }) + use super::*; + + const VALID_IDS: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)]; + + const TEST_TC: SpHeader = SpHeader::new( + PacketId::new(PacketType::Tc, true, EXPERIMENT_APID), + PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0), + 1, + ); + const TEST_TM: SpHeader = SpHeader::new( + PacketId::new(PacketType::Tm, true, EXPERIMENT_APID), + PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0), + 1, + ); + + struct TcpServerTestbench { + tcp_server: TcpListener, } - pub fn attempt_connect(&mut self, log_error: bool) -> io::Result { - Ok(match StdTcpStream::connect(self.common.server_addr) { - Ok(stream) => { - stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; - self.stream = Some(stream); - true - } - Err(e) => { - if log_error { - log::warn!("error connecting to server: {}", e); - } - false - } - }) - } - - pub fn operation(&mut self) -> Result<(), ClientError> { - if let Some(client) = &mut self.stream { - match client.read(&mut self.common.read_buf) { - // Not sure if this can happen or this is actually an error condition.. - Ok(0) => { - log::info!("server closed connection"); - self.stream = None; - } - Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?, - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut - { - self.common.write_to_server(client)?; - return Ok(()); - } - log::warn!("server error: {e:?}"); - if e.kind() == io::ErrorKind::ConnectionReset { - self.stream = None; - } - return Err(e.into()); - } - } - } else { - if self.attempt_connect(false)? { - log::info!("reconnected to server succesfully"); - return self.operation(); - } - std::thread::sleep(STOP_CHECK_FREQUENCY); + impl TcpServerTestbench { + fn new() -> Self { + let tcp_server = TcpListener::bind("127.0.0.1:0").unwrap(); + tcp_server + .set_nonblocking(true) + .expect("setting TCP server non-blocking failed"); + Self { tcp_server } } - Ok(()) + fn local_addr(&self) -> SocketAddr { + self.tcp_server.local_addr().unwrap() + } + + fn attempt_connection(&mut self, limit: u32) -> Result { + for _ in 0..limit { + match self.tcp_server.accept() { + Ok((stream, _)) => { + return Ok(stream); + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + thread::sleep(Duration::from_millis(10)); + continue; + } + panic!("TCP server accept error: {:?}", e); + } + } + } + Err(()) + } + } + // This test just simplifies that the client properly connects to a server. + #[test] + fn basic_client_test() { + let (tc_source_tx, _tc_source_rx) = mpsc::channel(); + let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let jh0 = thread::spawn(move || { + tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + }); + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + spp_client.operation().unwrap(); + jh0.join().unwrap(); + } + + // This test verifies that TM is sent to the server properly. + #[test] + fn basic_client_tm_test() { + let (tc_source_tx, _tc_source_rx) = mpsc::channel(); + let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let mut buf: [u8; 7] = [0; 7]; + TEST_TM + .write_to_be_bytes(&mut buf) + .expect("writing TM failed"); + let jh0 = thread::spawn(move || { + let mut read_expected_data = false; + let mut read_buf: [u8; 64] = [0; 64]; + let mut stream = tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting read timeout failed"); + for _ in 0..5 { + match stream.read(&mut read_buf) { + Ok(0) => {} + Ok(len) => { + assert_eq!(&buf, &read_buf[0..len]); + read_expected_data = true; + break; + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + continue; + } + panic!("TCP server read error: {:?}", e); + } + } + if read_expected_data { + break; + } + } + if !read_expected_data { + panic!("did not receive expected data"); + } + }); + tm_tcp_client_tx + .send(PacketAsVec::new(0, buf.to_vec())) + .unwrap(); + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + spp_client.operation().unwrap(); + + jh0.join().unwrap(); + } + + // Test that the client can read telecommands from the server. + #[test] + fn basic_client_tc_test() { + let (tc_source_tx, tc_source_rx) = mpsc::channel(); + let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); + let mut tcp_server = TcpServerTestbench::new(); + let local_addr = tcp_server.local_addr(); + let mut buf: [u8; 8] = [0; 8]; + TEST_TC + .write_to_be_bytes(&mut buf) + .expect("writing TM failed"); + let jh0 = thread::spawn(move || { + let mut stream = tcp_server + .attempt_connection(3) + .expect("no client connection detected"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting read timeout failed"); + stream.write_all(&buf).expect("writing TC failed"); + }); + + let mut spp_client = TcpSppClientStd::new( + 1, + tc_source_tx, + tm_tcp_client_rx, + VALID_IDS, + Duration::from_millis(30), + local_addr.port(), + ) + .expect("creating TCP SPP client failed"); + let mut received_packet = false; + (0..3).for_each(|_| { + spp_client.operation().unwrap(); + if let Ok(packet) = tc_source_rx.try_recv() { + assert_eq!(packet.packet, buf.to_vec()); + received_packet = true; + } + }); + if !received_packet { + panic!("did not receive expected data"); + } + jh0.join().unwrap(); } } diff --git a/src/main.rs b/src/main.rs index 36e4fc2..ee9b48d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use log::info; use ops_sat_rs::config::{ cfg_file::create_app_config, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, - tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; @@ -171,6 +171,7 @@ fn main() { tc_source_tx, tm_tcp_client_rx, VALID_PACKET_ID_LIST, + STOP_CHECK_FREQUENCY, app_cfg.tcp_spp_server_port, ) .expect("creating TCP SPP client failed"); diff --git a/src/pus/action.rs b/src/pus/action.rs index cd2472f..62166e5 100644 --- a/src/pus/action.rs +++ b/src/pus/action.rs @@ -275,8 +275,8 @@ mod tests { use satrs::pus::test_util::{ TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, }; - use satrs::pus::verification; use satrs::pus::verification::test_util::TestVerificationReporter; + use satrs::pus::{verification, TcInMemory}; use satrs::request::MessageMetadata; use satrs::ComponentId; use satrs::{ @@ -429,7 +429,13 @@ mod tests { .verif_reporter() .check_next_is_acceptance_success(id, accepted_token.request_id()); self.pus_packet_tx - .send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) + .send(EcssTcAndToken::new( + TcInMemory::Vec(PacketAsVec::new( + self.service.service_helper.id(), + tc.to_vec().unwrap(), + )), + accepted_token, + )) .unwrap(); } }