From 3cc9dd3c48b061146c6d5e181eb112ec13740a8e Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 9 Apr 2024 17:21:43 +0200 Subject: [PATCH] introduce stop signal handling --- satrs-example/src/tcp.rs | 8 +++- satrs/CHANGELOG.md | 1 + satrs/src/hal/std/tcp_cobs_server.rs | 44 ++++++++++++++++++-- satrs/src/hal/std/tcp_server.rs | 15 +++++++ satrs/src/hal/std/tcp_spacepackets_server.rs | 8 ++++ satrs/tests/pus_verification.rs | 2 +- satrs/tests/tcp_servers.rs | 2 + 7 files changed, 75 insertions(+), 5 deletions(-) diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs index 04bb136..7abcf5b 100644 --- a/satrs-example/src/tcp.rs +++ b/satrs-example/src/tcp.rs @@ -104,7 +104,13 @@ impl< packet_id_lookup: HashSet, ) -> Result { Ok(Self { - server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, + server: TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_receiver, + packet_id_lookup, + None, + )?, }) } diff --git a/satrs/CHANGELOG.md b/satrs/CHANGELOG.md index b539457..e119322 100644 --- a/satrs/CHANGELOG.md +++ b/satrs/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Introduced generic `EventMessage` which is generic over the event type and the additional parameter type. This message also contains the sender ID which can be useful for debugging or application layer / FDIR logic. +- Stop signal handling for the TCP servers. ## Changed diff --git a/satrs/src/hal/std/tcp_cobs_server.rs b/satrs/src/hal/std/tcp_cobs_server.rs index 7e7036f..4158408 100644 --- a/satrs/src/hal/std/tcp_cobs_server.rs +++ b/satrs/src/hal/std/tcp_cobs_server.rs @@ -1,5 +1,7 @@ +use alloc::sync::Arc; use alloc::vec; use cobs::encode; +use core::sync::atomic::AtomicBool; use delegate::delegate; use std::io::Write; use std::net::SocketAddr; @@ -140,6 +142,7 @@ impl< cfg: ServerConfig, tm_source: TmSource, tc_receiver: TcReceiver, + stop_signal: Option>, ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( @@ -148,6 +151,7 @@ impl< CobsTmSender::new(cfg.tm_buffer_size), tm_source, tc_receiver, + stop_signal, )?, }) } @@ -178,6 +182,7 @@ mod tests { io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, thread, + time::Instant, }; use crate::{ @@ -212,11 +217,13 @@ mod tests { addr: &SocketAddr, tc_receiver: SyncTcCacher, tm_source: SyncTmSource, + stop_signal: Option>, ) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> { TcpTmtcInCobsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), tm_source, tc_receiver, + stop_signal, ) .expect("TCP server generation failed") } @@ -226,7 +233,8 @@ mod tests { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let tc_receiver = SyncTcCacher::default(); let tm_source = SyncTmSource::default(); - let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source); + let mut tcp_server = + generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None); let dest_addr = tcp_server .local_addr() .expect("retrieving dest addr failed"); @@ -278,8 +286,12 @@ mod tests { let mut tm_source = SyncTmSource::default(); tm_source.add_tm(&INVERTED_PACKET); tm_source.add_tm(&SIMPLE_PACKET); - let mut tcp_server = - generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone()); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source.clone(), + None, + ); let dest_addr = tcp_server .local_addr() .expect("retrieving dest addr failed"); @@ -376,4 +388,30 @@ mod tests { assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET); drop(tc_queue); } + + #[test] + fn test_server_stop_signal() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let tm_source = SyncTmSource::default(); + let stop_signal = Arc::new(AtomicBool::new(false)); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + Some(stop_signal.clone()), + ); + let start = Instant::now(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || loop { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + if Instant::now() - start > Duration::from_millis(50) { + panic!("regular stop signal handling failed"); + } + }); + stop_signal.store(true, Ordering::Relaxed); + } } diff --git a/satrs/src/hal/std/tcp_server.rs b/satrs/src/hal/std/tcp_server.rs index deeb902..bfbcae0 100644 --- a/satrs/src/hal/std/tcp_server.rs +++ b/satrs/src/hal/std/tcp_server.rs @@ -1,6 +1,8 @@ //! Generic TCP TMTC servers with different TMTC format flavours. +use alloc::sync::Arc; use alloc::vec; use alloc::vec::Vec; +use core::sync::atomic::AtomicBool; use core::time::Duration; use socket2::{Domain, Socket, Type}; use std::io::Read; @@ -145,6 +147,7 @@ pub struct TcpTmtcGenericServer< pub(crate) tm_buffer: Vec, pub(crate) tc_receiver: TcReceiver, pub(crate) tc_buffer: Vec, + stop_signal: Option>, tc_handler: TcParser, tm_handler: TmSender, } @@ -176,6 +179,7 @@ impl< tm_sender: TmSender, tm_source: TmSource, tc_receiver: TcReceiver, + stop_signal: Option>, ) -> Result { // Create a TCP listener bound to two addresses. let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; @@ -194,6 +198,7 @@ impl< tm_buffer: vec![0; cfg.tm_buffer_size], tc_receiver, tc_buffer: vec![0; cfg.tc_buffer_size], + stop_signal, }) } @@ -284,6 +289,16 @@ impl< // No TC read, no TM was sent, but the client has not disconnected. // Perform an inner delay to avoid burning CPU time. thread::sleep(self.inner_loop_delay); + // Optional stop signal handling. + if self.stop_signal.is_some() + && self + .stop_signal + .as_ref() + .unwrap() + .load(std::sync::atomic::Ordering::Relaxed) + { + return Ok(connection_result); + } } } _ => { diff --git a/satrs/src/hal/std/tcp_spacepackets_server.rs b/satrs/src/hal/std/tcp_spacepackets_server.rs index a33b137..a1abba9 100644 --- a/satrs/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs/src/hal/std/tcp_spacepackets_server.rs @@ -1,3 +1,5 @@ +use alloc::sync::Arc; +use core::sync::atomic::AtomicBool; use delegate::delegate; use std::{ io::Write, @@ -131,6 +133,7 @@ impl< tm_source: TmSource, tc_receiver: TcReceiver, packet_id_checker: PacketIdChecker, + stop_signal: Option>, ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( @@ -139,6 +142,7 @@ impl< SpacepacketsTmSender::default(), tm_source, tc_receiver, + stop_signal, )?, }) } @@ -197,12 +201,14 @@ mod tests { tc_receiver: SyncTcCacher, tm_source: SyncTmSource, packet_id_lookup: HashSet, + stop_signal: Option>, ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher, HashSet> { TcpSpacepacketsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), tm_source, tc_receiver, packet_id_lookup, + stop_signal, ) .expect("TCP server generation failed") } @@ -219,6 +225,7 @@ mod tests { tc_receiver.clone(), tm_source, packet_id_lookup, + None, ); let dest_addr = tcp_server .local_addr() @@ -288,6 +295,7 @@ mod tests { tc_receiver.clone(), tm_source, packet_id_lookup, + None, ); let dest_addr = tcp_server .local_addr() diff --git a/satrs/tests/pus_verification.rs b/satrs/tests/pus_verification.rs index 743535f..ada5850 100644 --- a/satrs/tests/pus_verification.rs +++ b/satrs/tests/pus_verification.rs @@ -1,4 +1,4 @@ -// #[cfg(feature = "crossbeam")] +#[cfg(feature = "crossbeam")] pub mod crossbeam_test { use hashbrown::HashMap; use satrs::pool::{PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig}; diff --git a/satrs/tests/tcp_servers.rs b/satrs/tests/tcp_servers.rs index ff3fe78..02e3824 100644 --- a/satrs/tests/tcp_servers.rs +++ b/satrs/tests/tcp_servers.rs @@ -96,6 +96,7 @@ fn test_cobs_server() { ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), tm_source, tc_receiver.clone(), + None, ) .expect("TCP server generation failed"); let dest_addr = tcp_server @@ -179,6 +180,7 @@ fn test_ccsds_server() { tm_source, tc_receiver.clone(), packet_id_lookup, + None, ) .expect("TCP server generation failed"); let dest_addr = tcp_server