diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index ec37092..827f774 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -143,8 +143,10 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, time::Duration, }; + #[allow(unused_imports)] + use std::println; use std::{ - io::Write, + io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, thread, }; @@ -165,6 +167,8 @@ mod tests { const TEST_APID_0: u16 = 0x02; const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + const TEST_APID_1: u16 = 0x10; + const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1); fn generic_tmtc_server( addr: &SocketAddr, @@ -180,6 +184,7 @@ mod tests { ) .expect("TCP server generation failed") } + #[test] fn test_basic_tc_only() { let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); @@ -231,7 +236,203 @@ mod tests { panic!("connection was not handled properly"); } // Check that TC has arrived. - let tc_queue = tc_receiver.tc_queue.lock().unwrap(); + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]); + } + + #[test] + fn test_basic_tc_and_tm() { + let mut buffer: [u8; 32] = [0; 32]; + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + tm_source.add_tm(&buffer[..tm_packet_len]); + let tm_vec = buffer[..tm_packet_len].to_vec(); + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 1); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + stream + .write_all(&buffer[..packet_len]) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < tm_packet_len { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + assert_eq!(read_buf[..read_len], tm_vec); + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); + } + + #[test] + fn test_multi_tc_multi_tm() { + let mut buffer: [u8; 32] = [0; 32]; + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + + // Add telemetry + let mut total_tm_len = 0; + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + total_tm_len += tm_packet_len; + let tm_0 = buffer[..tm_packet_len].to_vec(); + tm_source.add_tm(&tm_0); + let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true); + let tm_packet_len = verif_tm + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + total_tm_len += tm_packet_len; + let tm_1 = buffer[..tm_packet_len].to_vec(); + tm_source.add_tm(&tm_1); + + // Set up server + let mut packet_id_lookup = HashSet::new(); + packet_id_lookup.insert(TEST_PACKET_ID_0); + packet_id_lookup.insert(TEST_PACKET_ID_1); + let mut tcp_server = generic_tmtc_server( + &auto_port_addr, + tc_receiver.clone(), + tm_source, + packet_id_lookup, + ); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!( + conn_result.num_received_tcs, 2, + "wrong number of received TCs" + ); + assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs"); + set_if_done.store(true, Ordering::Relaxed); + }); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + + // Send telecommands + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let tc_0 = buffer[..packet_len].to_vec(); + stream + .write_all(&tc_0) + .expect("writing to TCP server failed"); + let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let packet_len = action_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let tc_1 = buffer[..packet_len].to_vec(); + stream + .write_all(&tc_1) + .expect("writing to TCP server failed"); + + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 32] = [0; 32]; + let mut current_idx = 0; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < total_tm_len { + let read_len = stream + .read(&mut read_buf[current_idx..]) + .expect("read failed"); + current_idx += read_len; + read_len_total += read_len; + } + drop(stream); + assert_eq!(read_buf[..tm_0.len()], tm_0); + assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that TC has arrived. + let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); + assert_eq!(tc_queue.len(), 2); + assert_eq!(tc_queue.pop_front().unwrap(), tc_0); + assert_eq!(tc_queue.pop_front().unwrap(), tc_1); } }