All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good
391 lines
14 KiB
Rust
391 lines
14 KiB
Rust
use std::{
|
|
collections::VecDeque,
|
|
io::ErrorKind,
|
|
net::{SocketAddr, UdpSocket},
|
|
sync::{atomic::AtomicBool, mpsc, Arc},
|
|
time::Duration,
|
|
};
|
|
|
|
use satrs_minisim::{SimMessageProvider, SimReply, SimRequest};
|
|
|
|
// A UDP server which handles all TC received by a client application.
|
|
pub struct SimUdpServer {
|
|
socket: UdpSocket,
|
|
request_sender: mpsc::Sender<SimRequest>,
|
|
// shared_last_sender: SharedSocketAddr,
|
|
reply_receiver: mpsc::Receiver<SimReply>,
|
|
reply_queue: VecDeque<SimReply>,
|
|
max_num_replies: usize,
|
|
// Stop signal to stop the server. Required for unittests and useful to allow clean shutdown
|
|
// of the application.
|
|
stop_signal: Option<Arc<AtomicBool>>,
|
|
idle_sleep_period_ms: u64,
|
|
req_buf: [u8; 4096],
|
|
sender_addr: Option<SocketAddr>,
|
|
}
|
|
|
|
impl SimUdpServer {
|
|
pub fn new(
|
|
local_port: u16,
|
|
request_sender: mpsc::Sender<SimRequest>,
|
|
reply_receiver: mpsc::Receiver<SimReply>,
|
|
max_num_replies: usize,
|
|
stop_signal: Option<Arc<AtomicBool>>,
|
|
) -> std::io::Result<Self> {
|
|
let socket = UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], local_port)))?;
|
|
socket.set_nonblocking(true)?;
|
|
Ok(Self {
|
|
socket,
|
|
request_sender,
|
|
reply_receiver,
|
|
reply_queue: VecDeque::new(),
|
|
max_num_replies,
|
|
stop_signal,
|
|
idle_sleep_period_ms: 3,
|
|
req_buf: [0; 4096],
|
|
sender_addr: None,
|
|
})
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn server_addr(&self) -> std::io::Result<SocketAddr> {
|
|
self.socket.local_addr()
|
|
}
|
|
|
|
pub fn run(&mut self) {
|
|
loop {
|
|
if let Some(stop_signal) = &self.stop_signal {
|
|
if stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
|
break;
|
|
}
|
|
}
|
|
let processed_requests = self.process_requests();
|
|
let processed_replies = self.process_replies();
|
|
let sent_replies = self.send_replies();
|
|
// Sleep for a bit if there is nothing to do to prevent burning CPU cycles. Delay
|
|
// should be kept short to ensure responsiveness of the system.
|
|
if !processed_requests && !processed_replies && !sent_replies {
|
|
std::thread::sleep(Duration::from_millis(self.idle_sleep_period_ms));
|
|
}
|
|
}
|
|
}
|
|
|
|
fn process_requests(&mut self) -> bool {
|
|
let mut processed_requests = false;
|
|
loop {
|
|
// Blocks for a certain amount of time until data is received to allow doing periodic
|
|
// work like checking the stop signal.
|
|
let (bytes_read, src) = match self.socket.recv_from(&mut self.req_buf) {
|
|
Ok((bytes_read, src)) => (bytes_read, src),
|
|
Err(e) if e.kind() == ErrorKind::WouldBlock => {
|
|
// Continue to perform regular checks like the stop signal.
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
// Handle unexpected errors (e.g., socket closed) here.
|
|
log::error!("unexpected request server error: {e}");
|
|
break;
|
|
}
|
|
};
|
|
|
|
self.sender_addr = Some(src);
|
|
|
|
let sim_req = SimRequest::from_raw_data(&self.req_buf[..bytes_read]);
|
|
if sim_req.is_err() {
|
|
log::warn!(
|
|
"received UDP request with invalid format: {}",
|
|
sim_req.unwrap_err()
|
|
);
|
|
return processed_requests;
|
|
}
|
|
self.request_sender.send(sim_req.unwrap()).unwrap();
|
|
processed_requests = true;
|
|
}
|
|
processed_requests
|
|
}
|
|
|
|
fn process_replies(&mut self) -> bool {
|
|
let mut processed_replies = false;
|
|
loop {
|
|
match self.reply_receiver.try_recv() {
|
|
Ok(reply) => {
|
|
if self.reply_queue.len() >= self.max_num_replies {
|
|
self.reply_queue.pop_front();
|
|
}
|
|
self.reply_queue.push_back(reply);
|
|
processed_replies = true;
|
|
}
|
|
Err(e) => match e {
|
|
mpsc::TryRecvError::Empty => return processed_replies,
|
|
mpsc::TryRecvError::Disconnected => {
|
|
log::error!("all UDP reply senders disconnected")
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
fn send_replies(&mut self) -> bool {
|
|
if self.sender_addr.is_none() {
|
|
return false;
|
|
}
|
|
let mut sent_replies = false;
|
|
while !self.reply_queue.is_empty() {
|
|
let next_reply_to_send = self.reply_queue.pop_front().unwrap();
|
|
self.socket
|
|
.send_to(
|
|
serde_json::to_string(&next_reply_to_send)
|
|
.unwrap()
|
|
.as_bytes(),
|
|
self.sender_addr.unwrap(),
|
|
)
|
|
.expect("sending reply failed");
|
|
sent_replies = true;
|
|
}
|
|
sent_replies
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::{
|
|
io::ErrorKind,
|
|
sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
mpsc, Arc,
|
|
},
|
|
time::Duration,
|
|
};
|
|
|
|
use satrs_minisim::{
|
|
eps::{PcduReply, PcduRequest},
|
|
udp::{ReceptionError, SimUdpClient},
|
|
SimCtrlReply, SimCtrlRequest, SimReply, SimRequest,
|
|
};
|
|
|
|
use crate::eps::tests::get_all_off_switch_map;
|
|
use delegate::delegate;
|
|
|
|
use super::SimUdpServer;
|
|
|
|
// Wait time to ensure even possibly laggy systems like CI servers can run the tests.
|
|
const SERVER_WAIT_TIME_MS: u64 = 50;
|
|
|
|
struct UdpTestbench {
|
|
client: SimUdpClient,
|
|
stop_signal: Arc<AtomicBool>,
|
|
request_receiver: mpsc::Receiver<SimRequest>,
|
|
reply_sender: mpsc::Sender<SimReply>,
|
|
}
|
|
|
|
impl UdpTestbench {
|
|
pub fn new(
|
|
client_non_blocking: bool,
|
|
client_read_timeout_ms: Option<u64>,
|
|
max_num_replies: usize,
|
|
) -> std::io::Result<(Self, SimUdpServer)> {
|
|
let (request_sender, request_receiver) = mpsc::channel();
|
|
let (reply_sender, reply_receiver) = mpsc::channel();
|
|
let stop_signal = Arc::new(AtomicBool::new(false));
|
|
let server = SimUdpServer::new(
|
|
0,
|
|
request_sender,
|
|
reply_receiver,
|
|
max_num_replies,
|
|
Some(stop_signal.clone()),
|
|
)?;
|
|
let server_addr = server.server_addr()?;
|
|
Ok((
|
|
Self {
|
|
client: SimUdpClient::new(
|
|
&server_addr,
|
|
client_non_blocking,
|
|
client_read_timeout_ms,
|
|
)?,
|
|
stop_signal,
|
|
request_receiver,
|
|
reply_sender,
|
|
},
|
|
server,
|
|
))
|
|
}
|
|
|
|
pub fn try_recv_request(&self) -> Result<SimRequest, mpsc::TryRecvError> {
|
|
self.request_receiver.try_recv()
|
|
}
|
|
|
|
pub fn stop(&self) {
|
|
self.stop_signal.store(true, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn send_reply(&self, sim_reply: &SimReply) {
|
|
self.reply_sender
|
|
.send(sim_reply.clone())
|
|
.expect("sending sim reply failed");
|
|
}
|
|
|
|
delegate! {
|
|
to self.client {
|
|
pub fn send_request(&self, sim_request: &SimRequest) -> std::io::Result<usize>;
|
|
pub fn recv_sim_reply(&mut self) -> Result<SimReply, ReceptionError>;
|
|
}
|
|
}
|
|
|
|
pub fn check_no_sim_reply_available(&mut self) {
|
|
if let Err(ReceptionError::Io(ref io_error)) = self.recv_sim_reply() {
|
|
if io_error.kind() == ErrorKind::WouldBlock {
|
|
// Continue to perform regular checks like the stop signal.
|
|
return;
|
|
} else {
|
|
// Handle unexpected errors (e.g., socket closed) here.
|
|
panic!("unexpected request server error: {io_error}");
|
|
}
|
|
}
|
|
panic!("unexpected reply available");
|
|
}
|
|
|
|
pub fn check_next_sim_reply(&mut self, expected_reply: &SimReply) {
|
|
match self.recv_sim_reply() {
|
|
Ok(received_sim_reply) => assert_eq!(expected_reply, &received_sim_reply),
|
|
Err(e) => match e {
|
|
ReceptionError::Io(ref io_error) => {
|
|
if io_error.kind() == ErrorKind::WouldBlock {
|
|
// Continue to perform regular checks like the stop signal.
|
|
panic!("no simulation reply received");
|
|
} else {
|
|
// Handle unexpected errors (e.g., socket closed) here.
|
|
panic!("unexpected request server error: {e}");
|
|
}
|
|
}
|
|
ReceptionError::SerdeJson(json_error) => {
|
|
panic!("unexpected JSON error: {json_error}");
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
#[test]
|
|
fn test_basic_udp_request_reception() {
|
|
let (udp_testbench, mut udp_server) =
|
|
UdpTestbench::new(true, Some(SERVER_WAIT_TIME_MS), 10)
|
|
.expect("could not create testbench");
|
|
let server_thread = std::thread::spawn(move || udp_server.run());
|
|
let sim_request = SimRequest::new(PcduRequest::RequestSwitchInfo);
|
|
udp_testbench
|
|
.send_request(&sim_request)
|
|
.expect("sending request failed");
|
|
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
|
|
// Check that the sim request has arrives and was forwarded.
|
|
let received_sim_request = udp_testbench
|
|
.try_recv_request()
|
|
.expect("did not receive request");
|
|
assert_eq!(sim_request, received_sim_request);
|
|
// Stop the server.
|
|
udp_testbench.stop();
|
|
server_thread.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn test_udp_reply_server() {
|
|
let (mut udp_testbench, mut udp_server) =
|
|
UdpTestbench::new(false, Some(SERVER_WAIT_TIME_MS), 10)
|
|
.expect("could not create testbench");
|
|
let server_thread = std::thread::spawn(move || udp_server.run());
|
|
udp_testbench
|
|
.send_request(&SimRequest::new(SimCtrlRequest::Ping))
|
|
.expect("sending request failed");
|
|
|
|
let sim_reply = SimReply::new(PcduReply::SwitchInfo(get_all_off_switch_map()));
|
|
udp_testbench.send_reply(&sim_reply);
|
|
|
|
udp_testbench.check_next_sim_reply(&sim_reply);
|
|
|
|
// Stop the server.
|
|
udp_testbench.stop();
|
|
server_thread.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn test_udp_req_server_and_reply_sender() {
|
|
let (mut udp_testbench, mut udp_server) =
|
|
UdpTestbench::new(false, Some(SERVER_WAIT_TIME_MS), 10)
|
|
.expect("could not create testbench");
|
|
|
|
let server_thread = std::thread::spawn(move || udp_server.run());
|
|
|
|
// Send a ping so that the server knows the address of the client.
|
|
// Do not check that the request arrives on the receiver side, is done by other test.
|
|
udp_testbench
|
|
.send_request(&SimRequest::new(SimCtrlRequest::Ping))
|
|
.expect("sending request failed");
|
|
|
|
// Send a reply to the server, ensure it gets forwarded to the client.
|
|
let sim_reply = SimReply::new(PcduReply::SwitchInfo(get_all_off_switch_map()));
|
|
udp_testbench.send_reply(&sim_reply);
|
|
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
|
|
|
|
// Now we check that the reply server can send back replies to the client.
|
|
udp_testbench.check_next_sim_reply(&sim_reply);
|
|
|
|
udp_testbench.stop();
|
|
server_thread.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn test_udp_replies_client_unconnected() {
|
|
let (mut udp_testbench, mut udp_server) =
|
|
UdpTestbench::new(true, None, 10).expect("could not create testbench");
|
|
|
|
let server_thread = std::thread::spawn(move || udp_server.run());
|
|
|
|
// Send a reply to the server. The client is not connected, so it won't get forwarded.
|
|
let sim_reply = SimReply::new(PcduReply::SwitchInfo(get_all_off_switch_map()));
|
|
udp_testbench.send_reply(&sim_reply);
|
|
std::thread::sleep(Duration::from_millis(10));
|
|
|
|
udp_testbench.check_no_sim_reply_available();
|
|
|
|
// Connect by sending a ping.
|
|
udp_testbench
|
|
.send_request(&SimRequest::new(SimCtrlRequest::Ping))
|
|
.expect("sending request failed");
|
|
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
|
|
|
|
udp_testbench.check_next_sim_reply(&sim_reply);
|
|
|
|
// Now we check that the reply server can sent back replies to the client.
|
|
udp_testbench.stop();
|
|
server_thread.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn test_udp_reply_server_old_replies_overwritten() {
|
|
let (mut udp_testbench, mut udp_server) =
|
|
UdpTestbench::new(true, None, 3).expect("could not create testbench");
|
|
|
|
let server_thread = std::thread::spawn(move || udp_server.run());
|
|
|
|
// The server only caches up to 3 replies.
|
|
let sim_reply = SimReply::new(SimCtrlReply::Pong);
|
|
for _ in 0..4 {
|
|
udp_testbench.send_reply(&sim_reply);
|
|
}
|
|
std::thread::sleep(Duration::from_millis(20));
|
|
|
|
udp_testbench.check_no_sim_reply_available();
|
|
|
|
// Connect by sending a ping.
|
|
udp_testbench
|
|
.send_request(&SimRequest::new(SimCtrlRequest::Ping))
|
|
.expect("sending request failed");
|
|
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
|
|
|
|
for _ in 0..3 {
|
|
udp_testbench.check_next_sim_reply(&sim_reply);
|
|
}
|
|
udp_testbench.check_no_sim_reply_available();
|
|
udp_testbench.stop();
|
|
server_thread.join().unwrap();
|
|
}
|
|
}
|