//! # CFDP Destination Entity Module
//!
//! The [DestinationHandler] is the primary component of this module which converts the PDUs sent
//! from a remote source entity back to a file. A file copy operation on the receiver side
//! is started with the reception of a Metadata PDU, for example one generated by the
//! [spacepackets::cfdp::pdu::metadata::MetadataPduCreator]. After that, file packet PDUs, for
//! example generated with the [spacepackets::cfdp::pdu::file_data] module, can be inserted into
//! the destination handler and will be assembled into a file.
//!
//! A destination entity might still generate packets which need to be sent back to the source
//! entity of the file transfer. However, this handler allows freedom of communication like the
//! source entity by using a user-provided [PduSender] instance to send all generated PDUs.
//!
//! The transaction will be finished when the following conditions are met:
//!
//! 1. A valid EOF PDU, for example one generated by the [spacepackets::cfdp::pdu::eof::EofPdu]
//!    helper, has been inserted into the handler.
//! 2. The file checksum verification has been successful. If this is not the case for the
//!    unacknowledged mode, the handler will re-attempt the checksum calculation up to a certain
//!    threshold called the check limit. If the threshold is reached, the transaction will
//!    finish with a failure.
//!
//! ### Unacknowledged mode with closure
//!
//! 3. A Finished PDU has been sent back to the remote side.
//!
//! ### Acknowledged mode
//!
//! 3. An EOF ACK PDU has been sent back to the remote side.
//! 4. A Finished PDU has been sent back to the remote side.
//! 5. A Finished PDU ACK was received.
use crate::{
    DummyPduProvider, FaultInfo, GenericSendError, IndicationConfig, PduProvider,
    PositiveAckParams,
    lost_segments::{LostSegmentError, LostSegmentStore},
    user::TransactionFinishedParams,
};
use core::{
    cell::{Cell, RefCell},
    str::{Utf8Error, from_utf8, from_utf8_unchecked},
};

use super::{
    Countdown, EntityType, LocalEntityConfig, PacketTarget, PduSender, RemoteConfigStore,
    RemoteEntityConfig, State, TimerContext, TimerCreator, TransactionId, UserFaultHook,
    filestore::{FilestoreError, VirtualFilestore},
    user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
};
use smallvec::SmallVec;
use spacepackets::{
    cfdp::{
        ChecksumType, ConditionCode, FaultHandlerCode, LargeFileFlag, PduType, TransactionStatus,
        TransmissionMode,
        pdu::{
            CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader,
            ack::AckPdu,
            eof::EofPdu,
            file_data::FileDataPdu,
            finished::{DeliveryCode, FileStatus, FinishedPduCreator},
            metadata::{MetadataGenericParams, MetadataPduReader},
            nak::{NakPduCreator, NakPduCreatorWithReservedSeqReqsBuf},
        },
        tlv::{EntityIdTlv, GenericTlv, ReadableTlv, TlvType, msg_to_user::MsgToUserTlv},
    },
    util::{UnsignedByteField, UnsignedEnum},
};

/// Fixed-size storage for the source file name, the destination file name and the resolved
/// destination path of a transaction, each paired with the number of valid bytes.
#[derive(Debug)]
struct FileNames {
    src_file_name: [u8; u8::MAX as usize],
    src_file_name_len: usize,
    dest_file_name: [u8; u8::MAX as usize],
    dest_file_name_len: usize,
    dest_path_buf: [u8; u8::MAX as usize * 2],
    dest_file_path_len: usize,
}

/// Counters for anomalies observed while handling a transaction.
///
/// Both counters are `u8` values which wrap around on overflow.
#[derive(Debug, Default, Clone, Copy)]
pub struct AnomalyTracker {
    invalid_ack_directive_code: u8,
    lost_segment_errors: u8,
}

impl AnomalyTracker {
    /// Number of ACK PDUs received which acknowledged an unexpected directive.
    ///
    /// Takes `&self`: reading a counter does not require exclusive access.
    #[inline]
    pub fn invalid_ack_directive_code(&self) -> u8 {
        self.invalid_ack_directive_code
    }

    /// Number of recorded lost segment errors.
    ///
    /// Takes `&self`: reading a counter does not require exclusive access.
    #[inline]
    pub fn lost_segment_errors(&self) -> u8 {
        self.lost_segment_errors
    }

    /// Increments the lost segment error counter, wrapping around on overflow.
    #[inline]
    pub fn increment_lost_segment_errors(&mut self) {
        self.lost_segment_errors = self.lost_segment_errors.wrapping_add(1);
    }

    /// Increments the invalid ACK directive code counter, wrapping around on overflow.
    #[inline]
    pub fn increment_invalid_ack_directive_code(&mut self) {
        self.invalid_ack_directive_code =
self.invalid_ack_directive_code.wrapping_add(1);
    }

    /// Clears both anomaly counters by restoring the default (zeroed) state.
    #[inline]
    pub fn reset(&mut self) {
        *self = Default::default();
    }
}

/// Disposition of a transaction: either it completed or it was cancelled.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum CompletionDisposition {
    Completed = 0,
    Cancelled = 1,
}

/// This enumeration models the different transaction steps of the destination entity handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TransactionStep {
    Idle,
    TransactionStart,
    /// Special state which is only used for acknowledged mode. The CFDP entity is still waiting
    /// for a missing metadata PDU to be re-sent. Until then, all arriving file data PDUs will only
    /// update the internal lost segment tracker. When the EOF PDU arrives, the state will be left.
    /// Please note that deferred lost segment handling might also be active when this state is set.
    WaitingForMetadata,
    ReceivingFileDataPdus,
    /// This is the check timer step as specified in chapter 4.6.3.3 b) of the standard.
    /// The destination handler will still check for file data PDUs which might lead to a full
    /// file transfer completion.
ReceivingFileDataPdusWithCheckLimitHandling, WaitingForMissingData, //SendingAckPdu, TransferCompletion, WaitingForFinishedAck, } #[derive(Debug)] struct FinishedParams { condition_code: Cell, delivery_code: Cell, fault_location_finished: Cell>, file_status: FileStatus, } impl Default for FinishedParams { fn default() -> Self { Self { condition_code: Cell::new(ConditionCode::NoError), delivery_code: Cell::new(DeliveryCode::Incomplete), fault_location_finished: Cell::new(None), file_status: FileStatus::Unreported, } } } impl FinishedParams { pub fn reset(&mut self) { self.condition_code.set(ConditionCode::NoError); self.delivery_code.set(DeliveryCode::Incomplete); self.file_status = FileStatus::Unreported; } } #[derive(Debug, Copy, Clone)] pub struct AcknowledgedModeParams { last_start_offset: u64, last_end_offset: u64, metadata_missing: bool, nak_activity_counter: u32, deferred_procedure_active: bool, } // This contains parameters for destination transaction. #[derive(Debug)] struct TransactionParams { pdu_conf: CommonPduConfig, file_names: FileNames, msgs_to_user_size: usize, // TODO: Should we make this configurable? 
msgs_to_user_buf: [u8; 1024],
    // NOTE(review): the generic arguments of the `Option` and `Cell` fields below are missing in
    // the reviewed text and were left untouched — restore them from version control.
    remote_cfg: Option,
    transaction_id: Option,
    metadata_params: MetadataGenericParams,
    file_size: u64,
    progress: u64,
    acked_params: Option,
    deferred_procedure_timer: Option,
    finished_params: FinishedParams,
    positive_ack_params: Option,
    ack_timer: Option,
    metadata_only: bool,
    completion_disposition: Cell,
    checksum: u32,
    anomaly_tracker: AnomalyTracker,
    current_check_count: u32,
    current_check_timer: Option,
}

impl TransactionParams {
    /// Transmission mode stored in the common PDU configuration of the transaction.
    fn transmission_mode(&self) -> TransmissionMode {
        self.pdu_conf.trans_mode
    }
}

impl Default for FileNames {
    /// All name buffers zeroed, all lengths zero.
    fn default() -> Self {
        Self {
            src_file_name: [0; u8::MAX as usize],
            src_file_name_len: Default::default(),
            dest_file_name: [0; u8::MAX as usize],
            dest_file_name_len: Default::default(),
            dest_path_buf: [0; u8::MAX as usize * 2],
            dest_file_path_len: Default::default(),
        }
    }
}

impl TransactionParams {
    /// File size as announced by the metadata parameters.
    // NOTE(review): this reads `metadata_params.file_size`, not the separate `file_size` field
    // of this struct — confirm that this is intended.
    fn file_size(&self) -> u64 {
        self.metadata_params.file_size
    }

    /// Metadata parameters stored for the transaction.
    fn metadata_params(&self) -> &MetadataGenericParams {
        &self.metadata_params
    }
}

impl Default for TransactionParams {
    fn default() -> Self {
        Self {
            pdu_conf: Default::default(),
            msgs_to_user_size: 0,
            file_size: 0,
            msgs_to_user_buf: [0; 1024],
            file_names: Default::default(),
            remote_cfg: None,
            transaction_id: None,
            metadata_params: Default::default(),
            progress: Default::default(),
            deferred_procedure_timer: None,
            acked_params: None,
            metadata_only: false,
            positive_ack_params: None,
            ack_timer: None,
            finished_params: FinishedParams::default(),
            completion_disposition: Cell::new(CompletionDisposition::Completed),
            checksum: 0,
            current_check_count: 0,
            anomaly_tracker: AnomalyTracker::default(),
            current_check_timer: None,
        }
    }
}

impl TransactionParams {
    /// Resets the finished parameters and the anomaly tracker.
    // NOTE(review): all other fields (progress, timers, acknowledged mode parameters, the
    // metadata-only flag, ...) keep their values here — confirm that they are cleared elsewhere.
    fn reset(&mut self) {
        self.finished_params.reset();
        self.anomaly_tracker.reset();
    }
}

/// Errors which can occur while the destination handler processes PDUs.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum DestError {
    /// File directive expected, but none specified
    #[error("expected file directive")]
    DirectiveFieldEmpty,
    #[error("can not process packet type {pdu_type:?}
with directive type {directive_type:?}")]
    CantProcessPacketType {
        pdu_type: PduType,
        // NOTE(review): the generic argument of this `Option` is missing in the reviewed text.
        directive_type: Option,
    },
    #[error("first packet must be metadata PDU for unacknowledged transfers")]
    FirstPacketNotMetadata,
    #[error(
        "can not process PDU with type {pdu_type:?} and directive field {file_directive_type:?} in \
        current transaction step {step:?}"
    )]
    WrongStepForPdu {
        step: TransactionStep,
        pdu_type: PduType,
        // NOTE(review): the generic argument of this `Option` is missing in the reviewed text.
        file_directive_type: Option,
    },
    // Received a new metadata PDU while already being busy with a file transfer.
    #[error("busy with transfer")]
    RecvdMetadataButIsBusy,
    #[error("empty source file field")]
    EmptySrcFileField,
    #[error("empty dest file field")]
    EmptyDestFileField,
    #[error("pdu error {0}")]
    Pdu(#[from] PduError),
    #[error("io error {0}")]
    #[cfg(feature = "std")]
    Io(#[from] std::io::Error),
    #[error("file store error: {0}")]
    Filestore(#[from] FilestoreError),
    #[error("unexpected large file size which is not supported for current transfer")]
    UnexpectedLargeFileSize,
    #[error("lost segment error: {0}")]
    LostSegmentError(#[from] LostSegmentError),
    #[error("path conversion error {0}")]
    PathConversion(#[from] Utf8Error),
    #[error("error building dest path from source file name and dest folder")]
    PathConcat,
    #[error("no remote entity configuration found for {0:?}")]
    NoRemoteConfigFound(UnsignedByteField),
    #[error("issue sending PDU: {0}")]
    SendError(#[from] GenericSendError),
    #[error("invalid remote entity configuration: {0:?}")]
    InvalidRemoteConfig(RemoteEntityConfig),
    #[error("cfdp feature not implemented")]
    NotImplemented,
}

/// This is the primary CFDP destination handler. It models the CFDP destination entity, which is
/// primarily responsible for receiving files sent from another CFDP entity. It performs the
/// reception side of File Copy Operations.
///
/// The [DestinationHandler::state_machine] function is the primary function to drive the
/// destination handler.
/// It can be used to insert packets into the destination
/// handler and drive the state machine, which might generate new packets to be sent to the
/// remote entity. Please note that, of the file directive PDUs, the destination handler can only
/// process Metadata, EOF and Prompt PDUs in addition to ACK PDUs where the acknowledged PDU is
/// the Finished PDU.
/// All generated packets are sent using the user provided [PduSender].
///
/// The handler has an internal buffer for PDU generation and checksum generation. The size of this
/// buffer is selected via Cargo features and defaults to 2048 bytes. It does not allocate
/// memory during run-time and thus is suitable for embedded systems where allocation is
/// not possible. Furthermore, it uses the [VirtualFilestore] abstraction to allow
/// usage on systems without a [std] filesystem.
///
/// This handler is able to deal with file copy operations to directories, similarly to how the
/// UNIX tool `cp` works. If the destination path is a directory instead of a regular full path,
/// the source path base file name will be appended to the destination path to form the resulting
/// new full path.
///
/// This handler also does not support concurrency out of the box but is flexible enough to be used
/// in different concurrent contexts.
/// For example, you can dynamically create new handlers and
/// run them inside a thread pool, or move the newly created handler to a new thread.
pub struct DestinationHandler<
    PduSenderInstance: PduSender,
    UserFaultHookInstance: UserFaultHook,
    VirtualFileStoreInstance: VirtualFilestore,
    RemoteConfigStoreInstance: RemoteConfigStore,
    TimerCreatorInstance: TimerCreator,
    CountdownInstance: Countdown,
    LostSegmentTracker: LostSegmentStore,
> {
    local_cfg: LocalEntityConfig,
    // NOTE(review): the generic argument of this `Cell` is missing in the reviewed text; the
    // constructor stores a `TransactionStep` in it.
    step: core::cell::Cell,
    state: State,
    transaction_params: TransactionParams,
    // Scratch buffer shared by PDU generation and checksum calculation.
    pdu_and_cksum_buffer: RefCell<[u8; crate::buf_len::PACKET_BUF_LEN]>,
    pub pdu_sender: PduSenderInstance,
    pub vfs: VirtualFileStoreInstance,
    pub remote_cfg_table: RemoteConfigStoreInstance,
    pub check_timer_creator: TimerCreatorInstance,
    lost_segment_tracker: LostSegmentTracker,
}

/// [DestinationHandler] variant which uses the `std` based filestore, remote configuration
/// store, timer creator, countdown and lost segment list of this crate.
#[cfg(feature = "std")]
pub type StdDestinationHandler = DestinationHandler<
    PduSender,
    UserFaultHook,
    crate::filestore::NativeFilestore,
    crate::RemoteConfigStoreStd,
    crate::StdTimerCreator,
    crate::StdCountdown,
    crate::lost_segments::LostSegmentsList,
>;

// NOTE(review): the generic parameters of this impl block appear to be missing in the reviewed
// text (`PduSenderInstance` is used below but not declared here).
#[cfg(feature = "std")]
impl StdDestinationHandler {
    /// Constructs a handler from a local configuration and a PDU sender, using default
    /// constructed `std` helper components for everything else.
    #[cfg(feature = "std")]
    pub fn new_std(
        local_cfg: LocalEntityConfig,
        pdu_sender: PduSenderInstance,
    ) -> Self {
        Self::new(
            local_cfg,
            pdu_sender,
            crate::filestore::NativeFilestore::default(),
            crate::RemoteConfigStoreStd::default(),
            crate::StdTimerCreator::default(),
            crate::lost_segments::LostSegmentsList::default(),
        )
    }
}

impl<
    PduSenderInstance: PduSender,
    UserFaultHookInstance: UserFaultHook,
    VirtualFilestoreInstance: VirtualFilestore,
    RemoteConfigStoreInstance: RemoteConfigStore,
    TimerCreatorInstance: TimerCreator,
    CountdownInstance: Countdown,
    LostSegmentTracker: LostSegmentStore,
>
    DestinationHandler<
        PduSenderInstance,
        UserFaultHookInstance,
        VirtualFilestoreInstance,
        RemoteConfigStoreInstance,
        TimerCreatorInstance,
        CountdownInstance,
        LostSegmentTracker,
    >
{
    /// Constructs a new destination handler.
    ///
    /// The internal buffer used for PDU generation and checksum calculation is not passed in.
    /// It is created here with a length of `crate::buf_len::PACKET_BUF_LEN` bytes.
    ///
    /// # Arguments
    ///
    /// * `local_cfg` - The local CFDP entity configuration.
    /// * `pdu_sender` - [PduSender] used to send generated PDU packets.
    /// * `vfs` - [VirtualFilestore] implementation used by the handler, which decouples the CFDP
    ///   implementation from the underlying filestore/filesystem. This allows to use this handler
    ///   for embedded systems where a standard runtime might not be available.
    /// * `remote_cfg_table` - The [RemoteConfigStore] used to look up remote
    ///   entities and target specific configuration for file copy operations.
    /// * `timer_creator` - [TimerCreator] used by the CFDP handler to generate
    ///   timers required by various tasks. This allows to use this handler for embedded systems
    ///   where the standard time APIs might not be available.
    /// * `lost_segment_tracker` - [LostSegmentStore] implementation used to keep track of lost
    ///   file segments.
    pub fn new(
        local_cfg: LocalEntityConfig,
        pdu_sender: PduSenderInstance,
        vfs: VirtualFilestoreInstance,
        remote_cfg_table: RemoteConfigStoreInstance,
        timer_creator: TimerCreatorInstance,
        lost_segment_tracker: LostSegmentTracker,
    ) -> Self {
        Self {
            local_cfg,
            // A new handler starts idle, without an active transaction.
            step: Cell::new(TransactionStep::Idle),
            state: State::Idle,
            transaction_params: Default::default(),
            pdu_and_cksum_buffer: core::cell::RefCell::new([0; crate::buf_len::PACKET_BUF_LEN]),
            pdu_sender,
            vfs,
            remote_cfg_table,
            check_timer_creator: timer_creator,
            lost_segment_tracker,
        }
    }

    /// Drives the state machine without inserting a packet.
    ///
    /// This forwards to [Self::state_machine] with no packet.
    // NOTE(review): the generic arguments of the `Result` return type are missing in the
    // reviewed text.
    pub fn state_machine_no_packet(
        &mut self,
        cfdp_user: &mut impl CfdpUser,
    ) -> Result {
        self.state_machine(cfdp_user, None::<&DummyPduProvider>)
    }

    /// This is the core function to drive the destination handler. It is also used to insert
    /// packets into the destination handler.
/// /// The state machine should either be called if a packet with the appropriate destination ID /// is received and periodically to perform all CFDP related tasks, for example /// checking for timeouts or missed file segments. /// /// The function returns the number of sent PDU packets on success. pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, packet_to_insert: Option<&impl PduProvider>, ) -> Result { let mut sent_packets = 0; if let Some(packet) = packet_to_insert { sent_packets += self.insert_packet(cfdp_user, packet)?; } match self.state { State::Idle => { // TODO: In acknowledged mode, add timer handling. } State::Busy => { sent_packets += self.fsm_busy(cfdp_user)?; } State::Suspended => { // There is now way to suspend the handler currently anyway. } } Ok(sent_packets) } /// This function models the Cancel.request CFDP primitive and is the recommended way /// to cancel a transaction. It will cause a Notice Of Cancellation at this entity. /// Please note that the state machine might still be active because a cancelled transfer /// might still require some packets to be sent to the remote sender entity. /// /// If no unexpected errors occur, this function returns whether the current transfer /// was cancelled. It returns [false] if the state machine is currently idle or if there /// is a transaction ID missmatch. pub fn cancel_request(&mut self, transaction_id: &TransactionId) -> bool { if self.state() == super::State::Idle { return false; } if let Some(active_id) = self.transaction_id() { if active_id == *transaction_id { self.trigger_notice_of_completion_cancelled( ConditionCode::CancelRequestReceived, EntityIdTlv::new(self.local_cfg.id), ); self.set_step(TransactionStep::TransferCompletion); return true; } } false } /// Returns [None] if the state machine is IDLE, and the transmission mode of the current /// request otherwise. 
    // NOTE(review): the generic arguments of the `Option` and `Result` return types in this
    // section are missing in the reviewed text.
    #[inline]
    pub fn transmission_mode(&self) -> Option {
        if self.state == State::Idle {
            return None;
        }
        Some(self.transaction_params.transmission_mode())
    }

    /// Returns the ID stored for the current transaction, if there is one.
    #[inline]
    pub fn transaction_id(&self) -> Option {
        self.transaction_params.transaction_id
    }

    /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable.
    #[inline]
    pub fn step(&self) -> TransactionStep {
        self.step.get()
    }

    /// Get the state, which denotes whether the CFDP handler is active, and which CFDP class
    /// is used if it is active.
    #[inline]
    pub fn state(&self) -> State {
        self.state
    }

    /// Shared access to the local entity configuration.
    #[inline]
    pub fn local_cfg(&self) -> &LocalEntityConfig {
        &self.local_cfg
    }

    /// Mutable access to the local entity configuration.
    #[inline]
    pub fn local_cfg_mut(&mut self) -> &mut LocalEntityConfig {
        &mut self.local_cfg
    }

    /// This function is public to allow completely resetting the handler, but it is explicitly
    /// discouraged to do this. CFDP has mechanisms to detect issues and errors on its own.
    /// Resetting the handler might interfere with these mechanisms and lead to unexpected
    /// behaviour.
    pub fn reset(&mut self) {
        self.set_step(TransactionStep::Idle);
        self.state = State::Idle;
        self.transaction_params.reset();
    }

    /// Dispatches an inserted packet to the file directive or file data handling and returns
    /// the number of packets which were sent as a consequence.
    fn insert_packet(
        &mut self,
        cfdp_user: &mut impl CfdpUser,
        packet_to_insert: &impl PduProvider,
    ) -> Result {
        let mut sent_packets = 0;
        if packet_to_insert.packet_target()? != PacketTarget::DestEntity {
            // Only packets which target the destination entity can be processed by this
            // handler, everything else is rejected.
return Err(DestError::CantProcessPacketType {
                pdu_type: packet_to_insert.pdu_type(),
                directive_type: packet_to_insert.file_directive_type(),
            });
        }
        match packet_to_insert.pdu_type() {
            PduType::FileDirective => {
                // A file directive PDU without a directive type can not be dispatched.
                let directive_type = packet_to_insert
                    .file_directive_type()
                    .ok_or(DestError::DirectiveFieldEmpty)?;
                sent_packets += self.handle_file_directive(
                    cfdp_user,
                    directive_type,
                    packet_to_insert.raw_pdu(),
                )?;
            }
            PduType::FileData => {
                let fd_pdu = FileDataPdu::from_bytes(packet_to_insert.raw_pdu())?;
                sent_packets += self.handle_file_data(cfdp_user, fd_pdu)?;
            }
        }
        Ok(sent_packets)
    }

    /// Parses the raw file directive PDU and forwards it to the matching handler.
    ///
    /// Finished, NAK and Keep Alive PDUs are rejected with
    /// [DestError::CantProcessPacketType]. Returns the number of packets sent while handling
    /// the directive.
    // NOTE(review): the generic arguments of the `Result` return type are missing in the
    // reviewed text.
    fn handle_file_directive(
        &mut self,
        cfdp_user: &mut impl CfdpUser,
        pdu_directive: FileDirectiveType,
        raw_packet: &[u8],
    ) -> Result {
        let mut sent_packets = 0;
        match pdu_directive {
            FileDirectiveType::EofPdu => {
                let eof_pdu = EofPdu::from_bytes(raw_packet)?;
                sent_packets += self.handle_eof_pdu(cfdp_user, eof_pdu)?
            }
            FileDirectiveType::FinishedPdu
            | FileDirectiveType::NakPdu
            | FileDirectiveType::KeepAlivePdu => {
                return Err(DestError::CantProcessPacketType {
                    pdu_type: PduType::FileDirective,
                    directive_type: Some(pdu_directive),
                });
            }
            FileDirectiveType::AckPdu => {
                let ack_pdu = AckPdu::from_bytes(raw_packet)?;
                self.handle_ack_pdu(ack_pdu)?;
            }
            FileDirectiveType::MetadataPdu => {
                let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
                self.handle_metadata_pdu(metadata_pdu)?
            }
            FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?,
        };
        Ok(sent_packets)
    }

    /// Handles an ACK PDU. Only the acknowledgement of a Finished PDU completes the
    /// transaction.
    ///
    /// An ACK for any other directive is recorded in the anomaly tracker and otherwise ignored.
    /// Resetting the handler in that case would end the transaction without a Finished ACK and
    /// would immediately wipe the anomaly counter which was just incremented.
    fn handle_ack_pdu(&mut self, ack_pdu: AckPdu) -> Result<(), DestError> {
        if ack_pdu.directive_code_of_acked_pdu() != FileDirectiveType::FinishedPdu {
            self.transaction_params
                .anomaly_tracker
                .increment_invalid_ack_directive_code();
            return Ok(());
        }
        // We are done.
self.reset();
        Ok(())
    }

    /// Common setup for the first packet of a transaction.
    ///
    /// Resets the transaction parameters, looks up the configuration of the remote source
    /// entity, stores the transaction ID and a copy of the PDU configuration with its
    /// direction set towards the sender, and marks the handler as busy.
    ///
    /// Fails with [DestError::NoRemoteConfigFound] if the source entity is unknown.
    fn first_packet_handling(&mut self, pdu_config: &CommonPduConfig) -> Result<(), DestError> {
        self.transaction_params.reset();
        let remote_cfg = match self.remote_cfg_table.get(pdu_config.source_id().value()) {
            Some(cfg) => *cfg,
            None => return Err(DestError::NoRemoteConfigFound(pdu_config.source_id())),
        };
        let params = &mut self.transaction_params;
        params.remote_cfg = Some(remote_cfg);
        params.transaction_id = Some(TransactionId::new(
            pdu_config.source_id(),
            pdu_config.transaction_seq_num,
        ));
        params.pdu_conf = *pdu_config;
        // PDUs generated by this handler travel back to the sender.
        params.pdu_conf.direction = spacepackets::cfdp::Direction::TowardsSender;
        self.state = State::Busy;
        Ok(())
    }

    /// Handles a metadata PDU, which either starts a new transaction or supplies metadata
    /// which was missing for an acknowledged mode transaction.
    fn handle_metadata_pdu(&mut self, metadata_pdu: MetadataPduReader) -> Result<(), DestError> {
        let first_packet = self.step() == TransactionStep::Idle;
        if first_packet {
            self.first_packet_handling(metadata_pdu.pdu_header().common_pdu_conf())?;
        }
        match self.transmission_mode() {
            Some(transmission_mode) => {
                if !first_packet && transmission_mode == TransmissionMode::Unacknowledged {
                    return Err(DestError::RecvdMetadataButIsBusy);
                }
                match self.transaction_params.acked_params.as_mut() {
                    Some(acked_params) => {
                        if acked_params.metadata_missing {
                            if let Some(timer) =
                                self.transaction_params.deferred_procedure_timer.as_mut()
                            {
                                acked_params.nak_activity_counter = 0;
                                timer.reset();
                            }
                            acked_params.metadata_missing = false;
                        } else {
                            // Quietly ignore.
return Ok(());
                        }
                    }
                    None => {
                        self.transaction_params.acked_params = Some(AcknowledgedModeParams {
                            last_start_offset: 0,
                            last_end_offset: 0,
                            metadata_missing: false,
                            nak_activity_counter: 0,
                            deferred_procedure_active: false,
                        });
                    }
                }
            }
            None => {
                return Err(DestError::RecvdMetadataButIsBusy);
            }
        }

        self.transaction_params.metadata_params = *metadata_pdu.metadata_params();
        // The lookup key is the source ID, so this is also the ID reported on failure.
        if self
            .remote_cfg_table
            .get(metadata_pdu.source_id().value())
            .is_none()
        {
            return Err(DestError::NoRemoteConfigFound(metadata_pdu.source_id()));
        }
        let src_name = metadata_pdu.src_file_name();
        let dest_name = metadata_pdu.dest_file_name();
        // Derive the flag from this PDU alone, so a value left over from an earlier transaction
        // can not turn a regular file transfer into a metadata-only one.
        self.transaction_params.metadata_only = src_name.is_empty() && dest_name.is_empty();
        if !self.transaction_params.metadata_only {
            if src_name.is_empty() {
                return Err(DestError::EmptySrcFileField);
            }
            if dest_name.is_empty() {
                return Err(DestError::EmptyDestFileField);
            }
            let file_names = &mut self.transaction_params.file_names;
            file_names.src_file_name[..src_name.len_value()].copy_from_slice(src_name.value());
            file_names.src_file_name_len = src_name.len_value();
            file_names.dest_file_name[..dest_name.len_value()].copy_from_slice(dest_name.value());
            file_names.dest_file_name_len = dest_name.len_value();
        }
        // Always start with an empty message buffer, also for metadata-only transactions.
        self.transaction_params.msgs_to_user_size = 0;
        if !metadata_pdu.options().is_empty() {
            for option_tlv in metadata_pdu.options_iter().unwrap() {
                if option_tlv.is_standard_tlv()
                    && option_tlv.tlv_type().unwrap() == TlvType::MsgToUser
                {
                    // Append the raw TLV behind the messages stored so far. Copying into the
                    // whole buffer would panic for every TLV which is not exactly as long as
                    // the buffer, and would overwrite previously stored messages.
                    // NOTE(review): assumes `raw_data()` yields the full raw TLV, i.e. its
                    // length equals `len_full()` — confirm against the spacepackets API.
                    let raw_tlv = option_tlv.raw_data().unwrap();
                    let start = self.transaction_params.msgs_to_user_size;
                    let end = start + raw_tlv.len();
                    // Messages which do not fit into the remaining buffer space are dropped.
                    if let Some(free_space) =
                        self.transaction_params.msgs_to_user_buf.get_mut(start..end)
                    {
                        free_space.copy_from_slice(raw_tlv);
                        self.transaction_params.msgs_to_user_size = end;
                    }
                }
            }
        }
        self.set_step(TransactionStep::TransactionStart);
        Ok(())
    }

    /// Resets the NAK activity counter and restarts the deferred procedure timer.
    // NOTE(review): the reset is conditional on `metadata_missing`, not on the deferred
    // procedure being active as the name suggests — confirm that this is intended.
    fn reset_nak_activity_parameters_if_active(&mut self) {
        if let Some(acked_params) =
self.transaction_params.acked_params.as_mut()
        {
            if acked_params.metadata_missing {
                if let Some(timer) = self.transaction_params.deferred_procedure_timer.as_mut() {
                    acked_params.nak_activity_counter = 0;
                    timer.reset();
                }
            }
        }
    }

    /// Handles a file data PDU which arrives before any metadata PDU.
    ///
    /// This is only allowed in acknowledged mode. The transaction is set up from the PDU
    /// header, the metadata is marked as missing and the range from offset 0 up to the end of
    /// the received data is recorded as a lost segment. If the remote entity is configured for
    /// immediate NAK mode, a NAK PDU is sent which requests the metadata PDU with a `(0, 0)`
    /// segment request and, if the PDU carried data, the lost range as well.
    ///
    /// Returns the number of sent packets.
    // NOTE(review): the generic arguments of the `Result` return type are missing in the
    // reviewed text.
    fn handle_file_data_without_previous_metadata(
        &mut self,
        fd_pdu: &FileDataPdu,
    ) -> Result {
        let mut packets_sent = 0;
        if fd_pdu.transmission_mode() == TransmissionMode::Unacknowledged {
            // In unacknowledged mode, the metadata PDU has to be the first packet.
            return Err(DestError::FirstPacketNotMetadata);
        }
        self.first_packet_handling(fd_pdu.pdu_header().common_pdu_conf())?;
        let has_file_data = !fd_pdu.file_data().is_empty();
        self.transaction_params.progress = fd_pdu.offset() + fd_pdu.file_data().len() as u64;
        self.transaction_params.acked_params = Some(AcknowledgedModeParams {
            last_start_offset: 0,
            last_end_offset: 0,
            nak_activity_counter: 0,
            metadata_missing: true,
            deferred_procedure_active: false,
        });
        if has_file_data {
            self.lost_segment_tracker
                .add_lost_segment((0, self.transaction_params.progress))?;
            let ack_params = self.transaction_params.acked_params.as_mut().unwrap();
            ack_params.last_start_offset = self.transaction_params.progress;
            ack_params.last_end_offset = self.transaction_params.progress;
        }
        if self
            .transaction_params
            .remote_cfg
            .as_ref()
            .unwrap()
            .immediate_nak_mode
        {
            // One segment request for the metadata, one more for the lost file data.
            let num_seg_reqs = if has_file_data { 2 } else { 1 };
            let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0);
            let mut nak_pdu = NakPduCreatorWithReservedSeqReqsBuf::new(
                self.pdu_and_cksum_buffer.get_mut(),
                pdu_header,
                num_seg_reqs,
            )
            .map_err(PduError::from)?;
            let buf_mut = nak_pdu.segment_request_buffer_mut();
            // Width of one offset field inside a segment request.
            let increment = if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large {
                8
            } else {
                4
            };
            // The buffer is reused between PDUs, so every field has to be written explicitly.
            // First segment request (0, 0): the metadata PDU.
            buf_mut[0..2 * increment].fill(0);
            if has_file_data {
                // Second segment request (0, progress): start offset, then end offset.
                buf_mut[2 * increment..3 * increment].fill(0);
                let end_offset_field = &mut buf_mut[3 * increment..4 * increment];
                if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large {
                    end_offset_field
                        .copy_from_slice(&self.transaction_params.progress.to_be_bytes());
                } else {
                    end_offset_field
                        .copy_from_slice(&(self.transaction_params.progress as u32).to_be_bytes());
                }
            }
            let written_size = nak_pdu
                .finish(0, self.transaction_params.progress)
                .map_err(PduError::from)?;
            self.pdu_sender.send_file_directive_pdu(
                FileDirectiveType::NakPdu,
                &self.pdu_and_cksum_buffer.borrow()[0..written_size],
            )?;
            packets_sent += 1;
        }
        Ok(packets_sent)
    }

    /// Handles a file data PDU: issues the file segment received indication if enabled, writes
    /// the data to the destination file and, in acknowledged mode, runs the lost segment
    /// detection. Returns the number of sent packets.
    fn handle_file_data(
        &mut self,
        user: &mut impl CfdpUser,
        fd_pdu: FileDataPdu,
    ) -> Result {
        let mut sent_packets = 0;
        let mut handle_indication = |id: TransactionId, indication_config: &IndicationConfig| {
            if indication_config.file_segment_recv {
                user.file_segment_recvd_indication(&FileSegmentRecvdParams {
                    id,
                    offset: fd_pdu.offset(),
                    length: fd_pdu.file_data().len(),
                    segment_metadata: fd_pdu.segment_metadata(),
                });
            }
        };
        let step = self.step.get();
        // NOTE(review): the step validation below only runs while the handler state is idle.
        // A busy handler accepts file data in every step — confirm that this is intended.
        if self.state == State::Idle {
            if step == TransactionStep::Idle {
                sent_packets += self.handle_file_data_without_previous_metadata(&fd_pdu)?;
                self.set_step(TransactionStep::WaitingForMetadata);
                handle_indication(
                    self.transaction_id().unwrap(),
                    &self.local_cfg.indication_cfg,
                );
                return Ok(sent_packets);
            }
            if step != TransactionStep::ReceivingFileDataPdus
                && step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling
            {
                return Err(DestError::WrongStepForPdu {
                    pdu_type: PduType::FileData,
                    file_directive_type: None,
                    step,
                });
            }
        }
        handle_indication(
            self.transaction_id().unwrap(),
            &self.local_cfg.indication_cfg,
        );
        if let Err(e) = self.vfs.write_data(
            // Safety: It was already verified that the path is valid during the transaction start.
unsafe { from_utf8_unchecked( //from_utf8( &self.transaction_params.file_names.dest_path_buf [0..self.transaction_params.file_names.dest_file_path_len], ) }, fd_pdu.offset(), fd_pdu.file_data(), ) { if self.declare_fault(ConditionCode::FilestoreRejection) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); } return Err(e.into()); } self.transaction_params.progress = core::cmp::max( self.transaction_params.progress, fd_pdu.offset() + fd_pdu.file_data().len() as u64, ); if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged { sent_packets += self.lost_segment_handling(&fd_pdu)?; } Ok(sent_packets) } fn lost_segment_handling(&mut self, fd_pdu: &FileDataPdu) -> Result { // Lost segment detection: 4.6.4.3.1 a) and b) are covered by this code. c) is covered //by dedicated code which is run when the EOF PDU is handled. if self.transaction_params.acked_params.is_none() { return Ok(0); } let mut sent_packets = 0; let acked_params = self.transaction_params.acked_params.as_mut().unwrap(); if fd_pdu.offset() > acked_params.last_end_offset { let lost_segment = (acked_params.last_end_offset, fd_pdu.offset()); self.lost_segment_tracker.add_lost_segment(lost_segment)?; if self .transaction_params .remote_cfg .as_ref() .unwrap() .immediate_nak_mode { let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); if self.transaction_params.pdu_conf.file_flag == LargeFileFlag::Normal && lost_segment.0 > u32::MAX as u64 || lost_segment.1 > u32::MAX as u64 { return Err(DestError::UnexpectedLargeFileSize); } let seg32 = [(lost_segment.0 as u32, lost_segment.1 as u32)]; let seg64 = [lost_segment]; let nak_pdu = if self.transaction_params.pdu_conf.file_flag == LargeFileFlag::Normal { NakPduCreator::new_normal_file_size( pdu_header, 0, self.transaction_params.progress as u32, &seg32, ) .unwrap() } else { NakPduCreator::new_large_file_size( pdu_header, 0, self.transaction_params.progress, &seg64, ) .unwrap() }; let written_len = 
nak_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?;
                self.pdu_sender.send_file_directive_pdu(
                    FileDirectiveType::NakPdu,
                    &self.pdu_and_cksum_buffer.borrow()[0..written_len],
                )?;
                sent_packets += 1;
            }
        }
        // Data at or beyond the previously tracked end offset becomes the new tracked segment.
        if fd_pdu.offset() >= acked_params.last_end_offset {
            acked_params.last_start_offset = fd_pdu.offset();
            acked_params.last_end_offset = fd_pdu.offset() + fd_pdu.file_data().len() as u64;
        }
        if fd_pdu.offset() + fd_pdu.file_data().len() as u64 <= acked_params.last_start_offset {
            // Might be a re-requested FD PDU.
            let removed = self.lost_segment_tracker.remove_lost_segment((
                fd_pdu.offset(),
                fd_pdu.offset() + fd_pdu.file_data().len() as u64,
            ))?;
            // Reception of missing segments resets the NAK activity parameters.
            // See CFDP 4.6.4.7.
            if removed && acked_params.deferred_procedure_active {
                self.reset_nak_activity_parameters_if_active();
            }
        }
        Ok(sent_packets)
    }

    /// Handles an EOF PDU and returns the number of sent packets.
    ///
    /// If the EOF PDU is the first packet of a transaction, it is rejected in unacknowledged
    /// mode and handled by the dedicated missing-metadata path in acknowledged mode.
    // NOTE(review): the generic arguments of the `Result` return type are missing in the
    // reviewed text.
    fn handle_eof_pdu(
        &mut self,
        cfdp_user: &mut impl CfdpUser,
        eof_pdu: EofPdu,
    ) -> Result {
        // Only used for the early return below, the counter is re-declared afterwards.
        let sent_packets = 0;
        let first_packet = self.step() == TransactionStep::Idle;
        if first_packet {
            self.first_packet_handling(eof_pdu.pdu_header().common_pdu_conf())?;
        }
        if self.local_cfg.indication_cfg.eof_recv {
            // Unwrap is okay here, application logic ensures that transaction ID is valid here.
            cfdp_user
                .eof_recvd_indication(self.transaction_params.transaction_id.as_ref().unwrap());
        }
        if first_packet {
            if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
                return Err(DestError::WrongStepForPdu {
                    pdu_type: PduType::FileDirective,
                    file_directive_type: Some(FileDirectiveType::EofPdu),
                    step: self.step(),
                });
            }
            return self.handle_eof_without_previous_metadata_in_acked_mode(&eof_pdu);
        }
        if eof_pdu.condition_code() == ConditionCode::NoError {
            if !self.handle_eof_no_error(&eof_pdu)?
{
                return Ok(sent_packets);
            }
        } else {
            self.handle_eof_cancel(&eof_pdu);
        };
        let mut sent_packets = 0;
        match self.transaction_params.transmission_mode() {
            TransmissionMode::Acknowledged => {
                // In acknowledged mode, the EOF PDU is always acknowledged.
                self.acknowledge_eof_pdu(&eof_pdu)?;
                sent_packets += 1;
                match self.transaction_params.acked_params.as_ref() {
                    Some(acked_params) => {
                        // Missing metadata or file segments have to be re-requested first.
                        if acked_params.metadata_missing || !self.lost_segment_tracker.is_empty() {
                            self.start_deferred_lost_segment_handling();
                        } else {
                            self.set_step(TransactionStep::TransferCompletion);
                        }
                    }
                    // This can happen when the EOF is the first packet to arrive.
                    None => {
                        self.start_deferred_lost_segment_handling();
                    }
                }
            }
            TransmissionMode::Unacknowledged => {
                self.set_step(TransactionStep::TransferCompletion);
            }
        }
        Ok(sent_packets)
    }

    /// Handles an EOF PDU which is the first packet of an acknowledged mode transaction.
    ///
    /// File size and checksum are taken from the EOF PDU and the metadata is marked as
    /// missing. For a non-empty file, the whole file is recorded as lost. The EOF PDU is
    /// acknowledged. Unless the transaction was cancelled, the handler then waits for the
    /// metadata and starts the deferred lost segment handling.
    ///
    /// Returns the number of sent packets.
    // NOTE(review): the generic arguments of the `Result` return type are missing in the
    // reviewed text.
    fn handle_eof_without_previous_metadata_in_acked_mode(
        &mut self,
        eof_pdu: &EofPdu,
    ) -> Result {
        let mut sent_packets = 0;
        self.transaction_params.file_size = eof_pdu.file_size();
        self.transaction_params.checksum = eof_pdu.file_checksum();
        self.transaction_params.acked_params = Some(AcknowledgedModeParams {
            last_start_offset: 0,
            last_end_offset: 0,
            nak_activity_counter: 0,
            metadata_missing: true,
            deferred_procedure_active: false,
        });
        if self.transaction_params.file_size > 0 {
            self.lost_segment_tracker.reset();
            // Add the whole file to the lost segments map for now.
            self.lost_segment_tracker
                .add_lost_segment((0, eof_pdu.file_size()))?;
        }
        if eof_pdu.condition_code() != ConditionCode::NoError {
            self.handle_eof_cancel(eof_pdu);
        }
        self.acknowledge_eof_pdu(eof_pdu)?;
        sent_packets += 1;
        // A cancelled transaction goes straight to completion.
        if self.transaction_params.completion_disposition.get() == CompletionDisposition::Cancelled
        {
            self.set_step(TransactionStep::TransferCompletion);
            return Ok(sent_packets);
        }
        self.set_step(TransactionStep::WaitingForMetadata);
        self.start_deferred_lost_segment_handling();
        Ok(sent_packets)
    }

    /// Handles an EOF PDU which carries a condition code other than "no error".
    fn handle_eof_cancel(&mut self, eof_pdu: &EofPdu) {
        // This is an EOF (Cancel), perform Cancel Response Procedures according to chapter
        // 4.6.6 of the standard.
Set remote ID as fault location. self.trigger_notice_of_completion_cancelled( eof_pdu.condition_code(), EntityIdTlv::new(self.transaction_params.remote_cfg.unwrap().entity_id), ); // Store this as progress for the checksum calculation as well. self.transaction_params.progress = eof_pdu.file_size(); if let Some(ack_params) = &self.transaction_params.acked_params { if ack_params.metadata_missing { return; } } if self.transaction_params.progress == 0 { // Empty file, no file data PDU. self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Complete); return; } if self.checksum_verify(self.transaction_params.progress, eof_pdu.file_checksum()) { self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Complete); return; } self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Incomplete); } fn acknowledge_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<(), DestError> { let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let ack_pdu = AckPdu::new_for_eof_pdu( pdu_header, eof_pdu.condition_code(), TransactionStatus::Active, ); let written_len = ack_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?; self.pdu_sender.send_file_directive_pdu( FileDirectiveType::AckPdu, &self.pdu_and_cksum_buffer.borrow()[0..written_len], )?; Ok(()) } fn start_deferred_lost_segment_handling(&mut self) { match &mut self.transaction_params.acked_params { Some(params) => { params.last_start_offset = self.transaction_params.file_size; params.last_end_offset = self.transaction_params.file_size; params.deferred_procedure_active = true; params.nak_activity_counter = 0; } None => { self.transaction_params.acked_params = Some(AcknowledgedModeParams { last_start_offset: self.transaction_params.file_size, last_end_offset: self.transaction_params.file_size, metadata_missing: false, nak_activity_counter: 0, deferred_procedure_active: true, }) } } } fn check_for_deferred_lost_segment_completion(&mut self, 
metadata_missing: bool) -> bool { if self.lost_segment_tracker.is_empty() && !metadata_missing { // We are done and have received everything. match self.checksum_verify( self.transaction_params.progress, self.transaction_params.checksum, ) { true => { self.transaction_params .finished_params .condition_code .set(ConditionCode::NoError); self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Complete); } false => { self.transaction_params .finished_params .condition_code .set(ConditionCode::FileChecksumFailure); self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Incomplete); } } self.transaction_params .acked_params .as_mut() .unwrap() .deferred_procedure_active = false; self.set_step(TransactionStep::TransferCompletion); return true; } false } fn deferred_lost_segment_handling(&mut self) -> Result { assert!( self.transaction_params.acked_params.is_some(), "acknowledged parameters unexpectedly None" ); let acked_params = self.transaction_params.acked_params.unwrap(); if self.check_for_deferred_lost_segment_completion(acked_params.metadata_missing) { return Ok(0); } let mut sent_packets = 0; let first_nak_issuance = self.transaction_params.deferred_procedure_timer.is_none(); if first_nak_issuance { self.transaction_params.deferred_procedure_timer = Some( self.check_timer_creator .create_countdown(TimerContext::NakActivity { expiry_time: self .transaction_params .remote_cfg .as_ref() .unwrap() .nak_timer_interval, }), ); } else if !self .transaction_params .deferred_procedure_timer .as_ref() .unwrap() .has_expired() { return Ok(sent_packets); } if !first_nak_issuance && acked_params.nak_activity_counter + 1 == self .transaction_params .remote_cfg .as_ref() .unwrap() .nak_timer_expiration_limit { self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Incomplete); if self.declare_fault(ConditionCode::NakLimitReached) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); } return 
Ok(sent_packets); } if !first_nak_issuance { self.transaction_params .acked_params .as_mut() .unwrap() .nak_activity_counter += 1; self.transaction_params .deferred_procedure_timer .as_mut() .unwrap() .reset(); } let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let max_segment_reqs = NakPduCreatorWithReservedSeqReqsBuf::calculate_max_segment_requests( self.transaction_params .remote_cfg .as_ref() .unwrap() .max_packet_len, &pdu_header, ) .map_err(|_| DestError::InvalidRemoteConfig(self.transaction_params.remote_cfg.unwrap()))?; let mut segments_to_send = self.lost_segment_tracker.number_of_segments(); if acked_params.metadata_missing { segments_to_send += 1; } if segments_to_send <= max_segment_reqs { // Should never fail because we calculcated the allowed maximum number of segment // requests for the buffer. let mut nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new( self.pdu_and_cksum_buffer.get_mut(), pdu_header, segments_to_send, ) .unwrap(); // All error conditions were checked so this should never fail. // // 1. Number of segments was calculated. // 2. Buffer size was calculated based on PDU header and maximum packet size. // 3. Large file flag is based on PDU header, so there should never be a missmatch. 
/// Sends the current lost segment requests as a sequence of multiple NAK PDUs.
///
/// This is the path taken by the deferred lost segment handling when the number of segment
/// requests exceeds the number which fits into one NAK PDU.
///
/// # Arguments
///
/// * `segments_to_send` - Number of segment requests which still need to be sent. It is
///   reduced by `max_segments` after each flushed PDU and used to size the following PDU.
/// * `max_segments` - Maximum number of segment requests per NAK PDU.
/// * `first_segment_metadata` - If set, the first entry of the first PDU is a zeroed segment
///   request which re-requests the metadata PDU.
///
/// Returns the number of NAK PDUs which were sent.
///
/// # Panics
///
/// Panics if the NAK PDU creator can not be created for the internal buffer.
#[cold]
fn write_multi_packet_nak_sequence(
    &mut self,
    mut segments_to_send: usize,
    max_segments: usize,
    first_segment_metadata: bool,
) -> Result {
    let mut sent_packets = 0;
    let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0);
    let mut nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new(
        self.pdu_and_cksum_buffer.get_mut(),
        pdu_header,
        max_segments,
    )
    .unwrap();

    // Number of entries written into the PDU which is currently being assembled.
    let mut segment_index = 0;
    // Byte offset of the next entry inside the segment request buffer.
    let mut buf_index = 0;
    let mut current_start_of_scope = 0;
    let mut current_end_of_scope = 0;
    let mut seg_buf = nak_pdu_creator.segment_request_buffer_mut();

    // First segment re-requests metadata PDU.
    if first_segment_metadata {
        // One entry is a (start, end) pair: 2 * 8 bytes for large files, 2 * 4 bytes otherwise.
        if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large {
            seg_buf[0..16].fill(0);
            buf_index = 16;
        } else {
            seg_buf[0..8].fill(0);
            buf_index = 8;
        }
        segment_index = 1; // account for the header gap entry
    }

    for (idx, (start, end)) in self.lost_segment_tracker.iter().enumerate() {
        // flush full PDU *before* writing the new entry
        if segment_index == max_segments {
            let written_len = nak_pdu_creator
                .finish(current_start_of_scope, current_end_of_scope)
                .map_err(PduError::from)?;
            self.pdu_sender.send_file_directive_pdu(
                FileDirectiveType::NakPdu,
                &self.pdu_and_cksum_buffer.borrow()[..written_len],
            )?;
            sent_packets += 1;
            segments_to_send = segments_to_send.saturating_sub(max_segments);

            // new PDU with the same capacity
            nak_pdu_creator = NakPduCreatorWithReservedSeqReqsBuf::new(
                self.pdu_and_cksum_buffer.get_mut(),
                pdu_header,
                core::cmp::min(segments_to_send, max_segments),
            )
            .unwrap();
            seg_buf = nak_pdu_creator.segment_request_buffer_mut();
            segment_index = 0;
            // The scope of the next PDU starts where the scope of the flushed PDU ended.
            current_start_of_scope = current_end_of_scope;
            buf_index = 0;
        }

        // write entry
        if pdu_header.common_pdu_conf().file_flag == LargeFileFlag::Large {
            seg_buf[buf_index..buf_index + 8].copy_from_slice(&start.to_be_bytes());
            buf_index += 8;
            seg_buf[buf_index..buf_index + 8].copy_from_slice(&end.to_be_bytes());
            buf_index += 8;
        } else {
            // NOTE(review): truncating casts; presumably offsets always fit into 32 bits when
            // the large file flag is not set - confirm.
            seg_buf[buf_index..buf_index + 4].copy_from_slice(&(start as u32).to_be_bytes());
            buf_index += 4;
            seg_buf[buf_index..buf_index + 4].copy_from_slice(&(end as u32).to_be_bytes());
            buf_index += 4;
        }
        // NOTE(review): `idx == max_segments` is only true for one specific lost segment and
        // the value is overwritten by the `else` branch on the following iteration. It only
        // affects the final PDU when exactly `max_segments + 1` lost segments exist. If the
        // intent is "last entry of the whole sequence", the condition looks wrong - confirm
        // against the standard and the single-PDU path, which uses the file size as end of
        // scope.
        if idx == max_segments {
            // Clamp the end of scope to the progress for the last NAK in the sequence.
            current_end_of_scope = self.transaction_params.progress;
        } else {
            current_end_of_scope = end;
        }
        segment_index += 1;
    }

    // send trailing PDU if anything was written
    if segment_index > 0 {
        let written_len = nak_pdu_creator
            .finish(current_start_of_scope, current_end_of_scope)
            .map_err(PduError::from)?;
        self.pdu_sender.send_file_directive_pdu(
            FileDirectiveType::NakPdu,
            &self.pdu_and_cksum_buffer.borrow()[..written_len],
        )?;
        sent_packets += 1;
    }
    Ok(sent_packets)
}
fn handle_eof_no_error(&mut self, eof_pdu: &EofPdu) -> Result { // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size if self.transaction_params.progress > eof_pdu.file_size() { match self.declare_fault(ConditionCode::FileSizeError) { FaultHandlerCode::IgnoreError => (), FaultHandlerCode::AbandonTransaction => { self.abandon_transaction(); return Ok(false); } FaultHandlerCode::NoticeOfCancellation | FaultHandlerCode::NoticeOfSuspension => { return Ok(false); } } } else if (self.transaction_params.progress < eof_pdu.file_size()) && self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged { // CFDP 4.6.4.3.1: The end offset of the last received file segment and the file // size as stated in the EOF PDU is not the same, so we need to add that segment to // the lost segments for the deferred lost segment detection procedure. self.lost_segment_tracker .add_lost_segment((self.transaction_params.progress, eof_pdu.file_size()))?; } self.transaction_params.file_size = eof_pdu.file_size(); self.transaction_params.checksum = eof_pdu.file_checksum(); if self.transaction_params.transmission_mode() == TransmissionMode::Unacknowledged && !self.checksum_verify( self.transaction_params.progress, self.transaction_params.checksum, ) { self.start_check_limit_handling(); return Ok(false); } self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Complete); self.transaction_params .finished_params .condition_code .set(ConditionCode::NoError); Ok(true) } fn checksum_verify(&mut self, verify_len: u64, checksum: u32) -> bool { if self.transaction_params.metadata_params().checksum_type == ChecksumType::NullChecksum || self.transaction_params.metadata_only { return true; } match self.vfs.checksum_verify( checksum, // Safety: It was already verified that the path is valid during the transaction start. 
unsafe { from_utf8_unchecked( &self.transaction_params.file_names.dest_path_buf [0..self.transaction_params.file_names.dest_file_path_len], ) }, self.transaction_params.metadata_params().checksum_type, verify_len, self.pdu_and_cksum_buffer.get_mut(), ) { Ok(false) => { if self.declare_fault(ConditionCode::FileChecksumFailure) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); } false } Ok(true) => true, Err(e) => match e { FilestoreError::ChecksumTypeNotImplemented(_) => { if self.declare_fault(ConditionCode::UnsupportedChecksumType) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); } // For this case, the applicable algorithm shall be the the null checksum, // which is always succesful. true } _ => { if self.declare_fault(ConditionCode::FilestoreRejection) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); } // Treat this equivalent to a failed checksum procedure. false } }, } } fn start_check_limit_handling(&mut self) { self.set_step(TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling); self.transaction_params.current_check_timer = Some( self.check_timer_creator .create_countdown(TimerContext::CheckLimit { local_id: self.local_cfg.id, remote_id: self.transaction_params.remote_cfg.unwrap().entity_id, entity_type: EntityType::Receiving, }), ); self.transaction_params.current_check_count = 0; } fn check_limit_handling(&mut self) -> Result<(), DestError> { if self.transaction_params.current_check_timer.is_none() { return Ok(()); } let check_timer = self .transaction_params .current_check_timer .as_ref() .unwrap(); if check_timer.has_expired() { if self.checksum_verify( self.transaction_params.progress, self.transaction_params.checksum, ) { self.transaction_params .finished_params .condition_code .set(ConditionCode::NoError); self.transaction_params .finished_params .delivery_code .set(DeliveryCode::Complete); self.set_step(TransactionStep::TransferCompletion); return Ok(()); } if 
self.transaction_params.current_check_count + 1 >= self.transaction_params.remote_cfg.unwrap().check_limit { if self.declare_fault(ConditionCode::CheckLimitReached) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); } } else { self.transaction_params.current_check_count += 1; self.transaction_params .current_check_timer .as_mut() .unwrap() .reset(); } } Ok(()) } fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> { Err(DestError::NotImplemented) } fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { let mut sent_packets = 0; if self.step() == TransactionStep::TransactionStart { let result = self.transaction_start(cfdp_user); if let Err(e) = result { // If we can not even start the transaction properly, reset the handler. // We could later let the user do this optionally, but for now this is the safer // approach to prevent inconsistent states of the handler. self.reset(); return Err(e); } } if self.step() == TransactionStep::WaitingForMetadata || self.step() == TransactionStep::ReceivingFileDataPdus { if let Some(ack_params) = &mut self.transaction_params.acked_params { if ack_params.deferred_procedure_active { sent_packets += self.deferred_lost_segment_handling()?; } } } if self.step() == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling { self.check_limit_handling()?; } if self.step() == TransactionStep::TransferCompletion { sent_packets += self.transfer_completion(cfdp_user)?; } if self.step() == TransactionStep::WaitingForFinishedAck { sent_packets += self.handle_positive_ack_procedures()?; // Little hack because of the state machine handling order to ensure the transfer // is completed in one go. 
if self.step() == TransactionStep::TransferCompletion { sent_packets += self.transfer_completion(cfdp_user)?; } } Ok(sent_packets) } fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { let id = self.transaction_id().unwrap(); let dest_name = from_utf8( &self.transaction_params.file_names.dest_file_name [..self.transaction_params.file_names.dest_file_name_len], )?; self.transaction_params.file_names.dest_path_buf[0..dest_name.len()] .copy_from_slice(dest_name.as_bytes()); self.transaction_params.file_names.dest_file_path_len = dest_name.len(); let source_id = self.transaction_params.pdu_conf.source_id(); let src_name = from_utf8( &self.transaction_params.file_names.src_file_name [0..self.transaction_params.file_names.src_file_name_len], )?; let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new(); let mut num_msgs_to_user = 0; if self.transaction_params.msgs_to_user_size > 0 { let mut index = 0; while index < self.transaction_params.msgs_to_user_size { // This should never panic as the validity of the options was checked beforehand. let msgs_to_user_tlv = MsgToUserTlv::from_bytes(&self.transaction_params.msgs_to_user_buf[index..]) .expect("message to user creation failed unexpectedly"); msgs_to_user.push(msgs_to_user_tlv); index += msgs_to_user_tlv.len_full(); num_msgs_to_user += 1; } } let metadata_recvd_params = MetadataReceivedParams { id, source_id, file_size: self.transaction_params.file_size(), src_file_name: src_name, dest_file_name: dest_name, msgs_to_user: &msgs_to_user[..num_msgs_to_user], }; cfdp_user.metadata_recvd_indication(&metadata_recvd_params); if self.vfs.exists(dest_name)? && self.vfs.is_dir(dest_name)? { // Create new destination path by concatenating the last part of the source source // name and the destination folder. 
For example, for a source file of /tmp/hello.txt // and a destination name of /home/test, the resulting file name should be // /home/test/hello.txt // Safety: It was already verified that the path is valid during the transaction start. let source_file_name = self.vfs.file_name(src_name)?; if source_file_name.is_none() { return Err(DestError::PathConcat); } let source_name = source_file_name.unwrap(); self.transaction_params.file_names.dest_path_buf[dest_name.len()] = b'/'; self.transaction_params.file_names.dest_path_buf [dest_name.len() + 1..dest_name.len() + 1 + source_name.len()] .copy_from_slice(source_name.as_bytes()); self.transaction_params.file_names.dest_file_path_len += 1 + source_name.len(); } let dest_path_str = from_utf8( &self.transaction_params.file_names.dest_path_buf [0..self.transaction_params.file_names.dest_file_path_len], )?; if self.vfs.exists(dest_path_str)? { self.vfs.truncate_file(dest_path_str)?; } else { self.vfs.create_file(dest_path_str)?; } self.transaction_params.finished_params.file_status = FileStatus::Retained; drop(msgs_to_user); self.set_step(TransactionStep::ReceivingFileDataPdus); Ok(()) } fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result { let mut sent_packets = 0; self.notice_of_completion(cfdp_user)?; if self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged || self.transaction_params.metadata_params().closure_requested { sent_packets += self.send_finished_pdu()?; self.start_positive_ack_procedure(); if self.transaction_params.transmission_mode() == TransmissionMode::Acknowledged { self.set_step(TransactionStep::WaitingForFinishedAck); return Ok(sent_packets); } } self.reset(); Ok(sent_packets) } fn start_positive_ack_procedure(&mut self) { match &mut self.transaction_params.positive_ack_params { Some(current) => { current.ack_counter = 0; } None => { self.transaction_params.positive_ack_params = Some(PositiveAckParams { ack_counter: 0, positive_ack_of_cancellation: false, }) } 
} self.transaction_params.ack_timer = Some( self.check_timer_creator .create_countdown(TimerContext::PositiveAck { expiry_time: self .transaction_params .remote_cfg .as_ref() .unwrap() .positive_ack_timer_interval, }), ); } fn handle_positive_ack_procedures(&mut self) -> Result { // Do we have positive-ack params? if self.transaction_params.positive_ack_params.is_none() { return Ok(0); } let params = self.transaction_params.positive_ack_params.unwrap(); // Has the timer expired? if !self .transaction_params .ack_timer .as_mut() .unwrap() .has_expired() { return Ok(0); } let expiration_limit = self .transaction_params .remote_cfg .as_ref() .unwrap() .positive_ack_timer_expiration_limit; // If bumping the counter would exceed the limit, fault & maybe abandon if params.ack_counter + 1 >= expiration_limit { if self.declare_fault(ConditionCode::PositiveAckLimitReached) == FaultHandlerCode::AbandonTransaction { self.abandon_transaction(); return Ok(0); } self.transaction_params .positive_ack_params .as_mut() .unwrap() .positive_ack_of_cancellation = true; return Ok(0); } let params_mut = self .transaction_params .positive_ack_params .as_mut() .unwrap(); params_mut.ack_counter += 1; self.transaction_params.ack_timer.as_mut().unwrap().reset(); self.send_finished_pdu() } fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> { if self.transaction_params.completion_disposition.get() == CompletionDisposition::Completed { // TODO: Execute any filestore requests } else if self .transaction_params .remote_cfg .as_ref() .unwrap() .disposition_on_cancellation && self.transaction_params.finished_params.delivery_code.get() == DeliveryCode::Incomplete { // Safety: We already verified that the path is valid during the transaction start. let dest_path = unsafe { from_utf8_unchecked( &self.transaction_params.file_names.dest_path_buf [0..self.transaction_params.file_names.dest_file_path_len], ) }; if self.vfs.exists(dest_path)? 
&& self.vfs.is_file(dest_path)? { self.vfs.remove_file(dest_path)?; } self.transaction_params.finished_params.file_status = FileStatus::DiscardDeliberately; } let tstate = &self.transaction_params; let transaction_finished_params = TransactionFinishedParams { id: tstate.transaction_id.unwrap(), condition_code: tstate.finished_params.condition_code.get(), delivery_code: tstate.finished_params.delivery_code.get(), file_status: tstate.finished_params.file_status, }; cfdp_user.transaction_finished_indication(&transaction_finished_params); Ok(()) } // When the fault handler code [FaultHandlerCode::AbandonTransaction] is returned, the caller // must call [Self::abandon_transaction] as soon as it is possible. fn declare_fault(&self, condition_code: ConditionCode) -> FaultHandlerCode { // Cache those, because they might be reset when abandoning the transaction. let transaction_id = self.transaction_id().unwrap(); let progress = self.transaction_params.progress; let mut fh_code = self .local_cfg .fault_handler .get_fault_handler(condition_code); // CFDP 4.11.2.3.2: Any fault declared in the course of transferring the Finished (cancel) // PDU must result in abadonment of the transaction. 
if let Some(positive_ack) = &self.transaction_params.positive_ack_params { if positive_ack.positive_ack_of_cancellation { fh_code = FaultHandlerCode::AbandonTransaction; } } match fh_code { FaultHandlerCode::NoticeOfCancellation => { self.notice_of_cancellation(condition_code, EntityIdTlv::new(self.local_cfg().id)); } FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(), FaultHandlerCode::IgnoreError => (), FaultHandlerCode::AbandonTransaction => (), } self.local_cfg.fault_handler.report_fault( fh_code, FaultInfo::new(transaction_id, condition_code, progress), ) } fn notice_of_cancellation(&self, condition_code: ConditionCode, fault_location: EntityIdTlv) { self.set_step_internal(TransactionStep::TransferCompletion); let tstate = &self.transaction_params; tstate.finished_params.condition_code.set(condition_code); tstate .completion_disposition .set(CompletionDisposition::Cancelled); tstate .finished_params .fault_location_finished .set(Some(fault_location)); } fn notice_of_suspension(&self) { // TODO: Implement suspension handling. 
} fn abandon_transaction(&mut self) { self.reset(); } #[inline] fn set_step_internal(&self, step: TransactionStep) { self.step.set(step); } #[inline] fn set_step(&mut self, step: TransactionStep) { self.set_step_internal(step); } fn send_finished_pdu(&mut self) -> Result { let tstate = &self.transaction_params; let pdu_header = PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0); let finished_pdu = if tstate.finished_params.condition_code.get() == ConditionCode::NoError || tstate.finished_params.condition_code.get() == ConditionCode::UnsupportedChecksumType { FinishedPduCreator::new_no_error( pdu_header, tstate.finished_params.delivery_code.get(), tstate.finished_params.file_status, ) } else { FinishedPduCreator::new( pdu_header, tstate.finished_params.condition_code.get(), tstate.finished_params.delivery_code.get(), tstate.finished_params.file_status, &[], tstate.finished_params.fault_location_finished.get(), ) }; finished_pdu.write_to_bytes(self.pdu_and_cksum_buffer.get_mut())?; self.pdu_sender.send_file_directive_pdu( FileDirectiveType::FinishedPdu, &self.pdu_and_cksum_buffer.borrow()[0..finished_pdu.len_written()], )?; Ok(1) } } #[cfg(test)] mod tests { #[allow(unused_imports)] use std::println; use std::{ fs, path::{Path, PathBuf}, string::String, }; use alloc::vec::Vec; use rand::Rng; use spacepackets::{ cfdp::{ ChecksumType, TransmissionMode, lv::Lv, pdu::{ WritablePduPacket, finished::FinishedPduReader, metadata::MetadataPduCreator, nak::NakPduReader, }, }, util::UnsignedByteFieldU8, }; use crate::{ CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd, filestore::NativeFilestore, lost_segments::LostSegmentsList, tests::{ LOCAL_ID, REMOTE_ID, SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, TestCheckTimerCreator, TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table, }, }; use super::*; #[derive(Debug, Clone, Copy)] pub struct TransferInfo { id: TransactionId, header: PduHeader, } type TestDestHandler = 
DestinationHandler< TestCfdpSender, TestFaultHandler, NativeFilestore, RemoteConfigStoreStd, TestCheckTimerCreator, TestCheckTimer, LostSegmentsList, >; struct DestHandlerTestbench { expiry_control: TimerExpiryControl, handler: TestDestHandler, src_path: PathBuf, dest_path: PathBuf, check_dest_file: bool, check_handler_idle_at_drop: bool, closure_requested: bool, pdu_conf: CommonPduConfig, expected_full_data: Vec, expected_file_size: u64, buf: [u8; 512], } impl DestHandlerTestbench { fn new_with_fixed_paths( fault_handler: TestFaultHandler, transmission_mode: TransmissionMode, closure_requested: bool, ) -> Self { let (src_path, dest_path) = init_full_filepaths_textfile(); assert!(!Path::exists(&dest_path)); Self::new( fault_handler, transmission_mode, closure_requested, true, src_path, dest_path, ) } fn new( fault_handler: TestFaultHandler, transmission_mode: TransmissionMode, closure_requested: bool, check_dest_file: bool, src_path: PathBuf, dest_path: PathBuf, ) -> Self { let expiry_control = TimerExpiryControl::default(); let test_sender = TestCfdpSender::default(); let dest_handler = default_dest_handler(fault_handler, test_sender, &expiry_control); let mut handler = Self { expiry_control, handler: dest_handler, src_path, closure_requested, dest_path, check_dest_file, check_handler_idle_at_drop: true, expected_file_size: 0, pdu_conf: CommonPduConfig::new_with_byte_fields( LOCAL_ID, REMOTE_ID, UnsignedByteFieldU8::new(0), ) .unwrap(), expected_full_data: Vec::new(), buf: [0; 512], }; handler.pdu_conf.trans_mode = transmission_mode; handler.state_check(State::Idle, TransactionStep::Idle); handler } pub fn fault_handler(&self) -> &FaultHandler { &self.handler.local_cfg.fault_handler } #[inline] fn set_large_file_flag(&mut self, large_file: LargeFileFlag) { self.pdu_conf.file_flag = large_file; } fn dest_path(&self) -> &PathBuf { &self.dest_path } fn all_fault_queues_empty(&self) -> bool { self.handler .local_cfg .user_fault_hook() .borrow() .all_queues_empty() } 
#[allow(dead_code)] fn indication_cfg_mut(&mut self) -> &mut IndicationConfig { &mut self.handler.local_cfg.indication_cfg } fn remote_cfg_mut(&mut self) -> &mut RemoteEntityConfig { self.handler .remote_cfg_table .get_mut(LOCAL_ID.value()) .unwrap() } fn pdu_queue_empty(&mut self) -> bool { self.handler.pdu_sender.queue_empty() } fn pdu_queue_len(&mut self) -> usize { self.handler.pdu_sender.queue_len() } fn get_next_pdu(&mut self) -> Option { self.handler.pdu_sender.retrieve_next_pdu() } fn indication_cfg(&mut self) -> &IndicationConfig { &self.handler.local_cfg.indication_cfg } fn set_check_timer_expired(&mut self) { self.expiry_control.set_check_limit_expired(); } fn set_nak_activity_timer_expired(&mut self) { self.expiry_control.set_nak_activity_expired(); } fn set_positive_ack_expired(&mut self) { self.expiry_control.set_positive_ack_expired(); } fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser { TestCfdpUser::new( 0, self.src_path.to_string_lossy().into(), self.dest_path.to_string_lossy().into(), expected_file_size, ) } fn generic_transfer_init( &mut self, user: &mut TestCfdpUser, file_size: u64, ) -> Result { self.expected_file_size = file_size; assert_eq!(user.transaction_indication_call_count, 0); assert_eq!(user.metadata_recv_queue.len(), 0); let pdu_header = PduHeader::new_for_file_directive(self.pdu_conf, 0); let metadata_pdu = create_metadata_pdu( &pdu_header, self.src_path.as_path(), self.dest_path.as_path(), file_size, self.closure_requested, ); let packet_info = create_packet_info(&metadata_pdu, &mut self.buf); self.handler.state_machine(user, Some(&packet_info))?; assert_eq!(user.metadata_recv_queue.len(), 1); assert_eq!( self.handler.transmission_mode().unwrap(), self.pdu_conf.trans_mode ); assert_eq!(user.transaction_indication_call_count, 0); assert_eq!(user.metadata_recv_queue.len(), 1); let metadata_recvd = user.metadata_recv_queue.pop_front().unwrap(); assert_eq!(metadata_recvd.source_id, LOCAL_ID.into()); 
assert_eq!( metadata_recvd.src_file_name, String::from(self.src_path.to_str().unwrap()) ); assert_eq!( metadata_recvd.dest_file_name, String::from(self.dest_path().to_str().unwrap()) ); assert_eq!(metadata_recvd.id, self.handler.transaction_id().unwrap()); assert_eq!(metadata_recvd.file_size, file_size); assert!(metadata_recvd.msgs_to_user.is_empty()); Ok(TransferInfo { id: self.handler.transaction_id().unwrap(), header: pdu_header, }) } fn generic_file_data_insert( &mut self, user: &mut TestCfdpUser, offset: u64, file_data_chunk: &[u8], ) -> Result { let pdu_header = PduHeader::new_for_file_data_default(self.pdu_conf, 0); let filedata_pdu = FileDataPdu::new_no_seg_metadata(pdu_header, offset, file_data_chunk); filedata_pdu .write_to_bytes(&mut self.buf) .expect("writing file data PDU failed"); let packet_info = PduRawWithInfo::new(&self.buf).expect("creating packet info failed"); let result = self.handler.state_machine(user, Some(&packet_info)); if self.indication_cfg().file_segment_recv { assert!(!user.file_seg_recvd_queue.is_empty()); let file_seg = user.file_seg_recvd_queue.pop_front().unwrap(); assert_eq!(file_seg.offset, offset); assert_eq!(file_seg.length, file_data_chunk.len()); } result } fn generic_eof_no_error( &mut self, user: &mut TestCfdpUser, expected_full_data: Vec, ) -> Result { self.expected_full_data = expected_full_data; assert_eq!(user.finished_indic_queue.len(), 0); let pdu_header = PduHeader::new_for_file_directive(self.pdu_conf, 0); let eof_pdu = create_no_error_eof(&self.expected_full_data, &pdu_header); let packet_info = create_packet_info(&eof_pdu, &mut self.buf); self.check_handler_idle_at_drop = true; self.check_dest_file = true; let result = self.handler.state_machine(user, Some(&packet_info)); if self.indication_cfg().eof_recv { assert_eq!(user.eof_recvd_call_count, 1); } result } fn check_completion_indication_success(&mut self, user: &mut TestCfdpUser) { assert_eq!(user.finished_indic_queue.len(), 1); let finished_indication = 
user.finished_indic_queue.pop_front().unwrap(); assert_eq!( finished_indication.id, self.handler.transaction_id().unwrap() ); assert_eq!(finished_indication.file_status, FileStatus::Retained); assert_eq!(finished_indication.delivery_code, DeliveryCode::Complete); assert_eq!(finished_indication.condition_code, ConditionCode::NoError); } fn check_completion_indication_failure( &mut self, user: &mut TestCfdpUser, cond_code: ConditionCode, file_status: FileStatus, delivery_code: DeliveryCode, ) { assert_eq!(user.finished_indic_queue.len(), 1); let finished_indication = user.finished_indic_queue.pop_front().unwrap(); assert_eq!( finished_indication.id, self.handler.transaction_id().unwrap() ); assert_eq!(finished_indication.file_status, file_status); assert_eq!(finished_indication.delivery_code, delivery_code); assert_eq!(finished_indication.condition_code, cond_code); } fn check_eof_ack_pdu(&mut self, cond_code: ConditionCode) { assert!(!self.pdu_queue_empty()); let pdu = self.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::AckPdu); let ack_pdu = AckPdu::from_bytes(&pdu.raw_pdu).unwrap(); assert_eq!(ack_pdu.condition_code(), cond_code); assert_eq!(ack_pdu.transaction_status(), TransactionStatus::Active); assert_eq!( ack_pdu.directive_code_of_acked_pdu(), FileDirectiveType::EofPdu ); } fn check_finished_pdu_success(&mut self) { let pdu = self.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!( pdu.file_directive_type.unwrap(), FileDirectiveType::FinishedPdu ); let finished_pdu = FinishedPduReader::from_bytes(&pdu.raw_pdu).unwrap(); assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!(finished_pdu.condition_code(), ConditionCode::NoError); assert!(finished_pdu.fault_location().is_none()); } fn check_finished_pdu_failure( &mut self, cond_code: ConditionCode, 
file_status: FileStatus,
        delivery_code: DeliveryCode,
    ) {
        let pdu = self.get_next_pdu().unwrap();
        assert_eq!(pdu.pdu_type, PduType::FileDirective);
        assert_eq!(
            pdu.file_directive_type.unwrap(),
            FileDirectiveType::FinishedPdu
        );
        let finished_pdu = FinishedPduReader::from_bytes(&pdu.raw_pdu).unwrap();
        assert_eq!(finished_pdu.delivery_code(), delivery_code);
        assert_eq!(finished_pdu.file_status(), file_status);
        assert_eq!(finished_pdu.condition_code(), cond_code);
    }

    /// Sends an ACK PDU for the Finished PDU of the given transfer into the handler and
    /// expects the state machine call to succeed.
    fn acknowledge_finished_pdu(
        &mut self,
        user: &mut impl CfdpUser,
        transfer_info: &TransferInfo,
    ) {
        let ack_pdu = AckPdu::new_for_finished_pdu(
            transfer_info.header,
            ConditionCode::NoError,
            TransactionStatus::Active,
        );
        self.handler
            .state_machine(user, Some(&create_packet_info(&ack_pdu, &mut self.buf)))
            .expect("handling ack PDU failed");
    }

    /// Asserts that the handler is in the given state and transaction step.
    fn state_check(&self, state: State, step: TransactionStep) {
        assert_eq!(self.handler.state(), state);
        assert_eq!(self.handler.step(), step);
    }
}

// Specifying some checks in the drop method avoids some boilerplate.
impl Drop for DestHandlerTestbench {
    /// Runs the end-of-test checks: empty fault queues, empty PDU queue, optionally an idle
    /// handler and optionally the content of the destination file, which is removed afterwards.
    fn drop(&mut self) {
        if std::thread::panicking() {
            // The test body already failed. A failing assertion during unwinding would be a
            // second panic, which aborts the process and hides the original failure, so only
            // do a best-effort removal of the destination file here.
            if self.check_dest_file {
                let _ = fs::remove_file(&self.dest_path);
            }
            return;
        }
        if !self.all_fault_queues_empty() {
            // Print the individual queue lengths before the assertion below fires.
            let fh_queues = self.handler.local_cfg.user_fault_hook().borrow();
            println!(
                "fault queues not empty. cancellation {}, suspension {}, ignored {}, abandon {}",
                fh_queues.notice_of_cancellation_queue.len(),
                fh_queues.notice_of_suspension_queue.len(),
                fh_queues.ignored_queue.len(),
                fh_queues.abandoned_queue.len()
            );
        }
        assert!(self.all_fault_queues_empty(), "fault queues not empty");
        assert!(self.pdu_queue_empty());
        if self.check_handler_idle_at_drop {
            self.state_check(State::Idle, TransactionStep::Idle);
        }
        if self.check_dest_file {
            assert!(Path::exists(&self.dest_path));
            let read_content = fs::read(&self.dest_path).expect("reading back string failed");
            assert_eq!(read_content.len() as u64, self.expected_file_size);
            assert_eq!(read_content, self.expected_full_data);
            assert!(fs::remove_file(self.dest_path.as_path()).is_ok());
        }
    }
}

/// Returns the source path `/tmp/test.txt` and a destination path taken from a freshly created
/// named temporary file.
// NOTE(review): both `TempPath` values are temporaries which are dropped when this expression
// ends, so presumably only the path names survive - confirm that this is intended.
fn init_full_filepaths_textfile() -> (PathBuf, PathBuf) {
    (
        tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
        tempfile::NamedTempFile::new()
            .unwrap()
            .into_temp_path()
            .to_path_buf(),
    )
}

/// Creates a destination handler with `REMOTE_ID` as local entity ID, the default indication
/// configuration, a native filestore and a remote configuration table for `LOCAL_ID`.
fn default_dest_handler(
    test_fault_handler: TestFaultHandler,
    test_packet_sender: TestCfdpSender,
    expiry_control: &TimerExpiryControl,
) -> TestDestHandler {
    let local_entity_cfg = LocalEntityConfig {
        id: REMOTE_ID.into(),
        indication_cfg: IndicationConfig::default(),
        fault_handler: FaultHandler::new(test_fault_handler),
    };
    DestinationHandler::new(
        local_entity_cfg,
        test_packet_sender,
        NativeFilestore::default(),
        basic_remote_cfg_table(LOCAL_ID, 1024, true),
        TestCheckTimerCreator::new(expiry_control),
        LostSegmentsList::default(),
    )
}

/// Creates a metadata PDU without options for the given file names and size.
///
/// A file size of 0 selects the null checksum, any other size selects CRC32.
fn create_metadata_pdu<'filename>(
    pdu_header: &PduHeader,
    src_name: &'filename Path,
    dest_name: &'filename Path,
    file_size: u64,
    closure_requested: bool,
) -> MetadataPduCreator<'filename, 'filename, 'static> {
    let checksum_type = if file_size == 0 {
        ChecksumType::NullChecksum
    } else {
        ChecksumType::Crc32
    };
    let metadata_params =
        MetadataGenericParams::new(closure_requested, checksum_type, file_size);
    MetadataPduCreator::new_no_opts(
        *pdu_header,
        metadata_params,
Lv::new_from_str(src_name.as_os_str().to_str().unwrap()).unwrap(),
        Lv::new_from_str(dest_name.as_os_str().to_str().unwrap()).unwrap(),
    )
}

/// Serializes `pdu` into `buf` and wraps the written bytes into a [PduRawWithInfo].
///
/// Panics if the PDU can not be written or if the packet information can not be generated.
fn create_packet_info<'a>(
    pdu: &'a impl WritablePduPacket,
    buf: &'a mut [u8],
) -> PduRawWithInfo<'a> {
    // This helper is used for all PDU types (metadata, EOF, ACK), so the message must not
    // claim a specific one.
    let written_len = pdu.write_to_bytes(buf).expect("writing PDU failed");
    PduRawWithInfo::new(&buf[..written_len]).expect("generating packet info failed")
}

/// Creates a no-error EOF PDU for `file_data`.
///
/// The checksum is the CRC32 of the data, or 0 for empty data. The file size is the data length.
fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader) -> EofPdu {
    let crc32 = if file_data.is_empty() {
        0
    } else {
        let mut digest = CRC_32.digest();
        digest.update(file_data);
        digest.finalize()
    };
    EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64)
}

/// A freshly created handler is idle, has no transmission mode and has empty queues.
#[test]
fn test_basic() {
    let fault_handler = TestFaultHandler::default();
    let test_sender = TestCfdpSender::default();
    let dest_handler =
        default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default());
    assert!(dest_handler.transmission_mode().is_none());
    assert!(
        dest_handler
            .local_cfg
            .fault_handler
            .user_hook
            .borrow()
            .all_queues_empty()
    );
    assert!(dest_handler.pdu_sender.queue_empty());
    assert_eq!(dest_handler.state(), State::Idle);
    assert_eq!(dest_handler.step(), TransactionStep::Idle);
}

/// A cancel request on an idle handler is reported as not performed.
#[test]
fn test_cancelling_idle_fsm() {
    let fault_handler = TestFaultHandler::default();
    let test_sender = TestCfdpSender::default();
    let mut dest_handler =
        default_dest_handler(fault_handler, test_sender, &TimerExpiryControl::default());
    assert!(!dest_handler.cancel_request(&TransactionId::new(
        UnsignedByteFieldU8::new(0).into(),
        UnsignedByteFieldU8::new(0).into()
    )));
}

/// Unacknowledged transfer of an empty file without closure.
#[test]
fn test_empty_file_transfer_not_acked_no_closure() {
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    let mut test_user = tb.test_user_from_cached_paths(0);
    tb.generic_transfer_init(&mut test_user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy,
TransactionStep::ReceivingFileDataPdus);
    tb.generic_eof_no_error(&mut test_user, Vec::new())
        .expect("EOF no error insertion failed");
    tb.check_completion_indication_success(&mut test_user);
}

/// A metadata PDU with swapped source and destination IDs must be rejected because no remote
/// configuration exists for that source.
#[test]
fn test_empty_file_transfer_invalid_remote_id() {
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        TestFaultHandler::default(),
        TransmissionMode::Unacknowledged,
        false,
    );
    let mut test_user = tb.test_user_from_cached_paths(0);
    // Just swap them..
    let mut swapped_conf = tb.pdu_conf;
    swapped_conf
        .set_source_and_dest_id(REMOTE_ID, LOCAL_ID)
        .unwrap();
    let pdu_header = PduHeader::new_for_file_directive(swapped_conf, 0);
    let metadata_pdu = create_metadata_pdu(
        &pdu_header,
        tb.src_path.as_path(),
        tb.dest_path.as_path(),
        0,
        false,
    );
    let packet_info = create_packet_info(&metadata_pdu, &mut tb.buf);
    match tb.handler.state_machine(&mut test_user, Some(&packet_info)) {
        Err(DestError::NoRemoteConfigFound(id)) => assert_eq!(id, REMOTE_ID.into()),
        _ => panic!("expected no remote config found error"),
    }
    tb.check_dest_file = false;
}

/// Acknowledged transfer of an empty file: an EOF ACK and a Finished PDU are sent, and the
/// Finished PDU is acknowledged at the end.
#[test]
fn test_empty_file_transfer_acked() {
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        TestFaultHandler::default(),
        TransmissionMode::Acknowledged,
        false,
    );
    let mut user = tb.test_user_from_cached_paths(0);
    let transfer_info = tb
        .generic_transfer_init(&mut user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.generic_eof_no_error(&mut user, Vec::new())
        .expect("EOF no error insertion failed");
    tb.check_completion_indication_success(&mut user);
    let queued_pdus = tb.pdu_queue_len();
    assert_eq!(queued_pdus, 2);
    tb.check_eof_ack_pdu(ConditionCode::NoError);
    tb.check_finished_pdu_success();
    tb.acknowledge_finished_pdu(&mut user, &transfer_info);
}

/// Unacknowledged transfer of a small file which fits into one file data PDU.
#[test]
fn test_small_file_transfer_not_acked() {
    let file_data_str = "Hello World!";
    let file_data = file_data_str.as_bytes();
    let file_size = file_data.len() as u64;
    let fault_handler =
TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    let mut user = tb.test_user_from_cached_paths(file_size);
    let _transfer_info = tb
        .generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    assert_eq!(
        tb.generic_file_data_insert(&mut user, 0, file_data)
            .expect("file data insertion failed"),
        0
    );
    assert_eq!(
        tb.generic_eof_no_error(&mut user, file_data.to_vec())
            .expect("EOF no error insertion failed"),
        0
    );
    tb.check_completion_indication_success(&mut user);
}

/// Acknowledged transfer of a small file: the EOF insertion yields two sent PDUs (EOF ACK and
/// Finished PDU), and the Finished PDU is acknowledged at the end.
#[test]
fn test_small_file_transfer_acked() {
    let file_data = "Hello World!".as_bytes();
    let file_size = file_data.len() as u64;
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        TestFaultHandler::default(),
        TransmissionMode::Acknowledged,
        false,
    );
    let mut user = tb.test_user_from_cached_paths(file_size);
    let transfer_info = tb
        .generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    let sent_after_file_data = tb
        .generic_file_data_insert(&mut user, 0, file_data)
        .expect("file data insertion failed");
    assert_eq!(sent_after_file_data, 0);
    let sent_after_eof = tb
        .generic_eof_no_error(&mut user, file_data.to_vec())
        .expect("EOF no error insertion failed");
    assert_eq!(sent_after_eof, 2);
    tb.check_completion_indication_success(&mut user);
    assert_eq!(tb.pdu_queue_len(), 2);
    tb.check_eof_ack_pdu(ConditionCode::NoError);
    tb.check_finished_pdu_success();
    tb.acknowledge_finished_pdu(&mut user, &transfer_info);
}

/// Unacknowledged transfer of random data split into two file data PDUs.
#[test]
fn test_segmented_file_transfer_not_acked() {
    let mut rng = rand::rng();
    let mut random_data = [0u8; 512];
    rng.fill(&mut random_data);
    let file_size = random_data.len() as u64;
    let segment_len = 256;
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
TransmissionMode::Unacknowledged,
        false,
    );
    let mut test_user = tb.test_user_from_cached_paths(file_size);
    tb.generic_transfer_init(&mut test_user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
        .expect("file data insertion failed");
    tb.generic_file_data_insert(
        &mut test_user,
        segment_len as u64,
        &random_data[segment_len..],
    )
    .expect("file data insertion failed");
    tb.generic_eof_no_error(&mut test_user, random_data.to_vec())
        .expect("EOF no error insertion failed");
    tb.check_completion_indication_success(&mut test_user);
}

/// Acknowledged transfer of random data split into two file data PDUs, followed by the
/// EOF ACK / Finished PDU / Finished ACK exchange.
#[test]
fn test_segmented_file_transfer_acked() {
    let mut rng = rand::rng();
    let mut random_data = [0u8; 512];
    rng.fill(&mut random_data);
    let file_size = random_data.len() as u64;
    let segment_len = 256;
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        TestFaultHandler::default(),
        TransmissionMode::Acknowledged,
        false,
    );
    let mut user = tb.test_user_from_cached_paths(file_size);
    let transfer_info = tb
        .generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    // Both segments are inserted in order, the second one starts at the segment length.
    let (first_segment, second_segment) = random_data.split_at(segment_len);
    tb.generic_file_data_insert(&mut user, 0, first_segment)
        .expect("file data insertion failed");
    tb.generic_file_data_insert(&mut user, segment_len as u64, second_segment)
        .expect("file data insertion failed");
    tb.generic_eof_no_error(&mut user, random_data.to_vec())
        .expect("EOF no error insertion failed");
    tb.check_completion_indication_success(&mut user);
    assert_eq!(tb.pdu_queue_len(), 2);
    tb.check_eof_ack_pdu(ConditionCode::NoError);
    tb.check_finished_pdu_success();
    tb.acknowledge_finished_pdu(&mut user, &transfer_info);
}

/// The EOF PDU arrives before the second file segment: the checksum failure is ignored, and
/// the transfer completes once the missing segment was inserted and the check timer expired.
#[test]
fn test_check_limit_handling_transfer_success() {
    let mut rng = rand::rng();
    let mut random_data = [0u8; 512];
    rng.fill(&mut random_data);
    let file_size =
random_data.len() as u64; let segment_len = 256; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Unacknowledged, false, ); let mut test_user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut test_user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len]) .expect("file data insertion 0 failed"); tb.generic_eof_no_error(&mut test_user, random_data.to_vec()) .expect("EOF no error insertion failed"); // Checksum failure. let mut fault_handler = tb.handler.local_cfg.fault_handler.user_hook.borrow_mut(); assert_eq!(fault_handler.ignored_queue.len(), 1); let cancelled = fault_handler.ignored_queue.pop_front().unwrap(); assert_eq!(cancelled.transaction_id(), transfer_info.id); assert_eq!( cancelled.condition_code(), ConditionCode::FileChecksumFailure ); assert_eq!(cancelled.progress(), segment_len as u64); drop(fault_handler); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); tb.generic_file_data_insert( &mut test_user, segment_len as u64, &random_data[segment_len..], ) .expect("file data insertion 1 failed"); tb.handler .state_machine_no_packet(&mut test_user) .expect("fsm failure"); tb.check_completion_indication_success(&mut test_user); assert!(test_user.indication_queues_empty()); } #[test] fn test_check_limit_handling_limit_reached() { let mut rng = rand::rng(); let mut random_data = [0u8; 512]; rng.fill(&mut random_data); let file_size = random_data.len() as u64; let segment_len: usize = 256; let check_checksum_failure = |testbench: &mut DestHandlerTestbench, transaction_id: TransactionId| { let mut fault_hook = testbench.fault_handler().user_hook.borrow_mut(); assert!(fault_hook.notice_of_suspension_queue.is_empty()); let 
ignored_queue = &mut fault_hook.ignored_queue;
            assert_eq!(ignored_queue.len(), 1);
            let ignored = ignored_queue.pop_front().unwrap();
            assert_eq!(ignored.transaction_id(), transaction_id);
            assert_eq!(ignored.condition_code(), ConditionCode::FileChecksumFailure);
            assert_eq!(ignored.progress(), segment_len as u64);
        };

    let fault_handler = TestFaultHandler::default();
    let mut testbench = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    let mut test_user = testbench.test_user_from_cached_paths(file_size);
    let transfer_info = testbench
        .generic_transfer_init(&mut test_user, file_size)
        .expect("transfer init failed");

    testbench.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    // Only the first segment is inserted, the EOF PDU announces the full data.
    testbench
        .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
        .expect("file data insertion 0 failed");
    testbench
        .generic_eof_no_error(&mut test_user, random_data.to_vec())
        .expect("EOF no error insertion failed");
    // First checksum failure, reported right after the EOF insertion.
    check_checksum_failure(&mut testbench, transfer_info.id);

    testbench.state_check(
        State::Busy,
        TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
    );
    // First check timer expiry: the handler stays in the check limit handling step.
    testbench.set_check_timer_expired();
    testbench
        .handler
        .state_machine_no_packet(&mut test_user)
        .expect("fsm error");
    testbench.state_check(
        State::Busy,
        TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
    );
    check_checksum_failure(&mut testbench, transfer_info.id);
    // Second check timer expiry: the handler goes back to idle.
    testbench.set_check_timer_expired();
    testbench
        .handler
        .state_machine_no_packet(&mut test_user)
        .expect("fsm error");
    check_checksum_failure(&mut testbench, transfer_info.id);
    testbench.state_check(State::Idle, TransactionStep::Idle);

    {
        // Exactly one notice of cancellation must have been reported for the transaction.
        let mut fault_hook = testbench.fault_handler().user_hook.borrow_mut();
        let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue;
        assert_eq!(cancelled_queue.len(), 1);
        let cancelled = cancelled_queue.pop_front().unwrap();
        assert_eq!(cancelled.transaction_id(), transfer_info.id);
        assert_eq!(cancelled.condition_code(),
ConditionCode::CheckLimitReached);
        assert_eq!(cancelled.progress(), segment_len as u64);
    }

    assert_eq!(test_user.finished_indic_queue.len(), 1);
    let finished_indication = test_user.finished_indic_queue.pop_front().unwrap();
    assert_eq!(finished_indication.id, transfer_info.id);
    assert_eq!(
        finished_indication.condition_code,
        ConditionCode::CheckLimitReached
    );
    assert_eq!(finished_indication.delivery_code, DeliveryCode::Incomplete);
    assert_eq!(finished_indication.file_status, FileStatus::Retained);

    assert!(testbench.handler.pdu_sender.queue_empty());

    // Check that the broken file exists.
    testbench.check_dest_file = false;
    assert!(Path::exists(testbench.dest_path()));
    let read_content = fs::read(testbench.dest_path()).expect("reading back string failed");
    assert_eq!(read_content.len(), segment_len);
    assert_eq!(read_content, &random_data[0..segment_len]);
    assert!(fs::remove_file(testbench.dest_path().as_path()).is_ok());
    assert!(test_user.indication_queues_empty());
}

/// Verifies that `sent_pdu` is a Finished PDU reporting a complete, retained file without
/// error, without fault location and without filestore responses.
fn check_finished_pdu_success(sent_pdu: &SentPdu) {
    assert_eq!(sent_pdu.pdu_type, PduType::FileDirective);
    let expected_directive = Some(FileDirectiveType::FinishedPdu);
    assert_eq!(sent_pdu.file_directive_type, expected_directive);
    let reader = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap();
    assert_eq!(reader.file_status(), FileStatus::Retained);
    assert_eq!(reader.condition_code(), ConditionCode::NoError);
    assert_eq!(reader.delivery_code(), DeliveryCode::Complete);
    assert!(reader.fault_location().is_none());
    assert_eq!(reader.fs_responses_raw(), &[]);
}

/// Unacknowledged transfer of an empty file with closure requested: the EOF insertion yields
/// one sent Finished PDU.
#[test]
fn test_file_transfer_with_closure() {
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        true,
    );
    let mut test_user = tb.test_user_from_cached_paths(0);
    tb.generic_transfer_init(&mut test_user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    let sent_packets = tb
.generic_eof_no_error(&mut test_user, Vec::new()) .expect("EOF no error insertion failed"); assert_eq!(sent_packets, 1); assert!(tb.all_fault_queues_empty()); // The Finished PDU was sent, so the state machine is done. tb.state_check(State::Idle, TransactionStep::Idle); assert!(!tb.handler.pdu_sender.queue_empty()); let sent_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap(); check_finished_pdu_success(&sent_pdu); tb.check_completion_indication_success(&mut test_user); assert!(test_user.indication_queues_empty()); } #[test] fn test_finished_pdu_insertion_rejected() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Unacknowledged, false, ); tb.check_dest_file = false; let mut user = tb.test_user_from_cached_paths(0); let finished_pdu = FinishedPduCreator::new_no_error( PduHeader::new_for_file_directive(CommonPduConfig::default(), 0), DeliveryCode::Complete, FileStatus::Retained, ); let finished_pdu_raw = finished_pdu.to_vec().unwrap(); let packet_info = PduRawWithInfo::new(&finished_pdu_raw).unwrap(); let error = tb.handler.state_machine(&mut user, Some(&packet_info)); assert!(error.is_err()); let error = error.unwrap_err(); if let DestError::CantProcessPacketType { pdu_type, directive_type, } = error { assert_eq!(pdu_type, PduType::FileDirective); assert_eq!(directive_type, Some(FileDirectiveType::FinishedPdu)); } } #[test] fn test_metadata_insertion_twice_fails() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Unacknowledged, true, ); let mut user = tb.test_user_from_cached_paths(0); tb.generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.check_handler_idle_at_drop = false; tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); let pdu_header = PduHeader::new_for_file_directive(tb.pdu_conf, 0); let metadata_pdu = create_metadata_pdu( 
&pdu_header,
        tb.src_path.as_path(),
        tb.dest_path.as_path(),
        0,
        tb.closure_requested,
    );
    let packet_info = create_packet_info(&metadata_pdu, &mut tb.buf);
    let error = tb.handler.state_machine(&mut user, Some(&packet_info));
    assert!(error.is_err());
    let error = error.unwrap_err();
    if !matches!(error, DestError::RecvdMetadataButIsBusy) {
        panic!("unexpected error: {:?}", error);
    }
}

/// Corrupted file data in unacknowledged mode with closure: the checksum failure is ignored
/// until the check limit is reached, then the transaction is cancelled and a Finished PDU
/// is sent.
#[test]
fn test_checksum_failure_not_acked() {
    let file_data = "Hello World!".as_bytes();
    let file_size = file_data.len() as u64;
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        TestFaultHandler::default(),
        TransmissionMode::Unacknowledged,
        true,
    );
    let mut user = tb.test_user_from_cached_paths(file_size);
    tb.generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    // Same length as the announced file data, but one byte differs.
    let faulty_file_data = b"Hemlo World!";
    let sent_after_file_data = tb
        .generic_file_data_insert(&mut user, 0, faulty_file_data)
        .expect("file data insertion failed");
    assert_eq!(sent_after_file_data, 0);
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    let sent_packets = tb
        .generic_eof_no_error(&mut user, file_data.into())
        .expect("EOF no error insertion failed");
    // FSM enters check limit algorithm here, so no finished PDU is created.
    assert_eq!(sent_packets, 0);

    let transaction_id = tb.handler.transaction_id().unwrap();
    let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut();
    assert!(fault_hook.notice_of_suspension_queue.is_empty());

    // The file checksum failure is ignored by default and check limit handling is now
    // performed.
let ignored_queue = &mut fault_hook.ignored_queue; assert_eq!(ignored_queue.len(), 1); let cancelled = ignored_queue.pop_front().unwrap(); assert_eq!(cancelled.transaction_id(), transaction_id); assert_eq!( cancelled.condition_code(), ConditionCode::FileChecksumFailure ); assert_eq!(cancelled.progress(), file_size); drop(fault_hook); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); tb.handler .state_machine_no_packet(&mut user) .expect("fsm error"); tb.state_check( State::Busy, TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling, ); tb.set_check_timer_expired(); tb.handler .state_machine_no_packet(&mut user) .expect("fsm error"); tb.state_check(State::Idle, TransactionStep::Idle); // Transaction is cancelled because the check limit is reached. let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut(); let cancelled_queue = &mut fault_hook.notice_of_cancellation_queue; assert_eq!(cancelled_queue.len(), 1); let cancelled = cancelled_queue.pop_front().unwrap(); assert_eq!(cancelled.transaction_id(), transaction_id); assert_eq!(cancelled.condition_code(), ConditionCode::CheckLimitReached); assert_eq!(cancelled.progress(), file_size); drop(fault_hook); let sent_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap(); assert_eq!(sent_pdu.pdu_type, PduType::FileDirective); assert_eq!( sent_pdu.file_directive_type, Some(FileDirectiveType::FinishedPdu) ); let finished_pdu = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap(); assert_eq!(finished_pdu.file_status(), FileStatus::Retained); assert_eq!( finished_pdu.condition_code(), ConditionCode::CheckLimitReached ); let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut(); let ignored_queue = &mut fault_hook.ignored_queue; assert_eq!(ignored_queue.len(), 2); let mut ignored = ignored_queue.pop_front().unwrap(); assert_eq!(ignored.transaction_id(), transaction_id); assert_eq!(ignored.condition_code(), 
ConditionCode::FileChecksumFailure);
    assert_eq!(ignored.progress(), file_size);
    ignored = ignored_queue.pop_front().unwrap();
    assert_eq!(ignored.transaction_id(), transaction_id);
    assert_eq!(ignored.condition_code(), ConditionCode::FileChecksumFailure);
    assert_eq!(ignored.progress(), file_size);

    assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete);
    assert!(finished_pdu.fault_location().is_some());
    assert_eq!(
        *finished_pdu.fault_location().unwrap().entity_id(),
        REMOTE_ID.into()
    );
    assert_eq!(finished_pdu.fs_responses_raw(), &[]);
    assert!(tb.handler.pdu_sender.queue_empty());
    user.verify_finished_indication_retained(
        DeliveryCode::Incomplete,
        ConditionCode::CheckLimitReached,
        transaction_id,
    );
    tb.expected_full_data = faulty_file_data.to_vec();
}

/// The destination path is a directory: the file must end up inside that directory under the
/// file name of the source path.
#[test]
fn test_file_copy_to_directory() {
    let fault_handler = TestFaultHandler::default();
    let src_path = tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf();
    // The directory guard is declared before the testbench, so it is dropped after it: the
    // testbench first verifies and removes the destination file, then the temporary directory
    // is removed instead of being left behind after every test run.
    let dest_dir = tempfile::TempDir::new().unwrap();
    let mut dest_path_buf = dest_dir.path().to_path_buf();
    let mut tb = DestHandlerTestbench::new(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
        false,
        src_path.clone(),
        dest_path_buf.clone(),
    );
    // The drop-time file check must look at the file inside the directory.
    dest_path_buf.push(src_path.file_name().unwrap());
    tb.dest_path = dest_path_buf;
    let mut user = tb.test_user_from_cached_paths(0);
    tb.generic_transfer_init(&mut user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.generic_eof_no_error(&mut user, Vec::new())
        .expect("EOF no error insertion failed");
    tb.check_completion_indication_success(&mut user);
}

/// An EOF PDU with the cancel request condition code finishes the transfer of an empty file
/// without sending any PDU.
#[test]
fn test_tranfer_cancellation_empty_file_with_eof_pdu() {
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    let mut user = tb.test_user_from_cached_paths(0);
    let transfer_info = tb
        .generic_transfer_init(&mut user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy,
TransactionStep::ReceivingFileDataPdus);
    let pdu_header = PduHeader::new_for_file_directive(tb.pdu_conf, 0);
    let cancel_eof = EofPdu::new(
        pdu_header,
        ConditionCode::CancelRequestReceived,
        0,
        0,
        Some(EntityIdTlv::new(LOCAL_ID.into())),
    );
    let cancel_eof_raw = cancel_eof.to_vec().unwrap();
    let cancel_packet = PduRawWithInfo::new(&cancel_eof_raw).unwrap();
    let packets = tb
        .handler
        .state_machine(&mut user, Some(&cancel_packet))
        .expect("state machine call with EOF insertion failed");
    assert_eq!(packets, 0);
    user.verify_finished_indication_retained(
        DeliveryCode::Complete,
        ConditionCode::CancelRequestReceived,
        transfer_info.id,
    );
}

/// Cancels a transfer with an EOF PDU (cancel request condition code) whose checksum and file
/// size cover the first five bytes of the test data.
///
/// `with_closure` selects whether closure is requested. `insert_packet` selects whether those
/// five bytes are inserted as file data before the EOF PDU.
fn generic_tranfer_cancellation_partial_file_with_eof_pdu(
    with_closure: bool,
    insert_packet: bool,
) {
    let file_data = "Hello World!".as_bytes();
    let file_size = 5;
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        TestFaultHandler::default(),
        TransmissionMode::Unacknowledged,
        with_closure,
    );
    let mut user = tb.test_user_from_cached_paths(file_size);
    let transfer_info = tb
        .generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    let partial_data = &file_data[0..5];
    if insert_packet {
        tb.generic_file_data_insert(&mut user, 0, partial_data)
            .expect("file data insertion failed");
    }
    let checksum = {
        let mut digest = CRC_32.digest();
        digest.update(partial_data);
        digest.finalize()
    };
    let pdu_header = PduHeader::new_for_file_directive(tb.pdu_conf, 0);
    let cancel_eof = EofPdu::new(
        pdu_header,
        ConditionCode::CancelRequestReceived,
        checksum,
        5,
        Some(EntityIdTlv::new(LOCAL_ID.into())),
    );
    let cancel_eof_raw = cancel_eof.to_vec().unwrap();
    let cancel_packet = PduRawWithInfo::new(&cancel_eof_raw).unwrap();
    let packets = tb
        .handler
        .state_machine(&mut user, Some(&cancel_packet))
        .expect("state machine call with EOF insertion failed");
    if with_closure {
        assert_eq!(packets, 1);
        let finished_pdu = tb.handler.pdu_sender.retrieve_next_pdu().unwrap();
        assert_eq!(finished_pdu.pdu_type, PduType::FileDirective);
        assert_eq!(
finished_pdu.file_directive_type.unwrap(),
            FileDirectiveType::FinishedPdu
        );
        let finished_pdu = FinishedPduReader::from_bytes(&finished_pdu.raw_pdu).unwrap();
        assert_eq!(
            finished_pdu.condition_code(),
            ConditionCode::CancelRequestReceived
        );
        if insert_packet {
            // Checksum success, so data is complete.
            assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete);
            user.verify_finished_indication_retained(
                DeliveryCode::Complete,
                ConditionCode::CancelRequestReceived,
                transfer_info.id,
            );
        } else {
            // Checksum failure, so data is incomplete.
            assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete);
            tb.check_dest_file = false;
            // The checksum failure must have been reported as an ignored fault.
            let mut fault_hook = tb.handler.local_cfg.user_fault_hook().borrow_mut();
            let ignored = fault_hook.ignored_queue.pop_front().unwrap();
            assert_eq!(ignored.transaction_id(), transfer_info.id);
            assert_eq!(ignored.condition_code(), ConditionCode::FileChecksumFailure);
            assert_eq!(ignored.progress(), 5);
            user.verify_finished_indication_retained(
                DeliveryCode::Incomplete,
                ConditionCode::CancelRequestReceived,
                transfer_info.id,
            );
        }
        assert_eq!(finished_pdu.file_status(), FileStatus::Retained);
        let fault_location = finished_pdu
            .fault_location()
            .expect("no fault location set");
        assert_eq!(fault_location, EntityIdTlv::new(LOCAL_ID.into()));
    } else {
        user.verify_finished_indication_retained(
            DeliveryCode::Complete,
            ConditionCode::CancelRequestReceived,
            transfer_info.id,
        );
        assert_eq!(packets, 0);
    }
    // Expectations for the destination file check performed when the testbench is dropped.
    tb.expected_file_size = file_size;
    tb.expected_full_data = file_data[0..file_size as usize].to_vec();
}

/// Cancellation by EOF PDU, no closure, partial data inserted.
#[test]
fn test_tranfer_cancellation_partial_file_with_eof_pdu_no_closure_complete() {
    generic_tranfer_cancellation_partial_file_with_eof_pdu(false, true);
}

/// Cancellation by EOF PDU, closure requested, partial data inserted.
#[test]
fn test_tranfer_cancellation_partial_file_with_eof_pdu_with_closure_complete() {
    generic_tranfer_cancellation_partial_file_with_eof_pdu(true, true);
}

/// Cancellation by EOF PDU, closure requested, no file data inserted.
#[test]
fn test_tranfer_cancellation_partial_file_with_eof_pdu_with_closure_incomplete() {
generic_tranfer_cancellation_partial_file_with_eof_pdu(true, false);
}

/// Cancelling an empty file transfer through the cancel API without closure finishes the
/// transaction without sending any PDU.
#[test]
fn test_tranfer_cancellation_empty_file_with_cancel_api() {
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    let mut user = tb.test_user_from_cached_paths(0);
    let transfer_info = tb
        .generic_transfer_init(&mut user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.handler.cancel_request(&transfer_info.id);
    // No EOF PDU is involved here, the state machine only processes the cancel request.
    let packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("state machine call after cancel request failed");
    user.verify_finished_indication_retained(
        DeliveryCode::Complete,
        ConditionCode::CancelRequestReceived,
        transfer_info.id,
    );
    assert_eq!(packets, 0);
}

/// Cancelling an empty file transfer through the cancel API with closure requested sends one
/// Finished PDU with the cancel request condition code.
#[test]
fn test_tranfer_cancellation_empty_file_with_cancel_api_and_closure() {
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        true,
    );
    let mut user = tb.test_user_from_cached_paths(0);
    let transfer_info = tb
        .generic_transfer_init(&mut user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.handler.cancel_request(&transfer_info.id);
    // No EOF PDU is involved here, the state machine only processes the cancel request.
    let packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("state machine call after cancel request failed");
    assert_eq!(packets, 1);
    let next_pdu = tb.get_next_pdu().unwrap();
    assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
    assert_eq!(
        next_pdu.file_directive_type.unwrap(),
        FileDirectiveType::FinishedPdu
    );
    let finished_pdu =
        FinishedPduReader::new(&next_pdu.raw_pdu).expect("finished pdu read failed");
    assert_eq!(
        finished_pdu.condition_code(),
        ConditionCode::CancelRequestReceived
    );
    assert_eq!(finished_pdu.file_status(), FileStatus::Retained);
    // Empty file, so still complete.
assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete);
    assert_eq!(
        finished_pdu.fault_location(),
        Some(EntityIdTlv::new(REMOTE_ID.into()))
    );
    user.verify_finished_indication_retained(
        DeliveryCode::Complete,
        ConditionCode::CancelRequestReceived,
        transfer_info.id,
    );
}

/// Cancelling a transfer through the cancel API after the first five bytes were received,
/// with closure requested: one Finished PDU reporting incomplete delivery is sent and the
/// received data is retained.
#[test]
fn test_tranfer_cancellation_partial_file_with_cancel_api_and_closure() {
    let file_data_str = "Hello World!";
    let file_data = file_data_str.as_bytes();
    let file_size = 5;

    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        true,
    );
    let mut user = tb.test_user_from_cached_paths(file_size);
    let transfer_info = tb
        .generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.generic_file_data_insert(&mut user, 0, &file_data[0..5])
        .expect("file data insertion failed");

    tb.handler.cancel_request(&transfer_info.id);
    // No EOF PDU is involved here, the state machine only processes the cancel request.
    let packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("state machine call after cancel request failed");
    assert_eq!(packets, 1);
    let next_pdu = tb.get_next_pdu().unwrap();
    assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
    assert_eq!(
        next_pdu.file_directive_type.unwrap(),
        FileDirectiveType::FinishedPdu
    );
    let finished_pdu =
        FinishedPduReader::new(&next_pdu.raw_pdu).expect("finished pdu read failed");
    assert_eq!(
        finished_pdu.condition_code(),
        ConditionCode::CancelRequestReceived
    );
    assert_eq!(finished_pdu.file_status(), FileStatus::Retained);
    assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Incomplete);
    assert_eq!(
        finished_pdu.fault_location(),
        Some(EntityIdTlv::new(REMOTE_ID.into()))
    );
    // Expectations for the destination file check performed when the testbench is dropped.
    tb.expected_file_size = file_size;
    tb.expected_full_data = file_data[0..file_size as usize].to_vec();
    user.verify_finished_indication_retained(
        DeliveryCode::Incomplete,
        ConditionCode::CancelRequestReceived,
        transfer_info.id,
    );
}

// Only incomplete received files will be removed.
#[test]
fn test_tranfer_cancellation_file_disposition_not_done_for_empty_file() {
    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    // Enable disposition on cancellation for the remote entity sending the file.
    let remote_cfg_mut = tb
        .handler
        .remote_cfg_table
        .get_mut(LOCAL_ID.value())
        .unwrap();
    remote_cfg_mut.disposition_on_cancellation = true;
    let mut user = tb.test_user_from_cached_paths(0);
    let transfer_info = tb
        .generic_transfer_init(&mut user, 0)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);

    tb.handler.cancel_request(&transfer_info.id);
    // No EOF PDU is involved here, the state machine only processes the cancel request.
    let packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("state machine call after cancel request failed");
    assert_eq!(packets, 0);
    user.verify_finished_indication_retained(
        DeliveryCode::Complete,
        ConditionCode::CancelRequestReceived,
        transfer_info.id,
    );
}

#[test]
fn test_tranfer_cancellation_file_disposition_not_done_for_incomplete_file() {
    let file_data_str = "Hello World!";
    let file_data = file_data_str.as_bytes();
    let file_size = file_data.len() as u64;

    let fault_handler = TestFaultHandler::default();
    let mut tb = DestHandlerTestbench::new_with_fixed_paths(
        fault_handler,
        TransmissionMode::Unacknowledged,
        false,
    );
    tb.check_dest_file = false;
    let remote_cfg_mut = tb
        .handler
        .remote_cfg_table
        .get_mut(LOCAL_ID.value())
        .unwrap();
    remote_cfg_mut.disposition_on_cancellation = true;
    let mut user = tb.test_user_from_cached_paths(file_size);
    let transfer_info = tb
        .generic_transfer_init(&mut user, file_size)
        .expect("transfer init failed");
    tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
    tb.generic_file_data_insert(&mut user, 0, &file_data[0..5])
        .expect("file data insertion failed");

    tb.handler.cancel_request(&transfer_info.id);
    let packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("state machine call with EOF insertion failed");
    assert_eq!(packets, 0);
    // File was disposed.
assert!(!Path::exists(tb.dest_path())); assert_eq!(user.finished_indic_queue.len(), 1); user.verify_finished_indication( DeliveryCode::Incomplete, ConditionCode::CancelRequestReceived, transfer_info.id, FileStatus::DiscardDeliberately, ); } fn generic_test_immediate_nak_request(file_flag: LargeFileFlag) { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); tb.set_large_file_flag(file_flag); tb.remote_cfg_mut().immediate_nak_mode = true; let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); // This should immediately trigger a NAK request for file segment 0..4 assert_eq!( tb.generic_file_data_insert(&mut user, 4, &file_data[4..]) .expect("file data insertion failed"), 1 ); assert_eq!(tb.pdu_queue_len(), 1); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.pdu_header().common_pdu_conf().file_flag, file_flag); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 1); assert_eq!(seg_reqs[0], (0, 4)); // We simulate the reply by re-inserting the missing file segment. 
tb.generic_file_data_insert(&mut user, 0, &file_data[0..4]) .expect("file data insertion failed"); tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_immediate_nak_request() { generic_test_immediate_nak_request(LargeFileFlag::Normal); } #[test] fn test_immediate_nak_request_large_file() { generic_test_immediate_nak_request(LargeFileFlag::Large); } fn generic_test_deferred_nak_request(file_flag: LargeFileFlag) { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); tb.set_large_file_flag(file_flag); // Disable this, we only want to check the deferred procedure. 
tb.remote_cfg_mut().immediate_nak_mode = false; let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); assert_eq!( tb.generic_file_data_insert(&mut user, 4, &file_data[4..]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"), 2 ); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); assert_eq!(tb.pdu_queue_len(), 1); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 1); assert_eq!(seg_reqs[0], (0, 4)); // We simulate the reply by re-inserting the missing file segment. 
tb.generic_file_data_insert(&mut user, 0, &file_data[0..4]) .expect("file data insertion failed"); tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 1); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_deferred_nak_request() { generic_test_deferred_nak_request(LargeFileFlag::Normal); } #[test] fn test_deferred_nak_request_large() { generic_test_deferred_nak_request(LargeFileFlag::Large); } #[test] fn test_file_data_before_metadata() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); tb.remote_cfg_mut().immediate_nak_mode = true; let mut user = tb.test_user_from_cached_paths(file_size); assert_eq!( tb.generic_file_data_insert(&mut user, 0, file_data) .expect("file data insertion failed"), 1 ); tb.state_check(State::Busy, TransactionStep::WaitingForMetadata); assert_eq!(tb.pdu_queue_len(), 1); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 2); // Metadata is re-requested. assert_eq!(seg_reqs[0], (0, 0)); // File data is re-requested. The destination handler can not do anything with the file // data as long as no metadata has been received. Buffering of FD PDUs is not supported. assert_eq!(seg_reqs[1], (0, 12)); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); // Re-insert missing file data. 
assert_eq!( tb.generic_file_data_insert(&mut user, 0, file_data) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"), 2 ); tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_eof_before_metadata() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); tb.remote_cfg_mut().immediate_nak_mode = true; let mut user = tb.test_user_from_cached_paths(file_size); // We expect the ACK for the EOF and a NAK packet re-requesting the metadata and file data. assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("file data insertion failed"), 2 ); tb.state_check(State::Busy, TransactionStep::WaitingForMetadata); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 2); // Metadata is re-requested. assert_eq!(seg_reqs[0], (0, 0)); // File data is re-requested. The destination handler can not do anything with the file // data as long as no metadata has been received. Buffering of FD PDUs is not supported. 
assert_eq!(seg_reqs[1], (0, 12)); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); // Re-insert missing file data. assert_eq!( tb.generic_file_data_insert(&mut user, 0, file_data) .expect("file data insertion failed"), 1 ); tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 1); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_nak_limit_reached() { let file_data_str = "Hello World!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); // Disable this, we only want to check the deferred procedure. tb.remote_cfg_mut().immediate_nak_mode = false; let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); assert_eq!( tb.generic_file_data_insert(&mut user, 4, &file_data[4..]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"), 2 ); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); assert_eq!(tb.pdu_queue_len(), 1); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 1); assert_eq!(seg_reqs[0], (0, 4)); // Let the NAK timer expire tb.set_nak_activity_timer_expired(); 
assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); assert_eq!(tb.pdu_queue_len(), 1); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 1); assert_eq!(seg_reqs[0], (0, 4)); // Let the NAK timer expire again. tb.set_nak_activity_timer_expired(); assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); assert_eq!(tb.pdu_queue_len(), 1); tb.check_completion_indication_failure( &mut user, ConditionCode::NakLimitReached, FileStatus::Retained, DeliveryCode::Incomplete, ); tb.check_finished_pdu_failure( ConditionCode::NakLimitReached, FileStatus::Retained, DeliveryCode::Incomplete, ); { let mut fault_hook = tb.fault_handler().user_hook.borrow_mut(); assert_eq!(fault_hook.notice_of_cancellation_queue.len(), 1); let cancellation = fault_hook.notice_of_cancellation_queue.pop_front().unwrap(); assert_eq!(cancellation.transaction_id(), transfer_info.id); assert_eq!( cancellation.condition_code(), ConditionCode::NakLimitReached ); assert_eq!(cancellation.progress(), file_size); } tb.acknowledge_finished_pdu(&mut user, &transfer_info); tb.check_dest_file = false; } #[test] fn test_positive_ack_procedure() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = tb .generic_transfer_init(&mut user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(&mut user, Vec::new()) .expect("EOF no error insertion failed"); 
tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); tb.check_finished_pdu_success(); tb.set_positive_ack_expired(); // This should cause the PDU to be sent again. assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } fn generic_positive_ack_test( tb: &mut DestHandlerTestbench, user: &mut TestCfdpUser, ) -> TransferInfo { let transfer_info = tb .generic_transfer_init(user, 0) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); tb.generic_eof_no_error(user, Vec::new()) .expect("EOF no error insertion failed"); tb.check_completion_indication_success(user); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); tb.check_finished_pdu_success(); // This should cause the PDU to be sent again. tb.set_positive_ack_expired(); assert_eq!(tb.handler.state_machine_no_packet(user).unwrap(), 1); tb.check_finished_pdu_success(); // Positive ACK limit reached. 
tb.set_positive_ack_expired(); assert_eq!(tb.handler.state_machine_no_packet(user).unwrap(), 1); tb.check_finished_pdu_failure( ConditionCode::PositiveAckLimitReached, FileStatus::Retained, DeliveryCode::Complete, ); tb.check_completion_indication_failure( user, ConditionCode::PositiveAckLimitReached, FileStatus::Retained, DeliveryCode::Complete, ); { let mut fault_handler = tb.fault_handler().user_hook.borrow_mut(); assert!(!fault_handler.cancellation_queue_empty()); let cancellation = fault_handler .notice_of_cancellation_queue .pop_front() .unwrap(); assert_eq!(cancellation.transaction_id(), transfer_info.id); assert_eq!( cancellation.condition_code(), ConditionCode::PositiveAckLimitReached ); assert_eq!(cancellation.progress(), 0); } transfer_info } #[test] fn test_positive_ack_limit_reached() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = generic_positive_ack_test(&mut tb, &mut user); // Chances are that this one won't work either leading to transfer abandonment, but we // acknowledge it here tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_positive_ack_limit_reached_with_subsequent_abandonment() { let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); let mut user = tb.test_user_from_cached_paths(0); let transfer_info = generic_positive_ack_test(&mut tb, &mut user); // This should cause the PDU to be sent again. tb.set_positive_ack_expired(); assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1); tb.check_finished_pdu_failure( ConditionCode::PositiveAckLimitReached, FileStatus::Retained, DeliveryCode::Complete, ); // Postive ACK limit reached which leads to abandonment. 
tb.set_positive_ack_expired(); assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 0); { let mut fault_handler = tb.fault_handler().user_hook.borrow_mut(); assert!(!fault_handler.abandoned_queue_empty()); let cancellation = fault_handler.abandoned_queue.pop_front().unwrap(); assert_eq!(cancellation.transaction_id(), transfer_info.id); assert_eq!( cancellation.condition_code(), ConditionCode::PositiveAckLimitReached ); assert_eq!(cancellation.progress(), 0); } } #[test] fn test_multi_segment_nak() { let file_data_str = "Hello Wooorld!"; let file_data = file_data_str.as_bytes(); let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); // Disable this, we only want to check the deferred procedure. tb.remote_cfg_mut().immediate_nak_mode = false; let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); assert_eq!( tb.generic_file_data_insert(&mut user, 2, &file_data[2..4]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 6, &file_data[6..8]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 10, &file_data[10..]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"), 2 ); assert_eq!(tb.pdu_queue_len(), 2); tb.check_eof_ack_pdu(ConditionCode::NoError); assert_eq!(tb.pdu_queue_len(), 1); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); 
assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs.len(), 3); assert_eq!(seg_reqs[0], (0, 2)); assert_eq!(seg_reqs[1], (4, 6)); assert_eq!(seg_reqs[2], (8, 10)); // We simulate the reply by re-inserting the missing file segment. tb.generic_file_data_insert(&mut user, 0, &file_data[0..2]) .expect("file data insertion failed"); tb.generic_file_data_insert(&mut user, 4, &file_data[4..6]) .expect("file data insertion failed"); tb.generic_file_data_insert(&mut user, 8, &file_data[8..10]) .expect("file data insertion failed"); tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 1); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_multi_packet_nak_sequence_large_file_flag() { let file_data = [0_u8; 64]; let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); // Disable this, we only want to check the deferred procedure. 
tb.remote_cfg_mut().immediate_nak_mode = false; let max_packet_len = 80; tb.remote_cfg_mut().max_packet_len = max_packet_len; tb.set_large_file_flag(LargeFileFlag::Large); let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); assert_eq!( NakPduCreatorWithReservedSeqReqsBuf::calculate_max_segment_requests( max_packet_len, &PduHeader::new_for_file_directive(tb.pdu_conf, 0), ) .unwrap(), 3 ); let missing_segs_0: &[(u64, u64)] = &[(0, 2), (4, 6), (8, 10)]; let missing_segs_1: &[(u64, u64)] = &[(12, 14)]; assert_eq!( tb.generic_file_data_insert(&mut user, 2, &file_data[2..4]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 6, &file_data[6..8]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 10, &file_data[10..12]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 14, &file_data[14..file_size as usize]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"), 3 ); assert_eq!(tb.pdu_queue_len(), 3); tb.check_eof_ack_pdu(ConditionCode::NoError); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), 10); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs, missing_segs_0); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = 
NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 10); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs, missing_segs_1); for missing_seg in missing_segs_0 { assert_eq!( tb.generic_file_data_insert( &mut user, missing_seg.0, &file_data[missing_seg.0 as usize..missing_seg.1 as usize] ) .expect("file data insertion failed"), 0 ); } for missing_seg in missing_segs_1 { assert_eq!( tb.generic_file_data_insert( &mut user, missing_seg.0, &file_data[missing_seg.0 as usize..missing_seg.1 as usize] ) .expect("file data insertion failed"), 1 ); } tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 1); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } #[test] fn test_multi_packet_nak_sequence_normal_file_flag() { let file_data = [0_u8; 64]; let file_size = file_data.len() as u64; let fault_handler = TestFaultHandler::default(); let mut tb = DestHandlerTestbench::new_with_fixed_paths( fault_handler, TransmissionMode::Acknowledged, false, ); // Disable this, we only want to check the deferred procedure. 
tb.remote_cfg_mut().immediate_nak_mode = false; let max_packet_len = 50; tb.remote_cfg_mut().max_packet_len = max_packet_len; let mut user = tb.test_user_from_cached_paths(file_size); let transfer_info = tb .generic_transfer_init(&mut user, file_size) .expect("transfer init failed"); tb.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus); assert_eq!( NakPduCreatorWithReservedSeqReqsBuf::calculate_max_segment_requests( max_packet_len, &PduHeader::new_for_file_directive(tb.pdu_conf, 0), ) .unwrap(), 4 ); let missing_segs_0: &[(u64, u64)] = &[(0, 2), (4, 6), (8, 10), (12, 14)]; let missing_segs_1: &[(u64, u64)] = &[(16, 18)]; assert_eq!( tb.generic_file_data_insert(&mut user, 2, &file_data[2..4]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 6, &file_data[6..8]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 10, &file_data[10..12]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 14, &file_data[14..16]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_file_data_insert(&mut user, 18, &file_data[18..file_size as usize]) .expect("file data insertion failed"), 0 ); assert_eq!( tb.generic_eof_no_error(&mut user, file_data.to_vec()) .expect("EOF no error insertion failed"), 3 ); assert_eq!(tb.pdu_queue_len(), 3); tb.check_eof_ack_pdu(ConditionCode::NoError); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 0); assert_eq!(nak_pdu.end_of_scope(), 14); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs, missing_segs_0); let pdu = tb.get_next_pdu().unwrap(); assert_eq!(pdu.pdu_type, PduType::FileDirective); 
assert_eq!(pdu.file_directive_type.unwrap(), FileDirectiveType::NakPdu); let nak_pdu = NakPduReader::new(&pdu.raw_pdu).unwrap(); assert_eq!(nak_pdu.start_of_scope(), 14); assert_eq!(nak_pdu.end_of_scope(), file_size); let seg_reqs: Vec<(u64, u64)> = nak_pdu.get_segment_requests_iterator().unwrap().collect(); assert_eq!(seg_reqs, missing_segs_1); for missing_seg in missing_segs_0 { assert_eq!( tb.generic_file_data_insert( &mut user, missing_seg.0, &file_data[missing_seg.0 as usize..missing_seg.1 as usize] ) .expect("file data insertion failed"), 0 ); } for missing_seg in missing_segs_1 { assert_eq!( tb.generic_file_data_insert( &mut user, missing_seg.0, &file_data[missing_seg.0 as usize..missing_seg.1 as usize] ) .expect("file data insertion failed"), 1 ); } tb.check_completion_indication_success(&mut user); assert_eq!(tb.pdu_queue_len(), 1); tb.check_finished_pdu_success(); tb.acknowledge_finished_pdu(&mut user, &transfer_info); } }