diff --git a/satrs-example/src/interface/sim_client_udp.rs b/satrs-example/src/interface/sim_client_udp.rs index d16adfd..16db261 100644 --- a/satrs-example/src/interface/sim_client_udp.rs +++ b/satrs-example/src/interface/sim_client_udp.rs @@ -31,7 +31,7 @@ pub fn create_sim_client(sim_request_rx: mpsc::Receiver) -> Option, - ) -> Result { + ) -> Result { let mut reply_buf: [u8; 4096] = [0; 4096]; - let udp_client = UdpSocket::bind("127.0.0.1:0")?; + let mut udp_client = UdpSocket::bind("127.0.0.1:0")?; udp_client.set_read_timeout(Some(Duration::from_millis(100)))?; + Self::attempt_connection(&mut udp_client, simulator_addr, &mut reply_buf)?; + udp_client.set_nonblocking(true)?; + Ok(Self { + udp_client, + simulator_addr, + sim_request_rx, + reply_map: SimReplyMap(HashMap::new()), + reply_buf, + }) + } + + pub fn attempt_connection( + udp_client: &mut UdpSocket, + simulator_addr: SocketAddr, + reply_buf: &mut [u8], + ) -> Result<(), SimClientCreationError> { let sim_req = SimRequest::new_with_epoch_time(SimCtrlRequest::Ping); let sim_req_json = serde_json::to_string(&sim_req).expect("failed to serialize SimRequest"); udp_client.send_to(sim_req_json.as_bytes(), simulator_addr)?; - match udp_client.recv(&mut reply_buf) { + match udp_client.recv(reply_buf) { Ok(reply_len) => { let sim_reply: SimReply = serde_json::from_slice(&reply_buf[0..reply_len])?; if sim_reply.component() != SimComponent::SimCtrl { - return Err(SimClientCreationResult::ReplyIsNotPong(sim_reply)); + return Err(SimClientCreationError::ReplyIsNotPong(sim_reply)); } - udp_client.set_read_timeout(None)?; let sim_ctrl_reply = SimCtrlReply::from_sim_message(&sim_reply).expect("invalid SIM reply"); match sim_ctrl_reply { - SimCtrlReply::Pong => Ok(Self { - udp_client, - simulator_addr, - sim_request_rx, - reply_map: SimReplyMap(HashMap::new()), - reply_buf, - }), SimCtrlReply::InvalidRequest(_) => { panic!("received invalid request reply from UDP sim server") } + SimCtrlReply::Pong => Ok(()), } } Err(e) => { if e.kind() == std::io::ErrorKind::TimedOut || e.kind() == std::io::ErrorKind::WouldBlock { - Err(SimClientCreationResult::Timeout) + Err(SimClientCreationError::Timeout) } else { - Err(SimClientCreationResult::Io(e)) + Err(SimClientCreationError::Io(e)) } } } @@ -174,7 +183,238 @@ impl SimClientUdp { #[cfg(test)] pub mod tests { - // TODO: Write some basic tests which verify that the ping/pong handling/check for the - // constructor works as expected. - fn test_basic() {} + use std::{ + collections::HashMap, + net::{SocketAddr, UdpSocket}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, + }, + time::Duration, + }; + + use satrs_minisim::{ + eps::{PcduReply, PcduRequest}, + SerializableSimMsgPayload, SimComponent, SimCtrlReply, SimCtrlRequest, SimMessageProvider, + SimReply, SimRequest, + }; + + use super::SimClientUdp; + + struct UdpSimTestServer { + udp_server: UdpSocket, + request_tx: mpsc::Sender, + reply_rx: mpsc::Receiver, + last_sender: Option, + stop_signal: Arc, + recv_buf: [u8; 1024], + } + + impl UdpSimTestServer { + pub fn new( + request_tx: mpsc::Sender, + reply_rx: mpsc::Receiver, + stop_signal: Arc, + ) -> Self { + let udp_server = UdpSocket::bind("127.0.0.1:0").expect("creating UDP server failed"); + udp_server + .set_nonblocking(true) + .expect("failed to set UDP server to non-blocking"); + Self { + udp_server, + request_tx, + reply_rx, + last_sender: None, + stop_signal, + recv_buf: [0; 1024], + } + } + + pub fn operation(&mut self) { + loop { + let mut no_sim_replies_handled = true; + let mut no_data_received = true; + if self.stop_signal.load(Ordering::Relaxed) { + break; + } + if let Some(last_sender) = self.last_sender { + loop { + match self.reply_rx.try_recv() { + Ok(sim_reply) => { + let sim_reply_json = serde_json::to_string(&sim_reply) + .expect("failed to serialize SimReply"); + self.udp_server + .send_to(sim_reply_json.as_bytes(), last_sender) + .expect("failed to send reply to client from UDP server"); + no_sim_replies_handled = false; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + panic!("reply sender disconnected") + } + }, + } + } + } + + loop { + match self.udp_server.recv_from(&mut self.recv_buf) { + Ok((read_bytes, from)) => { + let sim_request: SimRequest = + serde_json::from_slice(&self.recv_buf[0..read_bytes]) + .expect("failed to deserialize SimRequest"); + if sim_request.component() == SimComponent::SimCtrl { + // For a ping, we perform the reply handling here directly + let sim_ctrl_request = + SimCtrlRequest::from_sim_message(&sim_request) + .expect("failed to convert SimRequest to SimCtrlRequest"); + match sim_ctrl_request { + SimCtrlRequest::Ping => { + no_data_received = false; + self.last_sender = Some(from); + let sim_reply = SimReply::new(&SimCtrlReply::Pong); + let sim_reply_json = serde_json::to_string(&sim_reply) + .expect("failed to serialize SimReply"); + self.udp_server + .send_to(sim_reply_json.as_bytes(), from) + .expect( + "failed to send reply to client from UDP server", + ); + } + }; + } + // Forward each SIM request for testing purposes. + self.request_tx + .send(sim_request) + .expect("failed to send request"); + } + Err(e) => { + if e.kind() != std::io::ErrorKind::WouldBlock + && e.kind() != std::io::ErrorKind::TimedOut + { + panic!("UDP server error: {}", e); + } + break; + } + } + } + if no_sim_replies_handled && no_data_received { + std::thread::sleep(Duration::from_millis(5)); + } + } + } + + pub fn local_addr(&self) -> SocketAddr { + self.udp_server.local_addr().unwrap() + } + } + + #[test] + fn basic_connection_test() { + let (server_sim_request_tx, server_sim_request_rx) = mpsc::channel(); + let (_server_sim_reply_tx, server_sim_reply_rx) = mpsc::channel(); + let stop_signal = Arc::new(AtomicBool::new(false)); + let mut udp_server = UdpSimTestServer::new( + server_sim_request_tx, + server_sim_reply_rx, + stop_signal.clone(), + ); + let server_addr = udp_server.local_addr(); + let (_client_sim_req_tx, client_sim_req_rx) = mpsc::channel(); + // Need to spawn the simulator UDP server before calling the client constructor. + let jh0 = std::thread::spawn(move || { + udp_server.operation(); + }); + // Creating the client also performs the connection test. + SimClientUdp::new(server_addr, client_sim_req_rx).unwrap(); + let sim_request = server_sim_request_rx + .recv_timeout(Duration::from_millis(50)) + .expect("no SIM request received"); + let ping_request = SimCtrlRequest::from_sim_message(&sim_request) + .expect("failed to create SimCtrlRequest"); + assert_eq!(ping_request, SimCtrlRequest::Ping); + // Stop the server. + stop_signal.store(true, Ordering::Relaxed); + jh0.join().unwrap(); + } + + #[test] + fn basic_request_reply_test() { + let (server_sim_request_tx, server_sim_request_rx) = mpsc::channel(); + let (server_sim_reply_tx, sever_sim_reply_rx) = mpsc::channel(); + let stop_signal = Arc::new(AtomicBool::new(false)); + let mut udp_server = UdpSimTestServer::new( + server_sim_request_tx, + sever_sim_reply_rx, + stop_signal.clone(), + ); + let server_addr = udp_server.local_addr(); + let (client_sim_req_tx, client_sim_req_rx) = mpsc::channel(); + let (client_pcdu_reply_tx, client_pcdu_reply_rx) = mpsc::channel(); + // Need to spawn the simulator UDP server before calling the client constructor. + let jh0 = std::thread::spawn(move || { + udp_server.operation(); + }); + + // Creating the client also performs the connection test. + let mut client = SimClientUdp::new(server_addr, client_sim_req_rx).unwrap(); + client.add_reply_recipient(SimComponent::Pcdu, client_pcdu_reply_tx); + + let sim_request = server_sim_request_rx + .recv_timeout(Duration::from_millis(50)) + .expect("no SIM request received"); + let ping_request = SimCtrlRequest::from_sim_message(&sim_request) + .expect("failed to create SimCtrlRequest"); + assert_eq!(ping_request, SimCtrlRequest::Ping); + + let pcdu_req = PcduRequest::RequestSwitchInfo; + client_sim_req_tx + .send(SimRequest::new_with_epoch_time(pcdu_req)) + .expect("send failed"); + client.operation(); + + // Check that the request arrives properly at the server. + let sim_request = server_sim_request_rx + .recv_timeout(Duration::from_millis(50)) + .expect("no SIM request received"); + let req_recvd_on_server = + PcduRequest::from_sim_message(&sim_request).expect("failed to create SimCtrlRequest"); + matches!(req_recvd_on_server, PcduRequest::RequestSwitchInfo); + + // We inject the reply ourselves. + let pcdu_reply = PcduReply::SwitchInfo(HashMap::new()); + server_sim_reply_tx + .send(SimReply::new(&pcdu_reply)) + .expect("sending PCDU reply failed"); + + // Now we verify that the reply is sent by the UDP server back to the client, and then + // forwarded by the clients internal map. + let mut pcdu_reply_received = false; + for _ in 0..3 { + client.operation(); + + match client_pcdu_reply_rx.try_recv() { + Ok(sim_reply) => { + assert_eq!(sim_reply.component(), SimComponent::Pcdu); + let pcdu_reply_from_client = PcduReply::from_sim_message(&sim_reply) + .expect("failed to create PcduReply"); + assert_eq!(pcdu_reply_from_client, pcdu_reply); + pcdu_reply_received = true; + break; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => std::thread::sleep(Duration::from_millis(10)), + mpsc::TryRecvError::Disconnected => panic!("reply sender disconnected"), + }, + } + } + if !pcdu_reply_received { + panic!("no reply received"); + } + + // Stop the server. + stop_signal.store(true, Ordering::Relaxed); + jh0.join().unwrap(); + } } diff --git a/satrs-minisim/src/acs.rs b/satrs-minisim/src/acs.rs index 0e4b0bd..299d09d 100644 --- a/satrs-minisim/src/acs.rs +++ b/satrs-minisim/src/acs.rs @@ -153,7 +153,7 @@ impl MagnetorquerModel { pub fn send_housekeeping_data(&mut self) { self.reply_sender - .send(SimReply::new(MgtReply::Hk(MgtHkSet { + .send(SimReply::new(&MgtReply::Hk(MgtHkSet { dipole: self.torque_dipole, torquing: self.torquing, }))) diff --git a/satrs-minisim/src/controller.rs b/satrs-minisim/src/controller.rs index 09d2772..b5404d8 100644 --- a/satrs-minisim/src/controller.rs +++ b/satrs-minisim/src/controller.rs @@ -102,7 +102,7 @@ impl SimController { match sim_ctrl_request { SimCtrlRequest::Ping => { self.reply_sender - .send(SimReply::new(SimCtrlReply::Pong)) + .send(SimReply::new(&SimCtrlReply::Pong)) .expect("sending reply from sim controller failed"); } } @@ -178,7 +178,7 @@ impl SimController { error ); self.reply_sender - .send(SimReply::new(SimCtrlReply::from(error))) + .send(SimReply::new(&SimCtrlReply::from(error))) .expect("sending reply from sim controller failed"); } } diff --git a/satrs-minisim/src/eps.rs b/satrs-minisim/src/eps.rs index 07c5c4e..af20272 100644 --- a/satrs-minisim/src/eps.rs +++ b/satrs-minisim/src/eps.rs @@ -44,7 +44,7 @@ impl PcduModel { } pub fn send_switch_info(&mut self) { - let reply = SimReply::new(PcduReply::SwitchInfo(self.switcher_map.clone())); + let reply = SimReply::new(&PcduReply::SwitchInfo(self.switcher_map.clone())); self.reply_sender.send(reply).unwrap(); } diff --git a/satrs-minisim/src/lib.rs b/satrs-minisim/src/lib.rs index 1f0f1b6..1beb32b 100644 --- a/satrs-minisim/src/lib.rs +++ b/satrs-minisim/src/lib.rs @@ -96,11 +96,11 @@ pub struct SimReply { } impl SimReply { - pub fn new>(serializable_reply: T) -> Self { + pub fn new>(serializable_reply: &T) -> Self { Self { inner: SimMessage { target: T::TARGET, - payload: serde_json::to_string(&serializable_reply).unwrap(), + payload: serde_json::to_string(serializable_reply).unwrap(), }, } } @@ -185,7 +185,7 @@ pub mod eps { const TARGET: SimComponent = SimComponent::Pcdu; } - #[derive(Debug, Clone, Serialize, Deserialize)] + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum PcduReply { SwitchInfo(SwitchMap), } @@ -295,7 +295,7 @@ pub mod acs { impl MgmReplyProvider for MgmLis3MdlReply { fn create_mgm_reply(common: MgmReplyCommon) -> SimReply { - SimReply::new(Self::new(common)) + SimReply::new(&Self::new(common)) } } } @@ -382,7 +382,7 @@ pub mod tests { #[test] fn test_basic_reply() { - let sim_reply = SimReply::new(DummyReply::Pong); + let sim_reply = SimReply::new(&DummyReply::Pong); assert_eq!(sim_reply.component(), SimComponent::SimCtrl); assert_eq!(sim_reply.msg_type(), SimMessageType::Reply); let dummy_request = diff --git a/satrs-minisim/src/udp.rs b/satrs-minisim/src/udp.rs index 7d1d56c..e177547 100644 --- a/satrs-minisim/src/udp.rs +++ b/satrs-minisim/src/udp.rs @@ -344,7 +344,7 @@ mod tests { .send_request(&SimRequest::new_with_epoch_time(SimCtrlRequest::Ping)) .expect("sending request failed"); - let sim_reply = SimReply::new(PcduReply::SwitchInfo(get_all_off_switch_map())); + let sim_reply = SimReply::new(&PcduReply::SwitchInfo(get_all_off_switch_map())); udp_testbench.send_reply(&sim_reply); udp_testbench.check_next_sim_reply(&sim_reply); @@ -369,7 +369,7 @@ mod tests { .expect("sending request failed"); // Send a reply to the server, ensure it gets forwarded to the client. - let sim_reply = SimReply::new(PcduReply::SwitchInfo(get_all_off_switch_map())); + let sim_reply = SimReply::new(&PcduReply::SwitchInfo(get_all_off_switch_map())); udp_testbench.send_reply(&sim_reply); std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS)); @@ -388,7 +388,7 @@ mod tests { let server_thread = std::thread::spawn(move || udp_server.run()); // Send a reply to the server. The client is not connected, so it won't get forwarded. - let sim_reply = SimReply::new(PcduReply::SwitchInfo(get_all_off_switch_map())); + let sim_reply = SimReply::new(&PcduReply::SwitchInfo(get_all_off_switch_map())); udp_testbench.send_reply(&sim_reply); std::thread::sleep(Duration::from_millis(10)); @@ -415,7 +415,7 @@ mod tests { let server_thread = std::thread::spawn(move || udp_server.run()); // The server only caches up to 3 replies. - let sim_reply = SimReply::new(SimCtrlReply::Pong); + let sim_reply = SimReply::new(&SimCtrlReply::Pong); for _ in 0..4 { udp_testbench.send_reply(&sim_reply); }