diff --git a/Cargo.toml b/Cargo.toml index a38733b..68a2b8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ serde = ["dep:serde", "spacepackets/serde"] [dev-dependencies] tempfile = "3" rand = "0.8" +log = "0.4" [package.metadata.docs.rs] all-features = true diff --git a/examples/python-interop/main.rs b/examples/python-interop/main.rs index 10c88a6..dbaf0e5 100644 --- a/examples/python-interop/main.rs +++ b/examples/python-interop/main.rs @@ -1,6 +1,8 @@ use std::{ + fmt::Debug, fs::OpenOptions, - io::Write, + io::{self, ErrorKind, Write}, + net::{SocketAddr, ToSocketAddrs, UdpSocket}, sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, @@ -8,6 +10,7 @@ use std::{ use cfdp::{ dest::DestinationHandler, + determine_packet_target, filestore::NativeFilestore, request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, @@ -15,8 +18,9 @@ use cfdp::{ EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider, }; +use log::{info, warn}; use spacepackets::{ - cfdp::{ChecksumType, ConditionCode, TransmissionMode}, + cfdp::{pdu::PduError, ChecksumType, ConditionCode, TransmissionMode}, seq_count::SeqCountProviderSyncU16, util::UnsignedByteFieldU16, }; @@ -155,6 +159,90 @@ impl CfdpUser for ExampleCfdpUser { } } +pub struct UdpServer { + pub socket: UdpSocket, + recv_buf: Vec, + sender_addr: Option, + src_tx: mpsc::SyncSender>, + dest_tx: mpsc::SyncSender>, + tm_rx: mpsc::Receiver>, +} + +#[derive(Debug, thiserror::Error)] +pub enum UdpServerError { + #[error("nothing was received")] + NothingReceived, + #[error(transparent)] + Io(#[from] io::Error), + #[error("pdu error: {0}")] + Pdu(#[from] PduError), + #[error("send error")] + Send, +} + +impl UdpServer { + pub fn new( + addr: A, + max_recv_size: usize, + src_tx: mpsc::SyncSender>, + dest_tx: mpsc::SyncSender>, + tm_rx: mpsc::Receiver>, + ) -> Result { + let server = Self { + socket: UdpSocket::bind(addr)?, + recv_buf: vec![0; max_recv_size], + tm_rx, + src_tx, + dest_tx, + sender_addr: None, + }; + server.socket.set_nonblocking(true)?; + Ok(server) + } + + pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), UdpServerError> { + let res = match self.socket.recv_from(&mut self.recv_buf) { + Ok(res) => res, + Err(e) => { + return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut { + Err(UdpServerError::NothingReceived) + } else { + Err(e.into()) + } + } + }; + let (num_bytes, from) = res; + self.sender_addr = Some(from); + let packet_target = determine_packet_target(&self.recv_buf)?; + match packet_target { + cfdp::PacketTarget::SourceEntity => { + self.src_tx + .send(self.recv_buf[0..num_bytes].to_vec()) + .map_err(|_| UdpServerError::Send)?; + } + cfdp::PacketTarget::DestEntity => { + self.dest_tx + .send(self.recv_buf[0..num_bytes].to_vec()) + .map_err(|_| UdpServerError::Send)?; + } + } + Ok(res) + } + + pub fn send_tm(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { + while let Ok(tm) = self.tm_rx.try_recv() { + let result = socket.send_to(&tm, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } + + pub fn last_sender(&self) -> Option { + self.sender_addr + } +} + fn main() { // Simplified event handling using atomic signals. let stop_signal_source = Arc::new(AtomicBool::new(false)); diff --git a/src/dest.rs b/src/dest.rs index 7657442..3d3415e 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -860,7 +860,11 @@ mod tests { use spacepackets::{ cfdp::{ lv::Lv, - pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket}, + pdu::{ + finished::{self, FinishedPduReader}, + metadata::MetadataPduCreator, + WritablePduPacket, + }, ChecksumType, TransmissionMode, }, util::{UbfU16, UnsignedByteFieldU16}, @@ -1565,6 +1569,13 @@ mod tests { Some(FileDirectiveType::FinishedPdu) ); let finished_pdu = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap(); + assert_eq!(finished_pdu.file_status(), FileStatus::Retained); + assert_eq!( + finished_pdu.condition_code(), + ConditionCode::CheckLimitReached + ); + assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete); assert!(tb.handler.pdu_sender.queue_empty()); + tb.expected_full_data = faulty_file_data.to_vec(); } } diff --git a/src/lib.rs b/src/lib.rs index 84f9428..3a56dce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -724,19 +724,20 @@ impl PduProvider for DummyPduProvider { pub struct PacketInfo<'raw_packet> { pdu_type: PduType, file_directive_type: Option, - //target: PacketTarget, raw_packet: &'raw_packet [u8], } -pub fn determine_packet_target( - file_directive_type: Option, - raw_pdu: &[u8], -) -> Result { - if file_directive_type.is_none() { +pub fn determine_packet_target(raw_pdu: &[u8]) -> Result { + let (header, header_len) = PduHeader::from_bytes(raw_pdu)?; + if header.pdu_type() == PduType::FileData { return Ok(PacketTarget::DestEntity); } - let (_, header_len) = PduHeader::from_bytes(raw_pdu)?; - let file_directive_type = file_directive_type.unwrap(); + let file_directive_type = FileDirectiveType::try_from(raw_pdu[header_len]).map_err(|_| { + PduError::InvalidDirectiveType { + found: raw_pdu[header_len], + expected: None, + } + })?; let packet_target = match file_directive_type { // Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a. @@ -823,7 +824,7 @@ impl PduProvider for PacketInfo<'_> { } fn packet_target(&self) -> Result { - determine_packet_target(self.file_directive_type, self.raw_packet) + determine_packet_target(self.raw_packet) } } @@ -870,7 +871,7 @@ pub mod alloc_mod { } fn packet_target(&self) -> Result { - determine_packet_target(self.file_directive_type, &self.pdu) + determine_packet_target(&self.pdu) } } }