Merge pull request 'introduce stop signal handling for TCP' (#149) from tcp-server-stop-signal into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

Reviewed-on: #149
This commit is contained in:
Robin Müller 2024-04-09 18:11:29 +02:00
commit f71ba3e8d8
7 changed files with 75 additions and 5 deletions

View File

@ -104,7 +104,13 @@ impl<
packet_id_lookup: HashSet<PacketId>, packet_id_lookup: HashSet<PacketId>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, server: TcpSpacepacketsServer::new(
cfg,
tm_source,
tc_receiver,
packet_id_lookup,
None,
)?,
}) })
} }

View File

@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Introduced generic `EventMessage` which is generic over the event type and the additional - Introduced generic `EventMessage` which is generic over the event type and the additional
parameter type. This message also contains the sender ID which can be useful for debugging parameter type. This message also contains the sender ID which can be useful for debugging
or application layer / FDIR logic. or application layer / FDIR logic.
- Stop signal handling for the TCP servers.
## Changed ## Changed

View File

@ -1,5 +1,7 @@
use alloc::sync::Arc;
use alloc::vec; use alloc::vec;
use cobs::encode; use cobs::encode;
use core::sync::atomic::AtomicBool;
use delegate::delegate; use delegate::delegate;
use std::io::Write; use std::io::Write;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -140,6 +142,7 @@ impl<
cfg: ServerConfig, cfg: ServerConfig,
tm_source: TmSource, tm_source: TmSource,
tc_receiver: TcReceiver, tc_receiver: TcReceiver,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
generic_server: TcpTmtcGenericServer::new( generic_server: TcpTmtcGenericServer::new(
@ -148,6 +151,7 @@ impl<
CobsTmSender::new(cfg.tm_buffer_size), CobsTmSender::new(cfg.tm_buffer_size),
tm_source, tm_source,
tc_receiver, tc_receiver,
stop_signal,
)?, )?,
}) })
} }
@ -178,6 +182,7 @@ mod tests {
io::{Read, Write}, io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
thread, thread,
time::Instant,
}; };
use crate::{ use crate::{
@ -212,11 +217,13 @@ mod tests {
addr: &SocketAddr, addr: &SocketAddr,
tc_receiver: SyncTcCacher, tc_receiver: SyncTcCacher,
tm_source: SyncTmSource, tm_source: SyncTmSource,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> { ) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
TcpTmtcInCobsServer::new( TcpTmtcInCobsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
tm_source, tm_source,
tc_receiver, tc_receiver,
stop_signal,
) )
.expect("TCP server generation failed") .expect("TCP server generation failed")
} }
@ -226,7 +233,8 @@ mod tests {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default(); let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default(); let tm_source = SyncTmSource::default();
let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source); let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None);
let dest_addr = tcp_server let dest_addr = tcp_server
.local_addr() .local_addr()
.expect("retrieving dest addr failed"); .expect("retrieving dest addr failed");
@ -278,8 +286,12 @@ mod tests {
let mut tm_source = SyncTmSource::default(); let mut tm_source = SyncTmSource::default();
tm_source.add_tm(&INVERTED_PACKET); tm_source.add_tm(&INVERTED_PACKET);
tm_source.add_tm(&SIMPLE_PACKET); tm_source.add_tm(&SIMPLE_PACKET);
let mut tcp_server = let mut tcp_server = generic_tmtc_server(
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone()); &auto_port_addr,
tc_receiver.clone(),
tm_source.clone(),
None,
);
let dest_addr = tcp_server let dest_addr = tcp_server
.local_addr() .local_addr()
.expect("retrieving dest addr failed"); .expect("retrieving dest addr failed");
@ -376,4 +388,30 @@ mod tests {
assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET); assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
drop(tc_queue); drop(tc_queue);
} }
#[test]
fn test_server_stop_signal() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default();
let stop_signal = Arc::new(AtomicBool::new(false));
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source,
Some(stop_signal.clone()),
);
let start = Instant::now();
// Call the connection handler in separate thread, does block.
thread::spawn(move || loop {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
if Instant::now() - start > Duration::from_millis(50) {
panic!("regular stop signal handling failed");
}
});
stop_signal.store(true, Ordering::Relaxed);
}
} }

View File

@ -1,6 +1,8 @@
//! Generic TCP TMTC servers with different TMTC format flavours. //! Generic TCP TMTC servers with different TMTC format flavours.
use alloc::sync::Arc;
use alloc::vec; use alloc::vec;
use alloc::vec::Vec; use alloc::vec::Vec;
use core::sync::atomic::AtomicBool;
use core::time::Duration; use core::time::Duration;
use socket2::{Domain, Socket, Type}; use socket2::{Domain, Socket, Type};
use std::io::Read; use std::io::Read;
@ -145,6 +147,7 @@ pub struct TcpTmtcGenericServer<
pub(crate) tm_buffer: Vec<u8>, pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: TcReceiver, pub(crate) tc_receiver: TcReceiver,
pub(crate) tc_buffer: Vec<u8>, pub(crate) tc_buffer: Vec<u8>,
stop_signal: Option<Arc<AtomicBool>>,
tc_handler: TcParser, tc_handler: TcParser,
tm_handler: TmSender, tm_handler: TmSender,
} }
@ -176,6 +179,7 @@ impl<
tm_sender: TmSender, tm_sender: TmSender,
tm_source: TmSource, tm_source: TmSource,
tc_receiver: TcReceiver, tc_receiver: TcReceiver,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
// Create a TCP listener bound to two addresses. // Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
@ -194,6 +198,7 @@ impl<
tm_buffer: vec![0; cfg.tm_buffer_size], tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver, tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size], tc_buffer: vec![0; cfg.tc_buffer_size],
stop_signal,
}) })
} }
@ -284,6 +289,16 @@ impl<
// No TC read, no TM was sent, but the client has not disconnected. // No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time. // Perform an inner delay to avoid burning CPU time.
thread::sleep(self.inner_loop_delay); thread::sleep(self.inner_loop_delay);
// Optional stop signal handling.
if self.stop_signal.is_some()
&& self
.stop_signal
.as_ref()
.unwrap()
.load(std::sync::atomic::Ordering::Relaxed)
{
return Ok(connection_result);
}
} }
} }
_ => { _ => {

View File

@ -1,3 +1,5 @@
use alloc::sync::Arc;
use core::sync::atomic::AtomicBool;
use delegate::delegate; use delegate::delegate;
use std::{ use std::{
io::Write, io::Write,
@ -131,6 +133,7 @@ impl<
tm_source: TmSource, tm_source: TmSource,
tc_receiver: TcReceiver, tc_receiver: TcReceiver,
packet_id_checker: PacketIdChecker, packet_id_checker: PacketIdChecker,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
generic_server: TcpTmtcGenericServer::new( generic_server: TcpTmtcGenericServer::new(
@ -139,6 +142,7 @@ impl<
SpacepacketsTmSender::default(), SpacepacketsTmSender::default(),
tm_source, tm_source,
tc_receiver, tc_receiver,
stop_signal,
)?, )?,
}) })
} }
@ -197,12 +201,14 @@ mod tests {
tc_receiver: SyncTcCacher, tc_receiver: SyncTcCacher,
tm_source: SyncTmSource, tm_source: SyncTmSource,
packet_id_lookup: HashSet<PacketId>, packet_id_lookup: HashSet<PacketId>,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher, HashSet<PacketId>> { ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher, HashSet<PacketId>> {
TcpSpacepacketsServer::new( TcpSpacepacketsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
tm_source, tm_source,
tc_receiver, tc_receiver,
packet_id_lookup, packet_id_lookup,
stop_signal,
) )
.expect("TCP server generation failed") .expect("TCP server generation failed")
} }
@ -219,6 +225,7 @@ mod tests {
tc_receiver.clone(), tc_receiver.clone(),
tm_source, tm_source,
packet_id_lookup, packet_id_lookup,
None,
); );
let dest_addr = tcp_server let dest_addr = tcp_server
.local_addr() .local_addr()
@ -288,6 +295,7 @@ mod tests {
tc_receiver.clone(), tc_receiver.clone(),
tm_source, tm_source,
packet_id_lookup, packet_id_lookup,
None,
); );
let dest_addr = tcp_server let dest_addr = tcp_server
.local_addr() .local_addr()

View File

@ -1,4 +1,4 @@
// #[cfg(feature = "crossbeam")] #[cfg(feature = "crossbeam")]
pub mod crossbeam_test { pub mod crossbeam_test {
use hashbrown::HashMap; use hashbrown::HashMap;
use satrs::pool::{PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig}; use satrs::pool::{PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};

View File

@ -96,6 +96,7 @@ fn test_cobs_server() {
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
tm_source, tm_source,
tc_receiver.clone(), tc_receiver.clone(),
None,
) )
.expect("TCP server generation failed"); .expect("TCP server generation failed");
let dest_addr = tcp_server let dest_addr = tcp_server
@ -179,6 +180,7 @@ fn test_ccsds_server() {
tm_source, tm_source,
tc_receiver.clone(), tc_receiver.clone(),
packet_id_lookup, packet_id_lookup,
None,
) )
.expect("TCP server generation failed"); .expect("TCP server generation failed");
let dest_addr = tcp_server let dest_addr = tcp_server