use std::{ fmt::Debug, fs::OpenOptions, io::{self, ErrorKind, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, }; use cfdp::{ dest::DestinationHandler, filestore::NativeFilestore, request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, EntityType, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, PduProvider, RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider, }; use log::{debug, info, warn}; use spacepackets::{ cfdp::{pdu::PduError, ChecksumType, ConditionCode, TransmissionMode}, seq_count::SeqCountProviderSyncU16, util::{UnsignedByteFieldU16, UnsignedEnum}, }; const PYTHON_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const RUST_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); const RUST_PORT: u16 = 5111; const PY_PORT: u16 = 5222; const LOG_LEVEL: log::LevelFilter = log::LevelFilter::Info; const FILE_DATA: &str = "Hello World!"; #[derive(Default)] pub struct ExampleFaultHandler {} impl UserFaultHookProvider for ExampleFaultHandler { fn notice_of_suspension_cb( &mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64, ) { panic!( "unexpected suspension of transaction {:?}, condition code {:?}, progress {}", transaction_id, cond, progress ); } fn notice_of_cancellation_cb( &mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64, ) { panic!( "unexpected cancellation of transaction {:?}, condition code {:?}, progress {}", transaction_id, cond, progress ); } fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { panic!( "unexpected abandonment of transaction {:?}, condition code {:?}, progress {}", transaction_id, cond, progress ); } fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { panic!( "ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}", transaction_id, cond, progress ); } } pub struct ExampleCfdpUser { entity_type: EntityType, completion_signal: Arc, } impl ExampleCfdpUser { pub fn new(entity_type: EntityType, completion_signal: Arc) -> Self { Self { entity_type, completion_signal, } } } impl CfdpUser for ExampleCfdpUser { fn transaction_indication(&mut self, id: &crate::TransactionId) { println!( "{:?} entity: Transaction indication for {:?}", self.entity_type, id ); } fn eof_sent_indication(&mut self, id: &crate::TransactionId) { println!( "{:?} entity: EOF sent for transaction {:?}", self.entity_type, id ); } fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) { println!( "{:?} entity: Transaction finished: {:?}", self.entity_type, finished_params ); self.completion_signal .store(true, std::sync::atomic::Ordering::Relaxed); } fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) { println!( "{:?} entity: Metadata received: {:?}", self.entity_type, md_recvd_params ); } fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) { println!( "{:?} entity: File segment {:?} received", self.entity_type, segment_recvd_params ); } fn report_indication(&mut self, _id: &crate::TransactionId) {} fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) { panic!("unexpected suspended indication"); } fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {} fn fault_indication( &mut self, _id: &crate::TransactionId, _condition_code: ConditionCode, _progress: u64, ) { panic!("unexpected fault indication"); } fn abandoned_indication( &mut self, _id: &crate::TransactionId, _condition_code: ConditionCode, _progress: u64, ) { panic!("unexpected abandoned indication"); } fn eof_recvd_indication(&mut self, id: &crate::TransactionId) { println!( "{:?} entity: EOF received for transaction {:?}", self.entity_type, id ); } } pub struct UdpServer { pub socket: UdpSocket, recv_buf: Vec, sender_addr: Option, source_tc_tx: mpsc::Sender, dest_tc_tx: mpsc::Sender, source_tm_rx: mpsc::Receiver, dest_tm_rx: mpsc::Receiver, } #[derive(Debug, thiserror::Error)] pub enum UdpServerError { #[error(transparent)] Io(#[from] io::Error), #[error("pdu error: {0}")] Pdu(#[from] PduError), #[error("send error")] Send, } impl UdpServer { pub fn new( addr: A, max_recv_size: usize, source_tc_tx: mpsc::Sender, dest_tc_tx: mpsc::Sender, source_tm_rx: mpsc::Receiver, dest_tm_rx: mpsc::Receiver, ) -> Result { let server = Self { socket: UdpSocket::bind(addr)?, recv_buf: vec![0; max_recv_size], source_tc_tx, dest_tc_tx, sender_addr: None, source_tm_rx, dest_tm_rx, }; server.socket.set_nonblocking(true)?; Ok(server) } pub fn try_recv_tc( &mut self, ) -> Result, UdpServerError> { let res = match self.socket.recv_from(&mut self.recv_buf) { Ok(res) => res, Err(e) => { return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut { Ok(None) } else { Err(e.into()) } } }; let (_, from) = res; self.sender_addr = Some(from); let pdu_owned = PduOwnedWithInfo::new_from_raw_packet(&self.recv_buf)?; match pdu_owned.packet_target()? { cfdp::PacketTarget::SourceEntity => { self.source_tc_tx .send(pdu_owned.clone()) .map_err(|_| UdpServerError::Send)?; } cfdp::PacketTarget::DestEntity => { self.dest_tc_tx .send(pdu_owned.clone()) .map_err(|_| UdpServerError::Send)?; } } Ok(Some((pdu_owned, from))) } pub fn recv_and_send_telemetry(&mut self) { if self.last_sender().is_none() { return; } let tm_handler = |receiver: &mpsc::Receiver| { while let Ok(tm) = receiver.try_recv() { let result = self.socket.send_to(tm.pdu(), self.last_sender().unwrap()); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } } }; tm_handler(&self.source_tm_rx); tm_handler(&self.dest_tm_rx); } pub fn last_sender(&self) -> Option { self.sender_addr } } fn main() { fern::Dispatch::new() .format(|out, message, record| { out.finish(format_args!( "{}[{}][{}] {}", chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"), std::thread::current().name().expect("thread is not named"), record.level(), message )) }) .level(LOG_LEVEL) .chain(std::io::stdout()) .apply() .unwrap(); // Simplified event handling using atomic signals. let stop_signal_source = Arc::new(AtomicBool::new(false)); let stop_signal_dest = stop_signal_source.clone(); let stop_signal_ctrl = stop_signal_source.clone(); let completion_signal_source = Arc::new(AtomicBool::new(false)); let completion_signal_source_main = completion_signal_source.clone(); let completion_signal_dest = Arc::new(AtomicBool::new(false)); let completion_signal_dest_main = completion_signal_dest.clone(); let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path(); let mut file = OpenOptions::new() .write(true) .open(&srcfile) .expect("opening file failed"); file.write_all(FILE_DATA.as_bytes()) .expect("writing file content failed"); let destdir = tempfile::tempdir().expect("creating temp directory failed"); let destfile = destdir.path().join("test.txt"); let local_cfg_source = LocalEntityConfig::new( RUST_ID.into(), IndicationConfig::default(), ExampleFaultHandler::default(), ); let (source_tm_tx, source_tm_rx) = mpsc::channel::(); let (dest_tm_tx, dest_tm_rx) = mpsc::channel::(); let put_request_cacher = StaticPutRequestCacher::new(2048); let remote_cfg_python = RemoteEntityConfig::new_with_default_values( PYTHON_ID.into(), 1024, true, false, spacepackets::cfdp::TransmissionMode::Unacknowledged, ChecksumType::Crc32, ); let seq_count_provider = SeqCountProviderSyncU16::default(); let mut source_handler = SourceHandler::new( local_cfg_source, source_tm_tx, NativeFilestore::default(), put_request_cacher, 2048, remote_cfg_python, seq_count_provider, ); let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source); let local_cfg_dest = LocalEntityConfig::new( RUST_ID.into(), IndicationConfig::default(), ExampleFaultHandler::default(), ); let mut dest_handler = DestinationHandler::new( local_cfg_dest, 1024, dest_tm_tx, NativeFilestore::default(), remote_cfg_python, StdCheckTimerCreator::default(), ); let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest); let put_request = PutRequestOwned::new_regular_request( PYTHON_ID.into(), srcfile.to_str().expect("invaid path string"), destfile.to_str().expect("invaid path string"), Some(TransmissionMode::Unacknowledged), Some(true), ) .expect("put request creation failed"); let (source_tc_tx, source_tc_rx) = mpsc::channel(); let (dest_tc_tx, dest_tc_rx) = mpsc::channel(); let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT); let mut udp_server = UdpServer::new( sock_addr, 2048, source_tc_tx, dest_tc_tx, source_tm_rx, dest_tm_rx, ) .expect("creating UDP server failed"); let start = std::time::Instant::now(); let jh_source = thread::Builder::new() .name("cfdp src entity".to_string()) .spawn(move || { info!("Starting RUST SRC"); source_handler .put_request(&put_request) .expect("put request failed"); loop { let mut next_delay = None; let mut undelayed_call_count = 0; let packet_info = match source_tc_rx.try_recv() { Ok(pdu_with_info) => Some(pdu_with_info), Err(e) => match e { mpsc::TryRecvError::Empty => None, mpsc::TryRecvError::Disconnected => { panic!("unexpected disconnect from destination channel sender"); } }, }; match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) { Ok(sent_packets) => { if sent_packets == 0 { next_delay = Some(Duration::from_millis(50)); } } Err(e) => { println!("Source handler error: {}", e); next_delay = Some(Duration::from_millis(50)); } } if let Some(delay) = next_delay { thread::sleep(delay); } else { undelayed_call_count += 1; } if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) { break; } // Safety feature against configuration errors. if undelayed_call_count >= 200 { panic!("Source handler state machine possible in permanent loop"); } } }) .unwrap(); let jh_dest = thread::Builder::new() .name("cfdp dest entity".to_string()) .spawn(move || { info!("Starting RUST DEST. Local ID {}", RUST_ID.value()); loop { let mut next_delay = None; let mut undelayed_call_count = 0; let packet_info = match dest_tc_rx.try_recv() { Ok(pdu_with_info) => Some(pdu_with_info), Err(e) => match e { mpsc::TryRecvError::Empty => None, mpsc::TryRecvError::Disconnected => { panic!("unexpected disconnect from destination channel sender"); } }, }; match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) { Ok(sent_packets) => { if sent_packets == 0 { next_delay = Some(Duration::from_millis(50)); } } Err(e) => { println!("Source handler error: {}", e); next_delay = Some(Duration::from_millis(50)); } } if let Some(delay) = next_delay { thread::sleep(delay); } else { undelayed_call_count += 1; } if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) { break; } // Safety feature against configuration errors. if undelayed_call_count >= 200 { panic!("Destination handler state machine possible in permanent loop"); } } }) .unwrap(); let jh_udp_server = thread::Builder::new() .name("cfdp udp server".to_string()) .spawn(move || { info!("Starting UDP server on {}", sock_addr); loop { loop { match udp_server.try_recv_tc() { Ok(result) => match result { Some((pdu, _addr)) => { debug!("Received PDU on UDP server: {:?}", pdu); } None => break, }, Err(e) => { warn!("UDP server error: {}", e); break; } } } udp_server.recv_and_send_telemetry(); thread::sleep(Duration::from_millis(50)); } }) .unwrap(); loop { if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed) && completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed) { let file = std::fs::read_to_string(destfile).expect("reading file failed"); assert_eq!(file, FILE_DATA); // Stop the threads gracefully. stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed); break; } if std::time::Instant::now() - start > Duration::from_secs(2) { panic!("file transfer not finished in 2 seconds"); } std::thread::sleep(Duration::from_millis(50)); } jh_source.join().unwrap(); jh_dest.join().unwrap(); jh_udp_server.join().unwrap(); }