introduce stop signal handling
Some checks are pending
Rust/sat-rs/pipeline/head Build started...
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2024-04-09 17:21:43 +02:00
parent 0fec994028
commit 3cc9dd3c48
7 changed files with 75 additions and 5 deletions

View File

@ -104,7 +104,13 @@ impl<
packet_id_lookup: HashSet<PacketId>,
) -> Result<Self, std::io::Error> {
Ok(Self {
server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?,
server: TcpSpacepacketsServer::new(
cfg,
tm_source,
tc_receiver,
packet_id_lookup,
None,
)?,
})
}

View File

@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Introduced generic `EventMessage` which is generic over the event type and the additional
parameter type. This message also contains the sender ID which can be useful for debugging
or application layer / FDIR logic.
- Stop signal handling for the TCP servers.
## Changed

View File

@ -1,5 +1,7 @@
use alloc::sync::Arc;
use alloc::vec;
use cobs::encode;
use core::sync::atomic::AtomicBool;
use delegate::delegate;
use std::io::Write;
use std::net::SocketAddr;
@ -140,6 +142,7 @@ impl<
cfg: ServerConfig,
tm_source: TmSource,
tc_receiver: TcReceiver,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> {
Ok(Self {
generic_server: TcpTmtcGenericServer::new(
@ -148,6 +151,7 @@ impl<
CobsTmSender::new(cfg.tm_buffer_size),
tm_source,
tc_receiver,
stop_signal,
)?,
})
}
@ -178,6 +182,7 @@ mod tests {
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
thread,
time::Instant,
};
use crate::{
@ -212,11 +217,13 @@ mod tests {
addr: &SocketAddr,
tc_receiver: SyncTcCacher,
tm_source: SyncTmSource,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
TcpTmtcInCobsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_receiver,
stop_signal,
)
.expect("TCP server generation failed")
}
@ -226,7 +233,8 @@ mod tests {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default();
let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source);
let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
@ -278,8 +286,12 @@ mod tests {
let mut tm_source = SyncTmSource::default();
tm_source.add_tm(&INVERTED_PACKET);
tm_source.add_tm(&SIMPLE_PACKET);
let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source.clone(),
None,
);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
@ -376,4 +388,30 @@ mod tests {
assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
drop(tc_queue);
}
#[test]
fn test_server_stop_signal() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default();
let stop_signal = Arc::new(AtomicBool::new(false));
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source,
Some(stop_signal.clone()),
);
let start = Instant::now();
// Call the connection handler in separate thread, does block.
thread::spawn(move || loop {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
if Instant::now() - start > Duration::from_millis(50) {
panic!("regular stop signal handling failed");
}
});
stop_signal.store(true, Ordering::Relaxed);
}
}

View File

@ -1,6 +1,8 @@
//! Generic TCP TMTC servers with different TMTC format flavours.
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::sync::atomic::AtomicBool;
use core::time::Duration;
use socket2::{Domain, Socket, Type};
use std::io::Read;
@ -145,6 +147,7 @@ pub struct TcpTmtcGenericServer<
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: TcReceiver,
pub(crate) tc_buffer: Vec<u8>,
stop_signal: Option<Arc<AtomicBool>>,
tc_handler: TcParser,
tm_handler: TmSender,
}
@ -176,6 +179,7 @@ impl<
tm_sender: TmSender,
tm_source: TmSource,
tc_receiver: TcReceiver,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> {
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
@ -194,6 +198,7 @@ impl<
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
stop_signal,
})
}
@ -284,6 +289,16 @@ impl<
// No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time.
thread::sleep(self.inner_loop_delay);
// Optional stop signal handling.
if self.stop_signal.is_some()
&& self
.stop_signal
.as_ref()
.unwrap()
.load(std::sync::atomic::Ordering::Relaxed)
{
return Ok(connection_result);
}
}
}
_ => {

View File

@ -1,3 +1,5 @@
use alloc::sync::Arc;
use core::sync::atomic::AtomicBool;
use delegate::delegate;
use std::{
io::Write,
@ -131,6 +133,7 @@ impl<
tm_source: TmSource,
tc_receiver: TcReceiver,
packet_id_checker: PacketIdChecker,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> {
Ok(Self {
generic_server: TcpTmtcGenericServer::new(
@ -139,6 +142,7 @@ impl<
SpacepacketsTmSender::default(),
tm_source,
tc_receiver,
stop_signal,
)?,
})
}
@ -197,12 +201,14 @@ mod tests {
tc_receiver: SyncTcCacher,
tm_source: SyncTmSource,
packet_id_lookup: HashSet<PacketId>,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher, HashSet<PacketId>> {
TcpSpacepacketsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_receiver,
packet_id_lookup,
stop_signal,
)
.expect("TCP server generation failed")
}
@ -219,6 +225,7 @@ mod tests {
tc_receiver.clone(),
tm_source,
packet_id_lookup,
None,
);
let dest_addr = tcp_server
.local_addr()
@ -288,6 +295,7 @@ mod tests {
tc_receiver.clone(),
tm_source,
packet_id_lookup,
None,
);
let dest_addr = tcp_server
.local_addr()

View File

@ -1,4 +1,4 @@
// #[cfg(feature = "crossbeam")]
#[cfg(feature = "crossbeam")]
pub mod crossbeam_test {
use hashbrown::HashMap;
use satrs::pool::{PoolProvider, PoolProviderWithGuards, StaticMemoryPool, StaticPoolConfig};

View File

@ -96,6 +96,7 @@ fn test_cobs_server() {
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_receiver.clone(),
None,
)
.expect("TCP server generation failed");
let dest_addr = tcp_server
@ -179,6 +180,7 @@ fn test_ccsds_server() {
tm_source,
tc_receiver.clone(),
packet_id_lookup,
None,
)
.expect("TCP server generation failed");
let dest_addr = tcp_server