Client tests #12

Merged
muellerr merged 6 commits from add-tcp-client-unittests into main 2024-04-22 15:41:57 +02:00
3 changed files with 281 additions and 85 deletions
Showing only changes of commit ee29961f62 - Show all commits

View File

@ -2,6 +2,7 @@ use std::io::{self, Read};
use std::net::TcpStream as StdTcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc;
use std::time::Duration;
use mio::net::TcpStream as MioTcpStream;
use mio::{Events, Interest, Poll, Token};
@ -82,6 +83,91 @@ impl TcpSppClientCommon {
}
}
pub struct TcpSppClientStd {
common: TcpSppClientCommon,
read_and_idle_delay: Duration,
// Optional to allow periodic reconnection attempts on the TCP server.
stream: Option<StdTcpStream>,
}
impl TcpSppClientStd {
pub fn new(
id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId],
read_timeout: Duration,
port: u16,
) -> io::Result<Self> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut client = Self {
common: TcpSppClientCommon {
id,
read_buf: [0; 4096],
tm_tcp_client_rx,
server_addr,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
},
read_and_idle_delay: read_timeout,
stream: None,
};
client.attempt_connect(true)?;
Ok(client)
}
pub fn attempt_connect(&mut self, log_error: bool) -> io::Result<bool> {
Ok(match StdTcpStream::connect(self.common.server_addr) {
Ok(stream) => {
stream.set_read_timeout(Some(self.read_and_idle_delay))?;
self.stream = Some(stream);
true
}
Err(e) => {
if log_error {
log::warn!("error connecting to server: {}", e);
}
false
}
})
}
pub fn operation(&mut self) -> Result<(), ClientError> {
if let Some(client) = &mut self.stream {
// Write TM first before blocking on the read call.
self.common.write_to_server(client)?;
match client.read(&mut self.common.read_buf) {
// Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
self.stream = None;
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{
self.common.write_to_server(client)?;
return Ok(());
}
log::warn!("server error: {e:?}");
if e.kind() == io::ErrorKind::ConnectionReset {
self.stream = None;
}
return Err(e.into());
}
}
} else {
if self.attempt_connect(false)? {
log::info!("reconnected to server succesfully");
return self.operation();
}
std::thread::sleep(self.read_and_idle_delay);
}
Ok(())
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
Unknown,
@ -90,6 +176,7 @@ pub enum ConnectionStatus {
TryingReconnect,
}
/// Currently not used, not behaving as expected..
#[allow(dead_code)]
pub struct TcpSppClientMio {
common: TcpSppClientCommon,
@ -155,11 +242,10 @@ impl TcpSppClientMio {
for event in events {
if event.token() == Token(0) {
if event.is_readable() {
log::warn!("TCP client is readable");
self.read_from_server()?;
}
// For some reason, we only get this once..
if event.is_writable() {
log::warn!("TCP client is writable");
self.common.write_to_server(self.client.as_mut().unwrap())?;
}
}
@ -208,90 +294,193 @@ impl TcpSppClientMio {
}
}
pub struct TcpSppClientStd {
common: TcpSppClientCommon,
// Optional to allow periodic reconnection attempts on the TCP server.
stream: Option<StdTcpStream>,
}
#[cfg(test)]
mod tests {
use ops_sat_rs::config::EXPERIMENT_APID;
use satrs::spacepackets::{PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader};
use std::{
io::Write,
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
impl TcpSppClientStd {
pub fn new(
id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId],
port: u16,
) -> io::Result<Self> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let client = match StdTcpStream::connect(server_addr) {
Ok(stream) => {
stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?;
Some(stream)
}
Err(e) => {
log::warn!("error connecting to server: {}", e);
None
}
};
Ok(Self {
common: TcpSppClientCommon {
id,
read_buf: [0; 4096],
tm_tcp_client_rx,
server_addr,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
},
stream: client,
})
use super::*;
const VALID_IDS: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)];
const TEST_TC: SpHeader = SpHeader::new(
PacketId::new(PacketType::Tc, true, EXPERIMENT_APID),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
1,
);
const TEST_TM: SpHeader = SpHeader::new(
PacketId::new(PacketType::Tm, true, EXPERIMENT_APID),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
1,
);
struct TcpServerTestbench {
tcp_server: TcpListener,
}
pub fn attempt_connect(&mut self, log_error: bool) -> io::Result<bool> {
Ok(match StdTcpStream::connect(self.common.server_addr) {
Ok(stream) => {
stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?;
self.stream = Some(stream);
true
}
Err(e) => {
if log_error {
log::warn!("error connecting to server: {}", e);
}
false
}
})
}
pub fn operation(&mut self) -> Result<(), ClientError> {
if let Some(client) = &mut self.stream {
match client.read(&mut self.common.read_buf) {
// Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
self.stream = None;
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{
self.common.write_to_server(client)?;
return Ok(());
}
log::warn!("server error: {e:?}");
if e.kind() == io::ErrorKind::ConnectionReset {
self.stream = None;
}
return Err(e.into());
}
}
} else {
if self.attempt_connect(false)? {
log::info!("reconnected to server succesfully");
return self.operation();
}
std::thread::sleep(STOP_CHECK_FREQUENCY);
impl TcpServerTestbench {
fn new() -> Self {
let tcp_server = TcpListener::bind("127.0.0.1:0").unwrap();
tcp_server
.set_nonblocking(true)
.expect("setting TCP server non-blocking failed");
Self { tcp_server }
}
Ok(())
fn local_addr(&self) -> SocketAddr {
self.tcp_server.local_addr().unwrap()
}
fn attempt_connection(&mut self, limit: u32) -> Result<TcpStream, ()> {
for _ in 0..limit {
match self.tcp_server.accept() {
Ok((stream, _)) => {
return Ok(stream);
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
thread::sleep(Duration::from_millis(10));
continue;
}
panic!("TCP server accept error: {:?}", e);
}
}
}
Err(())
}
}
// This test just simplifies that the client properly connects to a server.
#[test]
fn basic_client_test() {
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new();
let local_addr = tcp_server.local_addr();
let jh0 = thread::spawn(move || {
tcp_server
.attempt_connection(3)
.expect("no client connection detected");
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
spp_client.operation().unwrap();
jh0.join().unwrap();
}
// This test verifies that TM is sent to the server properly.
#[test]
fn basic_client_tm_test() {
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new();
let local_addr = tcp_server.local_addr();
let mut buf: [u8; 7] = [0; 7];
TEST_TM
.write_to_be_bytes(&mut buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut read_expected_data = false;
let mut read_buf: [u8; 64] = [0; 64];
let mut stream = tcp_server
.attempt_connection(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
for _ in 0..5 {
match stream.read(&mut read_buf) {
Ok(0) => {}
Ok(len) => {
assert_eq!(&buf, &read_buf[0..len]);
read_expected_data = true;
break;
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
continue;
}
panic!("TCP server read error: {:?}", e);
}
}
if read_expected_data {
break;
}
}
if !read_expected_data {
panic!("did not receive expected data");
}
});
tm_tcp_client_tx
.send(PacketAsVec::new(0, buf.to_vec()))
.unwrap();
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
spp_client.operation().unwrap();
jh0.join().unwrap();
}
// Test that the client can read telecommands from the server.
#[test]
fn basic_client_tc_test() {
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new();
let local_addr = tcp_server.local_addr();
let mut buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut stream = tcp_server
.attempt_connection(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
stream.write_all(&buf).expect("writing TC failed");
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
let mut received_packet = false;
(0..3).for_each(|_| {
spp_client.operation().unwrap();
if let Ok(packet) = tc_source_rx.try_recv() {
assert_eq!(packet.packet, buf.to_vec());
received_packet = true;
}
});
if !received_packet {
panic!("did not receive expected data");
}
jh0.join().unwrap();
}
}

View File

@ -9,7 +9,7 @@ use log::info;
use ops_sat_rs::config::{
cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
VALID_PACKET_ID_LIST,
};
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
@ -171,6 +171,7 @@ fn main() {
tc_source_tx,
tm_tcp_client_rx,
VALID_PACKET_ID_LIST,
STOP_CHECK_FREQUENCY,
app_cfg.tcp_spp_server_port,
)
.expect("creating TCP SPP client failed");

View File

@ -275,8 +275,8 @@ mod tests {
use satrs::pus::test_util::{
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
};
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::pus::{verification, TcInMemory};
use satrs::request::MessageMetadata;
use satrs::ComponentId;
use satrs::{
@ -429,7 +429,13 @@ mod tests {
.verif_reporter()
.check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx
.send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token))
.send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new(
self.service.service_helper.id(),
tc.to_vec().unwrap(),
)),
accepted_token,
))
.unwrap();
}
}