diff --git a/examples/python-interop/main.rs b/examples/python-interop/main.rs index 55602ba..86932eb 100644 --- a/examples/python-interop/main.rs +++ b/examples/python-interop/main.rs @@ -19,7 +19,10 @@ use cfdp::{ }; use log::{debug, info, warn}; use spacepackets::{ - cfdp::{pdu::PduError, ChecksumType, ConditionCode, TransmissionMode}, + cfdp::{ + pdu::{file_data::FileDataPdu, metadata::MetadataPduReader, PduError}, + ChecksumType, ConditionCode, TransmissionMode, + }, seq_count::SeqCountProviderSyncU16, util::{UnsignedByteFieldU16, UnsignedEnum}, }; @@ -166,7 +169,7 @@ impl CfdpUser for ExampleCfdpUser { pub struct UdpServer { pub socket: UdpSocket, recv_buf: Vec, - sender_addr: Option, + remote_addr: SocketAddr, source_tc_tx: mpsc::Sender, dest_tc_tx: mpsc::Sender, source_tm_rx: mpsc::Receiver, @@ -186,6 +189,7 @@ pub enum UdpServerError { impl UdpServer { pub fn new( addr: A, + remote_addr: SocketAddr, max_recv_size: usize, source_tc_tx: mpsc::Sender, dest_tc_tx: mpsc::Sender, @@ -197,7 +201,7 @@ impl UdpServer { recv_buf: vec![0; max_recv_size], source_tc_tx, dest_tc_tx, - sender_addr: None, + remote_addr, source_tm_rx, dest_tm_rx, }; @@ -219,7 +223,7 @@ impl UdpServer { } }; let (_, from) = res; - self.sender_addr = Some(from); + self.remote_addr = from; let pdu_owned = PduOwnedWithInfo::new_from_raw_packet(&self.recv_buf)?; match pdu_owned.packet_target()? { cfdp::PacketTarget::SourceEntity => { @@ -237,12 +241,11 @@ impl UdpServer { } pub fn recv_and_send_telemetry(&mut self) { - if self.last_sender().is_none() { - return; - } let tm_handler = |receiver: &mpsc::Receiver| { while let Ok(tm) = receiver.try_recv() { - let result = self.socket.send_to(tm.pdu(), self.last_sender().unwrap()); + debug!("Sending PDU: {:?}", tm); + pdu_printout(&tm); + let result = self.socket.send_to(tm.pdu(), self.remote_addr()); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } @@ -252,8 +255,30 @@ impl UdpServer { tm_handler(&self.dest_tm_rx); } - pub fn last_sender(&self) -> Option { - self.sender_addr + pub fn remote_addr(&self) -> SocketAddr { + self.remote_addr + } +} + +fn pdu_printout(pdu: &PduOwnedWithInfo) { + match pdu.pdu_type() { + spacepackets::cfdp::PduType::FileDirective => match pdu.file_directive_type().unwrap() { + spacepackets::cfdp::pdu::FileDirectiveType::EofPdu => (), + spacepackets::cfdp::pdu::FileDirectiveType::FinishedPdu => (), + spacepackets::cfdp::pdu::FileDirectiveType::AckPdu => (), + spacepackets::cfdp::pdu::FileDirectiveType::MetadataPdu => { + let meta_pdu = + MetadataPduReader::new(pdu.pdu()).expect("creating metadata pdu failed"); + debug!("Metadata PDU: {:?}", meta_pdu) + } + spacepackets::cfdp::pdu::FileDirectiveType::NakPdu => (), + spacepackets::cfdp::pdu::FileDirectiveType::PromptPdu => (), + spacepackets::cfdp::pdu::FileDirectiveType::KeepAlivePdu => (), + }, + spacepackets::cfdp::PduType::FileData => { + let fd_pdu = FileDataPdu::from_bytes(pdu.pdu()).expect("creating file data pdu failed"); + debug!("File data PDU: {:?}", fd_pdu); + } } } @@ -275,19 +300,20 @@ fn main() { // Simplified event handling using atomic signals. let stop_signal_source = Arc::new(AtomicBool::new(false)); let stop_signal_dest = stop_signal_source.clone(); - let stop_signal_ctrl = stop_signal_source.clone(); + // let stop_signal_ctrl = stop_signal_source.clone(); let completion_signal_source = Arc::new(AtomicBool::new(false)); - let completion_signal_source_main = completion_signal_source.clone(); + // let completion_signal_source_main = completion_signal_source.clone(); let completion_signal_dest = Arc::new(AtomicBool::new(false)); - let completion_signal_dest_main = completion_signal_dest.clone(); + // let completion_signal_dest_main = completion_signal_dest.clone(); let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path(); let mut file = OpenOptions::new() .write(true) .open(&srcfile) .expect("opening file failed"); + info!("created test source file {:?}", srcfile); file.write_all(FILE_DATA.as_bytes()) .expect("writing file content failed"); let destdir = tempfile::tempdir().expect("creating temp directory failed"); @@ -307,7 +333,7 @@ fn main() { true, false, spacepackets::cfdp::TransmissionMode::Unacknowledged, - ChecksumType::Crc32, + ChecksumType::Crc32C, ); let seq_count_provider = SeqCountProviderSyncU16::default(); let mut source_handler = SourceHandler::new( @@ -348,9 +374,11 @@ fn main() { let (source_tc_tx, source_tc_rx) = mpsc::channel(); let (dest_tc_tx, dest_tc_rx) = mpsc::channel(); - let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT); + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), RUST_PORT); + let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), PY_PORT); let mut udp_server = UdpServer::new( - sock_addr, + local_addr, + remote_addr, 2048, source_tc_tx, dest_tc_tx, @@ -359,7 +387,6 @@ fn main() { ) .expect("creating UDP server failed"); - let start = std::time::Instant::now(); let jh_source = thread::Builder::new() .name("cfdp src entity".to_string()) .spawn(move || { @@ -386,7 +413,7 @@ fn main() { } } Err(e) => { - println!("Source handler error: {}", e); + warn!("cfdp src entity error: {}", e); next_delay = Some(Duration::from_millis(50)); } } @@ -452,13 +479,14 @@ fn main() { let jh_udp_server = thread::Builder::new() .name("cfdp udp server".to_string()) .spawn(move || { - info!("Starting UDP server on {}", sock_addr); + info!("Starting UDP server on {}", remote_addr); loop { loop { match udp_server.try_recv_tc() { Ok(result) => match result { Some((pdu, _addr)) => { debug!("Received PDU on UDP server: {:?}", pdu); + pdu_printout(&pdu); } None => break, }, @@ -474,21 +502,23 @@ fn main() { }) .unwrap(); - loop { - if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed) - && completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed) - { - let file = std::fs::read_to_string(destfile).expect("reading file failed"); - assert_eq!(file, FILE_DATA); - // Stop the threads gracefully. - stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed); - break; - } - if std::time::Instant::now() - start > Duration::from_secs(2) { - panic!("file transfer not finished in 2 seconds"); - } - std::thread::sleep(Duration::from_millis(50)); + //loop { + /* + if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed) + && completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed) + { + let file = std::fs::read_to_string(destfile).expect("reading file failed"); + assert_eq!(file, FILE_DATA); + // Stop the threads gracefully. + stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed); + break; } + if std::time::Instant::now() - start > Duration::from_secs(20) { + panic!("file transfer not finished in 20 seconds"); + } + */ + //std::thread::sleep(Duration::from_millis(50)); + //} jh_source.join().unwrap(); jh_dest.join().unwrap(); diff --git a/src/filestore.rs b/src/filestore.rs index d865b5e..a6886a4 100644 --- a/src/filestore.rs +++ b/src/filestore.rs @@ -1,6 +1,5 @@ use alloc::string::{String, ToString}; use core::fmt::Display; -use crc::{Crc, CRC_32_CKSUM}; use spacepackets::cfdp::ChecksumType; use spacepackets::ByteConversionError; #[cfg(feature = "std")] @@ -9,8 +8,6 @@ use std::path::Path; #[cfg(feature = "std")] pub use std_mod::*; -pub const CRC_32: Crc = Crc::::new(&CRC_32_CKSUM); - #[derive(Debug, Clone)] pub enum FilestoreError { FileDoesNotExist, @@ -171,6 +168,10 @@ pub trait VirtualFilestore { #[cfg(feature = "std")] pub mod std_mod { + use crc::Crc; + + use crate::{CRC_32, CRC_32C}; + use super::*; use std::{ fs::{self, File, OpenOptions}, @@ -306,21 +307,23 @@ pub mod std_mod { checksum_type: ChecksumType, verification_buf: &mut [u8], ) -> Result { + let mut calc_with_crc_lib = |crc: Crc| -> Result { + let mut digest = crc.digest(); + let file_to_check = File::open(file_path)?; + let mut buf_reader = BufReader::new(file_to_check); + loop { + let bytes_read = buf_reader.read(verification_buf)?; + if bytes_read == 0 { + break; + } + digest.update(&verification_buf[0..bytes_read]); + } + Ok(digest.finalize()) + }; match checksum_type { ChecksumType::Modular => self.calc_modular_checksum(file_path), - ChecksumType::Crc32 => { - let mut digest = CRC_32.digest(); - let file_to_check = File::open(file_path)?; - let mut buf_reader = BufReader::new(file_to_check); - loop { - let bytes_read = buf_reader.read(verification_buf)?; - if bytes_read == 0 { - break; - } - digest.update(&verification_buf[0..bytes_read]); - } - Ok(digest.finalize()) - } + ChecksumType::Crc32 => calc_with_crc_lib(CRC_32), + ChecksumType::Crc32C => calc_with_crc_lib(CRC_32C), ChecksumType::NullChecksum => Ok(0), _ => Err(FilestoreError::ChecksumTypeNotImplemented(checksum_type)), } diff --git a/src/lib.rs b/src/lib.rs index 7d0d6d4..9db8859 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ pub mod user; use crate::time::CountdownProvider; use core::{cell::RefCell, fmt::Debug, hash::Hash}; -use crc::{Crc, CRC_32_CKSUM}; +use crc::{Crc, CRC_32_ISCSI, CRC_32_ISO_HDLC}; #[cfg(feature = "std")] use hashbrown::HashMap; @@ -682,7 +682,12 @@ pub enum State { Suspended = 2, } -pub const CRC_32: Crc = Crc::::new(&CRC_32_CKSUM); +/// SANA registry entry: https://sanaregistry.org/r/checksum_identifiers/records/4 +/// Entry in CRC catalogue: https://reveng.sourceforge.io/crc-catalogue/all.htm#crc.cat.crc-32 +pub const CRC_32: Crc = Crc::::new(&CRC_32_ISO_HDLC); +/// SANA registry entry: https://sanaregistry.org/r/checksum_identifiers/records/3 +/// Entry in CRC catalogue: https://reveng.sourceforge.io/crc-catalogue/all.htm#crc.cat.crc-32-iscsi +pub const CRC_32C: Crc = Crc::::new(&CRC_32_ISCSI); #[derive(Debug, PartialEq, Eq, Copy, Clone)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/src/source.rs b/src/source.rs index 987a45c..921fcc4 100644 --- a/src/source.rs +++ b/src/source.rs @@ -113,7 +113,7 @@ pub enum SourceError { SourceFileNotValidUtf8(Utf8Error), #[error("destination file does not have valid UTF8 format: {0}")] DestFileNotValidUtf8(Utf8Error), - #[error("error related to PDU creation")] + #[error("error related to PDU creation: {0}")] Pdu(#[from] PduError), #[error("cfdp feature not implemented")] NotImplemented,