use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use satrs::pus::HandlingStatus; use satrs::queue::GenericSendError; use satrs::tmtc::PacketAsVec; pub trait UdpTmHandler { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); } pub struct DynamicUdpTmHandler { pub tm_rx: mpsc::Receiver, } impl UdpTmHandler for DynamicUdpTmHandler { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { while let Ok(tm) = self.tm_rx.try_recv() { if tm.packet.len() > 9 { let service = tm.packet[7]; let subservice = tm.packet[8]; info!("Sending PUS TM[{service},{subservice}]") } else { info!("Sending PUS TM"); } let result = socket.send_to(&tm.packet, recv_addr); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } } } } pub struct UdpTmtcServer { pub udp_tc_server: UdpTcServer, GenericSendError>, pub tm_handler: TmHandler, } impl UdpTmtcServer { pub fn periodic_operation(&mut self) { loop { if self.poll_tc_server() == HandlingStatus::Empty { break; } } if let Some(recv_addr) = self.udp_tc_server.last_sender() { self.tm_handler .send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr); } } fn poll_tc_server(&mut self) -> HandlingStatus { match self.udp_tc_server.try_recv_tc() { Ok(_) => HandlingStatus::HandledOne, Err(e) => { match e { ReceiveResult::NothingReceived => (), ReceiveResult::Io(io_error) => { warn!("Error receiving TC from UDP server: {io_error}"); } ReceiveResult::Send(send_error) => { warn!("error sending TM to UDP client: {send_error}"); } } HandlingStatus::Empty } } } } #[cfg(test)] mod tests { use std::{ collections::VecDeque, net::IpAddr, sync::{mpsc::TryRecvError, Arc, Mutex}, }; use ops_sat_rs::config::{EXPERIMENT_APID, OBSW_SERVER_ADDR}; use satrs::{ spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, SpHeader, }, tmtc::PacketSenderRaw, ComponentId, }; use super::*; const UDP_SERVER_ID: ComponentId = 0x05; #[derive(Default, Debug, Clone)] pub struct TestReceiver { tc_vec: Arc>>, } impl PacketSenderRaw for TestReceiver { type Error = (); fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { self.tc_vec .lock() .unwrap() .push_back(PacketAsVec::new(sender_id, packet.to_vec())); Ok(()) } } #[derive(Default, Debug, Clone)] pub struct TestTmHandler { addrs_to_send_to: Arc>>, } impl UdpTmHandler for TestTmHandler { fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) { self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr); } } #[test] fn test_basic() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); let (tx, rx) = mpsc::channel(); let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let tm_handler = TestTmHandler::default(); let _tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { udp_tc_server, tm_handler, }; udp_dyn_server.periodic_operation(); matches!(rx.try_recv(), Err(TryRecvError::Empty)); } #[test] fn test_transactions() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); let (tx, rx) = mpsc::channel(); let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let server_addr = udp_tc_server.socket.local_addr().unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { udp_tc_server, tm_handler, }; let sph = SpHeader::new_for_unseg_tc(EXPERIMENT_APID, 0, 0); let ping_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true) .to_vec() .unwrap(); let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed"); let client_addr = client.local_addr().unwrap(); client.connect(server_addr).unwrap(); client.send(&ping_tc).unwrap(); udp_dyn_server.periodic_operation(); { let packet_with_sender = rx.try_recv().unwrap(); assert_eq!(packet_with_sender.packet, ping_tc); matches!(rx.try_recv(), Err(TryRecvError::Empty)); } { let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); assert!(!tm_handler_calls.is_empty()); assert_eq!(tm_handler_calls.len(), 1); let received_addr = tm_handler_calls.pop_front().unwrap(); assert_eq!(received_addr, client_addr); } udp_dyn_server.periodic_operation(); matches!(rx.try_recv(), Err(TryRecvError::Empty)); // Still tries to send to the same client. { let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); assert!(!tm_handler_calls.is_empty()); assert_eq!(tm_handler_calls.len(), 1); let received_addr = tm_handler_calls.pop_front().unwrap(); assert_eq!(received_addr, client_addr); } } }