use std::{ collections::VecDeque, net::{SocketAddr, UdpSocket}, sync::{mpsc, Arc, Mutex}, time::Duration, }; use satrs_minisim::{SimReply, SimRequest}; pub type SharedSocketAddr = Arc>>; // A UDP server which handles all TC received by a client application. pub struct UdpTcServer { socket: UdpSocket, request_sender: mpsc::Sender, shared_last_sender: SharedSocketAddr, } impl UdpTcServer { pub fn new( request_sender: mpsc::Sender, shared_last_sender: SharedSocketAddr, ) -> std::io::Result { let socket = UdpSocket::bind("0.0.0.0:7303")?; Ok(Self { socket, request_sender, shared_last_sender, }) } pub fn run(&mut self) { let mut last_socket_addr = None; loop { // Buffer to store incoming data. let mut buffer = [0u8; 4096]; // Block until data is received. `recv_from` returns the number of bytes read and the // sender's address. let (bytes_read, src) = self .socket .recv_from(&mut buffer) .expect("could not read from socket"); // Convert the buffer into a string slice and print the message. let req_string = std::str::from_utf8(&buffer[..bytes_read]) .expect("Could not write buffer as string"); println!("Received from {}: {}", src, req_string); let sim_req: serde_json::Result = serde_json::from_str(req_string); if sim_req.is_err() { log::warn!( "received UDP request with invalid format: {}", sim_req.unwrap_err() ); continue; } self.request_sender.send(sim_req.unwrap()).unwrap(); // Only set last sender if it has changed. if last_socket_addr.is_some() && src != last_socket_addr.unwrap() { self.shared_last_sender.lock().unwrap().replace(src); } last_socket_addr = Some(src); } } } // A helper object which sends back all replies to the UDP client. // // This helper is scheduled separately to minimize the delay between the requests and replies. pub struct UdpTmClient { reply_receiver: mpsc::Receiver, reply_queue: VecDeque, max_num_replies: usize, socket: UdpSocket, last_sender: SharedSocketAddr, } impl UdpTmClient { pub fn new( reply_receiver: mpsc::Receiver, max_num_replies: usize, last_sender: SharedSocketAddr, ) -> Self { let socket = UdpSocket::bind("127.0.0.1:0").expect("creating UDP client for TM sender failed"); Self { reply_receiver, reply_queue: VecDeque::new(), max_num_replies, socket, last_sender, } } pub fn run(&mut self) { loop { let processed_replies = self.process_replies(); let last_sender_lock = self .last_sender .lock() .expect("locking last UDP sender failed"); let last_sender = *last_sender_lock; drop(last_sender_lock); let mut sent_replies = false; if let Some(last_sender) = last_sender { sent_replies = self.send_replies(last_sender); } if !processed_replies && !sent_replies { std::thread::sleep(Duration::from_millis(20)); } } } fn process_replies(&mut self) -> bool { let mut processed_replies = false; loop { match self.reply_receiver.try_recv() { Ok(reply) => { if self.reply_queue.len() >= self.max_num_replies { self.reply_queue.pop_front(); } self.reply_queue.push_back(reply); processed_replies = true; } Err(e) => match e { mpsc::TryRecvError::Empty => return processed_replies, mpsc::TryRecvError::Disconnected => { log::error!("all UDP reply senders disconnected") } }, } } } fn send_replies(&mut self, last_sender: SocketAddr) -> bool { let mut sent_replies = false; self.socket .connect(last_sender) .expect("connecting to last sender failed"); while !self.reply_queue.is_empty() { let next_reply_to_send = self.reply_queue.pop_front().unwrap(); self.socket .send( serde_json::to_string(&next_reply_to_send) .unwrap() .as_bytes(), ) .expect("sending reply failed"); sent_replies = true; } sent_replies } } #[cfg(test)] mod tests { #[test] fn test_basic_udp_tc_reception() {} }