Client tests #12

Merged
muellerr merged 6 commits from add-tcp-client-unittests into main 2024-04-22 15:41:57 +02:00
5 changed files with 495 additions and 103 deletions

38
Cargo.lock generated
View File

@ -138,9 +138,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.94" version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -251,7 +251,7 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
] ]
[[package]] [[package]]
@ -465,7 +465,7 @@ dependencies = [
"proc-macro-crate", "proc-macro-crate",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
] ]
[[package]] [[package]]
@ -640,7 +640,7 @@ checksum = "474b583ffa521fbc362ae71da11c9fe29a6b60af47744e067550b6eef4f60d43"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
] ]
[[package]] [[package]]
@ -681,7 +681,7 @@ checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
] ]
[[package]] [[package]]
@ -731,9 +731,9 @@ dependencies = [
[[package]] [[package]]
name = "spacepackets" name = "spacepackets"
version = "0.11.0" version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08e05169d37db5d0f8527f4abcacb72f6d178654879892c0e37ab5f04df85e3e" checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216"
dependencies = [ dependencies = [
"chrono", "chrono",
"crc", "crc",
@ -764,7 +764,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"rustversion", "rustversion",
"syn 2.0.59", "syn 2.0.60",
] ]
[[package]] [[package]]
@ -780,9 +780,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.59" version = "2.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -791,22 +791,22 @@ dependencies = [
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.58" version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.58" version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
] ]
[[package]] [[package]]
@ -899,7 +899,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -921,7 +921,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1116,5 +1116,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.59", "syn 2.0.60",
] ]

View File

@ -11,7 +11,7 @@ You can also find some more general documentation about OPS-SAT there.
[podman](https://podman.io/) installed [podman](https://podman.io/) installed
- [`cross`](https://github.com/cross-rs/cross) package installed - [`cross`](https://github.com/cross-rs/cross) package installed
## Build ## Build for Target Hardware
You might need to set the [`CROSS_CONTAINER_ENGINE`](https://github.com/cross-rs/cross/wiki/FAQ#explicitly-choose-the-container-engine) You might need to set the [`CROSS_CONTAINER_ENGINE`](https://github.com/cross-rs/cross/wiki/FAQ#explicitly-choose-the-container-engine)
and [`CROSS_ROOTLESS_CONTAINER_ENGINE`](https://github.com/cross-rs/cross/blob/main/docs/environment_variables.md#configuring-cross-with-environment-variables) and [`CROSS_ROOTLESS_CONTAINER_ENGINE`](https://github.com/cross-rs/cross/blob/main/docs/environment_variables.md#configuring-cross-with-environment-variables)
@ -29,6 +29,23 @@ cross build
cross build --release cross build --release
``` ```
## Build for Host
The software was designed to be runnable and testable on a host computer.
You can use the regular cargo workflow for this.
### Running
```sh
cargo run
```
### Testing
```sh
cargo test
```
## Commanding Infrastructure ## Commanding Infrastructure
Commanding of the `ops-sat-rs` application is possible by different means. Commanding of the `ops-sat-rs` application is possible by different means.

View File

@ -2,6 +2,7 @@ use std::io::{self, Read};
use std::net::TcpStream as StdTcpStream; use std::net::TcpStream as StdTcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc; use std::sync::mpsc;
use std::time::Duration;
use mio::net::TcpStream as MioTcpStream; use mio::net::TcpStream as MioTcpStream;
use mio::{Events, Interest, Poll, Token}; use mio::{Events, Interest, Poll, Token};
@ -24,6 +25,12 @@ pub enum ClientError {
Io(#[from] io::Error), Io(#[from] io::Error),
} }
#[derive(Debug)]
pub enum ClientResult {
Ok,
ConnectionLost,
}
#[allow(dead_code)] #[allow(dead_code)]
pub struct TcpSppClientCommon { pub struct TcpSppClientCommon {
id: ComponentId, id: ComponentId,
@ -72,6 +79,7 @@ impl TcpSppClientCommon {
Err(e) => match e { Err(e) => match e {
mpsc::TryRecvError::Empty => break, mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => { mpsc::TryRecvError::Disconnected => {
println!("god fuckikng damn it");
log::error!("TM sender to TCP client has disconnected"); log::error!("TM sender to TCP client has disconnected");
break; break;
} }
@ -82,6 +90,98 @@ impl TcpSppClientCommon {
} }
} }
pub struct TcpSppClientStd {
common: TcpSppClientCommon,
read_and_idle_delay: Duration,
// Optional to allow periodic reconnection attempts on the TCP server.
stream: Option<StdTcpStream>,
}
impl TcpSppClientStd {
pub fn new(
id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId],
read_timeout: Duration,
port: u16,
) -> io::Result<Self> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut client = Self {
common: TcpSppClientCommon {
id,
read_buf: [0; 4096],
tm_tcp_client_rx,
server_addr,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
},
read_and_idle_delay: read_timeout,
stream: None,
};
client.attempt_connect(true)?;
Ok(client)
}
pub fn attempt_connect(&mut self, log_error: bool) -> io::Result<bool> {
Ok(match StdTcpStream::connect(self.common.server_addr) {
Ok(stream) => {
stream.set_read_timeout(Some(self.read_and_idle_delay))?;
self.stream = Some(stream);
true
}
Err(e) => {
if log_error {
log::warn!("error connecting to server: {}", e);
}
false
}
})
}
#[allow(dead_code)]
pub fn connected(&self) -> bool {
self.stream.is_some()
}
pub fn operation(&mut self) -> Result<ClientResult, ClientError> {
if let Some(client) = &mut self.stream {
// Write TM first before blocking on the read call.
self.common.write_to_server(client)?;
match client.read(&mut self.common.read_buf) {
// Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
self.stream = None;
return Ok(ClientResult::ConnectionLost);
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{
self.common.write_to_server(client)?;
return Ok(ClientResult::ConnectionLost);
}
log::warn!("server error: {e:?}");
if e.kind() == io::ErrorKind::ConnectionReset {
self.stream = None;
return Ok(ClientResult::ConnectionLost);
}
return Err(e.into());
}
}
} else {
if self.attempt_connect(false)? {
log::info!("reconnected to server succesfully");
return self.operation();
}
std::thread::sleep(self.read_and_idle_delay);
}
Ok(ClientResult::Ok)
}
}
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ConnectionStatus { pub enum ConnectionStatus {
Unknown, Unknown,
@ -90,6 +190,7 @@ pub enum ConnectionStatus {
TryingReconnect, TryingReconnect,
} }
/// Currently not used, not behaving as expected..
#[allow(dead_code)] #[allow(dead_code)]
pub struct TcpSppClientMio { pub struct TcpSppClientMio {
common: TcpSppClientCommon, common: TcpSppClientCommon,
@ -155,11 +256,10 @@ impl TcpSppClientMio {
for event in events { for event in events {
if event.token() == Token(0) { if event.token() == Token(0) {
if event.is_readable() { if event.is_readable() {
log::warn!("TCP client is readable");
self.read_from_server()?; self.read_from_server()?;
} }
// For some reason, we only get this once..
if event.is_writable() { if event.is_writable() {
log::warn!("TCP client is writable");
self.common.write_to_server(self.client.as_mut().unwrap())?; self.common.write_to_server(self.client.as_mut().unwrap())?;
} }
} }
@ -208,90 +308,358 @@ impl TcpSppClientMio {
} }
} }
pub struct TcpSppClientStd { #[cfg(test)]
common: TcpSppClientCommon, mod tests {
// Optional to allow periodic reconnection attempts on the TCP server. use ops_sat_rs::config::EXPERIMENT_APID;
stream: Option<StdTcpStream>, use satrs::spacepackets::{PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader};
use std::{
io::Write,
net::{TcpListener, TcpStream},
sync::{atomic::AtomicBool, Arc},
thread,
time::Duration,
};
use super::*;
const VALID_IDS: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)];
const TEST_TC: SpHeader = SpHeader::new(
PacketId::new(PacketType::Tc, true, EXPERIMENT_APID),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
1,
);
const TEST_TM: SpHeader = SpHeader::new(
PacketId::new(PacketType::Tm, true, EXPERIMENT_APID),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
1,
);
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
} }
impl TcpSppClientStd { struct TcpServerTestbench {
pub fn new( tcp_server: TcpListener,
id: ComponentId, }
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>, impl TcpServerTestbench {
valid_ids: &'static [PacketId], fn new(port: u16) -> Self {
port: u16, let tcp_server =
) -> io::Result<Self> { TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)).unwrap();
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); tcp_server
let client = match StdTcpStream::connect(server_addr) { .set_nonblocking(true)
Ok(stream) => { .expect("setting TCP server non-blocking failed");
stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?; Self { tcp_server }
Some(stream) }
fn local_addr(&self) -> SocketAddr {
self.tcp_server.local_addr().unwrap()
}
fn check_for_connections(&mut self, limit: u32) -> Result<TcpStream, ()> {
for _ in 0..limit {
match self.tcp_server.accept() {
Ok((stream, _)) => {
return Ok(stream);
} }
Err(e) => { Err(e) => {
log::warn!("error connecting to server: {}", e); if e.kind() == io::ErrorKind::WouldBlock {
None thread::sleep(Duration::from_millis(10));
continue;
}
panic!("TCP server accept error: {:?}", e);
}
}
}
Err(())
}
fn try_reading_one_packet(
&mut self,
stream: &mut TcpStream,
limit: u32,
read_buf: &mut [u8],
) -> usize {
let mut read_data = 0;
for _ in 0..limit {
match stream.read(read_buf) {
Ok(0) => {}
Ok(len) => {
// assert_eq!(&tm_buf, &read_buf[0..len]);
// read_bufd_expected_data = true;
read_data = len;
break;
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
continue;
}
panic!("TCP server read error: {:?}", e);
}
}
if read_data > 0 {
break;
}
}
read_data
}
}
// This test just simplifies that the client properly connects to a server.
#[test]
fn basic_client_test() {
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let jh0 = thread::spawn(move || {
tcp_server
.check_for_connections(3)
.expect("no client connection detected");
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
spp_client.operation().unwrap();
jh0.join().unwrap();
}
// This test verifies that TM is sent to the server properly.
#[test]
fn basic_client_tm_test() {
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let mut buf: [u8; 7] = [0; 7];
TEST_TM
.write_to_be_bytes(&mut buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut read_buf: [u8; 64] = [0; 64];
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf);
if read_bytes == 0 {
panic!("did not receive expected data");
} else {
assert_eq!(&buf, &read_buf[0..read_bytes]);
}
});
tm_tcp_client_tx
.send(PacketAsVec::new(0, buf.to_vec()))
.unwrap();
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
spp_client.operation().unwrap();
jh0.join().unwrap();
}
// Test that the client can read telecommands from the server.
#[test]
fn basic_client_tc_test() {
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let mut buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
stream.write_all(&buf).expect("writing TC failed");
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
assert!(spp_client.connected());
let mut received_packet = false;
(0..3).for_each(|_| {
spp_client.operation().unwrap();
if let Ok(packet) = tc_source_rx.try_recv() {
assert_eq!(packet.packet, buf.to_vec());
received_packet = true;
}
});
if !received_packet {
panic!("did not receive expected data");
}
jh0.join().unwrap();
}
// Test that the client can both read telecommands from the server and send back
// telemetry to the server.
#[test]
fn basic_client_tmtc_test() {
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let mut tc_buf: [u8; 8] = [0; 8];
let mut tm_buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut tc_buf)
.expect("writing TM failed");
TEST_TM
.write_to_be_bytes(&mut tm_buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut read_buf: [u8; 64] = [0; 64];
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
stream.write_all(&tc_buf).expect("writing TC failed");
let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf);
if read_bytes == 0 {
panic!("did not receive expected data");
} else {
assert_eq!(&tm_buf, &read_buf[0..read_bytes]);
}
});
tm_tcp_client_tx
.send(PacketAsVec::new(0, tm_buf.to_vec()))
.unwrap();
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
assert!(spp_client.connected());
let mut received_packet = false;
(0..3).for_each(|_| {
spp_client.operation().unwrap();
if let Ok(packet) = tc_source_rx.try_recv() {
assert_eq!(packet.packet, tc_buf.to_vec());
received_packet = true;
}
});
if !received_packet {
panic!("did not receive expected data");
}
jh0.join().unwrap();
}
#[test]
fn test_broken_connection() {
init();
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_port = tcp_server.local_addr().port();
let drop_signal = Arc::new(AtomicBool::new(false));
let drop_signal_0 = drop_signal.clone();
let mut tc_buf: [u8; 8] = [0; 8];
let mut tm_buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut tc_buf)
.expect("writing TM failed");
TEST_TM
.write_to_be_bytes(&mut tm_buf)
.expect("writing TM failed");
let mut jh0 = thread::spawn(move || {
tcp_server
.check_for_connections(3)
.expect("no client connection detected");
drop_signal_0.store(true, std::sync::atomic::Ordering::Relaxed);
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_port,
)
.expect("creating TCP SPP client failed");
while !drop_signal.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(Duration::from_millis(100));
}
tm_tcp_client_tx
.send(PacketAsVec::new(0, tm_buf.to_vec()))
.unwrap();
match spp_client.operation() {
Ok(ClientResult::ConnectionLost) => (),
Ok(ClientResult::Ok) => {
panic!("expected operation error");
}
Err(ClientError::Io(e)) => {
println!("io error: {:?}", e);
if e.kind() != io::ErrorKind::ConnectionReset
&& e.kind() != io::ErrorKind::ConnectionAborted
{
panic!("expected some disconnet error");
}
}
_ => {
panic!("unexpected error")
} }
}; };
Ok(Self { assert!(!spp_client.connected());
common: TcpSppClientCommon { jh0.join().unwrap();
id, // spp_client.operation();
read_buf: [0; 4096], tcp_server = TcpServerTestbench::new(local_port);
tm_tcp_client_rx, tm_tcp_client_tx
server_addr, .send(PacketAsVec::new(0, tm_buf.to_vec()))
tc_source_tx, .unwrap();
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), jh0 = thread::spawn(move || {
}, let mut stream = tcp_server
stream: client, .check_for_connections(3)
}) .expect("no client connection detected");
} let mut read_buf: [u8; 64] = [0; 64];
let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf);
pub fn attempt_connect(&mut self, log_error: bool) -> io::Result<bool> { if read_bytes == 0 {
Ok(match StdTcpStream::connect(self.common.server_addr) { panic!("did not receive expected data");
Ok(stream) => {
stream.set_read_timeout(Some(STOP_CHECK_FREQUENCY))?;
self.stream = Some(stream);
true
}
Err(e) => {
if log_error {
log::warn!("error connecting to server: {}", e);
}
false
}
})
}
pub fn operation(&mut self) -> Result<(), ClientError> {
if let Some(client) = &mut self.stream {
match client.read(&mut self.common.read_buf) {
// Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
self.stream = None;
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{
self.common.write_to_server(client)?;
return Ok(());
}
log::warn!("server error: {e:?}");
if e.kind() == io::ErrorKind::ConnectionReset {
self.stream = None;
}
return Err(e.into());
}
}
} else { } else {
if self.attempt_connect(false)? { assert_eq!(&tm_buf, &read_buf[0..read_bytes]);
log::info!("reconnected to server succesfully");
return self.operation();
} }
std::thread::sleep(STOP_CHECK_FREQUENCY); });
} let result = spp_client.operation();
println!("{:?}", result);
Ok(()) assert!(!spp_client.connected());
assert!(result.is_ok());
jh0.join().unwrap();
} }
} }

View File

@ -9,7 +9,7 @@ use log::info;
use ops_sat_rs::config::{ use ops_sat_rs::config::{
cfg_file::create_app_config, cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
VALID_PACKET_ID_LIST, VALID_PACKET_ID_LIST,
}; };
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
@ -171,6 +171,7 @@ fn main() {
tc_source_tx, tc_source_tx,
tm_tcp_client_rx, tm_tcp_client_rx,
VALID_PACKET_ID_LIST, VALID_PACKET_ID_LIST,
STOP_CHECK_FREQUENCY,
app_cfg.tcp_spp_server_port, app_cfg.tcp_spp_server_port,
) )
.expect("creating TCP SPP client failed"); .expect("creating TCP SPP client failed");

View File

@ -275,8 +275,8 @@ mod tests {
use satrs::pus::test_util::{ use satrs::pus::test_util::{
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
}; };
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter; use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::pus::{verification, TcInMemory};
use satrs::request::MessageMetadata; use satrs::request::MessageMetadata;
use satrs::ComponentId; use satrs::ComponentId;
use satrs::{ use satrs::{
@ -429,7 +429,13 @@ mod tests {
.verif_reporter() .verif_reporter()
.check_next_is_acceptance_success(id, accepted_token.request_id()); .check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx self.pus_packet_tx
.send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) .send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new(
self.service.service_helper.id(),
tc.to_vec().unwrap(),
)),
accepted_token,
))
.unwrap(); .unwrap();
} }
} }